Kafka 结构

Kafka 是一个高吞吐量、分布式的发布一订阅消息系统。当前的 Kafka己经定位为一个分布式流式处理平台。

Kafka 是一款开源的、轻量级的 、分布式、可分区和具有复制备份的 (Replicated)、基于 ZooKeeper 协调 管理的分布式流平台的功能 强大的消息系统 。与传统 的消息系统相比, Kafka 能够很好地处理活跃的流数据,使得数据在各个子系统中高性能、低延迟地不停流转。

  • 能够允许发布和订阅流数据
  • 存储流数据时提供相应的容错机制
  • 当流数据到达时能够被及时处理

生产者负责生产消息,将消息写入 Kafka 集群:消费者从 Kafka 集群中拉取消息

message

消息是 Kafka通信的基本单位

topic

topic 对 Message 的一个分类。生产者将消息发送到特定主题,消费者订阅主题或主题的某些分区进行消费。

partition and duplicate

一组消息归纳为一个主题 每个主题又被分成一个或多个分区 每个分区由一系列有序、不可变的消息组成,一个有序队列。 每个分区又有一至多个 副本( Replica),分区的副本分布在集群的不同代理上,以提高可用性

物理上: 每个分区在物理上对应为一个文件夹 分区的命名规则为主题名称后接“一”连接符,之 后再接分区编号,分区编号从 0 开始 : topic-0

image-20210319111503113

分区

分区优点: 分区使得 Kafka在井发处理上变得更加容易,理论上来说,分区数越多吞吐量越高,但这 要根据集群实际环境及业务场景而定。同时,分区也是 Kafka保证消息被顺序消费以及对消息 进行负载均衡的基础。

Kafka 只能保证一个分区之内消息的有序性,并不能保证跨分区消息的有序性 。 每条消息被追加到相应的分区中,是顺序写磁盘,因此效率非常高,这是 Kafka 高吞吐率的 一个重要保证。

Kafka 并不会立即删除已被消费的消息,由于磁盘的限制 消息也不会一直被存储(事实上这也是没有必要的),因此 Kafka提供两种删除老数据的策略, 一是基于消息己存储的时间长度, 二是基于分区的大小

一个分区的多个副本之间数据的一致性, Kafka会选 择该分区的 一个副本作为 Leader 副本,而该分区其他副本即为 Follower 副本,只有 Leader 副 本才负责处理客户端读/写请求, Follower 副本从 Leader 副本同步数据。

log

分区的副本与日志对象是一一对应的。任何发布到分区的消息会被直接追加到日志文件的尾部

image-20210319115217440

代理 Broker

生产环境中 Kafka集群一般包括一台或多台服务器 ,我们可以在一台服务器 上配置一个或多个代理。 每一个代理都有唯一的标识 id,这个 id是一个非负整数。在一个 Kafka 集群中, 每增加一个代理就需要为这个代理配置一个与该集群中其他代理不同的 id, id值可以 选择任意非负整数 即可,只 要保证它在整个 Kafka 集群中唯 一 , 这个 id 就是代理的名字,也就 是在启动 代理时配置的 broker.id 对应的值,因此在本书中有时我们也称为 brokerId。

每个代理可能每个主题有零个或多个分区。

  • 某主题分区数 = Broker数 每个代理将有一个分区
  • 某主题分区数 < Broker数 每个代理将有一个分区, 剩余broker没有该主题分区
  • 某主题分区数 > Broker数 每个代理将有一个分区或多个分区, Broker之间的负载分配不均衡,不推荐这种情况

image-20210319112910850

image-20210319115921181

image-20210319120011944

image-20210319120228938

生产者 Producers

生产者(Producer)负责将消息发送给代理,也就是向 Kafka代理发送消息的客户端。

消费者 Consumers

我们可以为每个消费者指定一个消费组,以 groupId代表消费组名称 如果不指定消费组,则该消费者属于默认消费组 test-consumer-group Kafka 会自动为该消费者生成一个全局唯一的 id,格式为${groupld}-${hostName}-${timestamp}-${UUID 前 8位字符}

image-20210320154146826

基本概念

  • 多个消费者分配到同一个消费者
  • 多个消费组订阅同一个主题
  • 同一主题的一条消息只能被同一个消费组里某一个消费者消费 (同组里消费者消息共享)
  • 不同消费组的消费者可同时消费该消息
  • 一旦新消费者到达,Kafka将其操作切换到共享模式并在两个消费者之间共享数据。 util Consumers数量 == 该特定主题配置的分区数量 (Kafka的每个消费者都将被分配至少一个分区) 没有空闲分区,新消费者将不会收到任何进一步的消息,直到现有的任何消费者退订。

ISR

在 ZooKeeper 中动态维护了一个ISR,保存同步的副本列表,该列表中保存的是与 Leader 副本保持消息同步的所有副本对应的代理节点 id

ZooKeeper

Kafka 在启动或运行过程当中会在 ZooKeeper 上创建相应节点来保 存元数据信息, Kafka 通过监昕机制在这些节点注册相应监听器来监昕节点元数据的变化,从 而由 ZooKeeper 负责管理维护 Kafka集群,同时通过 ZooKeeper 我们能够很方便地对 Kafka 集 群进行水平扩展及数据迁移

Kafka 基本结构

image-20210319113722211

Why 文件系统来存储数据

  • 文件系统存储速度快慢一定程度上也取决于我们对磁盘的用法。线性写的速度约是随机写 的 6000 多倍
  • Java 对 象内存消耗非常高, 且随着 Java 对象的增加JVM垃圾回收也越来越频繁和繁琐,这些都加大了内存的消耗 。

数据的持久化队列可以建立在简单地对文件进行追加的实现方案上,因为是顺序追加, 所以 Kafka在设计上是采用时间复杂度O(1)的磁盘结构,它提供了常量时间的性能,即使是存 储海量的信息( TB 级)也如此,性能和数据的大小关系也不大,同时 Kafka将数据持久化到磁 盘上,这样只要磁盘空间足够大数据就可以一直追加,而不会像一般的消息系统在消息被消费 后就删除掉, Kafka 提供了相关配置让用户自己决定消息要保存多久,这样为消费者提供了更 灵活的处理方式,因此 Kafka 能够在没有性能损失的情况下提供一般消息系统不具备的特性 。

工作流

  • Producers定期向Topic发送消息
  • Broker确保消息在分区之间平均分享,如果制作者发送两条消息并且有两个分区,则Kafka将在第一个分区中存储一条消息,并在第二个分区中存储第二条消息。
  • Consumers订阅特定Topic
  • 当消费者订阅了一个主题,Kafka将向消费者提供该主题的当前偏移量,并且还将该偏移量保存在Zookeeper集合中。
  • Consumers 定期pull message
  • Kafka收到生产者的消息后,会将这些消息转发给消费者
  • 消费者将收到消息并进行处理
  • 当消息被处理,消费者将向Broker发送确认
  • Kafka收到确认后,会将偏移量更改为新值并在Zookeeper中更新它。 由于在Zookeeper中维护了偏移量,因此即使在服务器繁忙期间,使用者也可以正确读取下一条消息。
  • 循环以上
  • 消费者可以随时选择倒带/跳至期望的主题偏移量并阅读所有后续消息

数据可靠性

Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证

当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交

Kafka 在 Producer 里面提供了消息确认机制

  • acks=0:生产者不会等待任何来自服务器的响应。

    如果当中出现问题,导致服务器没有收到消息,那么生产者无从得知,会造成消息丢失

    由于生产者不需要等待服务器的响应所以可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量

  • acks=1(默认值):只要集群的Leader节点收到消息,生产者就会收到一个来自服务器的成功响应

    如果消息无法到达Leader节点(例如Leader节点崩溃,新的Leader节点还没有被选举出来)生产者就会收到一个错误响应,为了避免数据丢失,生产者会重发消息

    如果一个没有收到消息的节点成为新Leader,消息还是会丢失

    此时的吞吐量主要取决于使用的是同步发送还是异步发送,吞吐量还受到发送中消息数量的限制,例如生产者在收到服务器响应之前可以发送多少个消息

  • acks=-1:只有当所有参与复制的节点全部都收到消息时,生产者才会收到一个来自服务器的成功响应

    这种模式是最安全的,可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群依然可以运行

    延时比acks=1更高,因为要等待不止一个服务器节点接收消息

Topic 分区副本

众多的分区副本里面有一个副本是 Leader,其余的副本是 follower,所有的读写操作都是经过 Leader 进行的,同时 follower 会定期地去 leader 上复制数据。当 Leader 挂掉之后,其中一个 follower 会重新成为新的 Leader。通过分区副本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。

比如创建主题的时候指定 replication-factor,也可以在 Broker 级别进行配置 default.replication.factor),一般会设置为3

ISR

每个分区的 leader 会维护一个 ISR 列表,ISR 列表里面就是 follower 副本的 Borker 编号,只有“跟得上” Leader 的 follower 副本才能加入到 ISR 里面,这个是通过 replica.lag.time.max.ms 参数配置的。只有 ISR 里的成员才有被选为 leader 的可能。

Kafka的broker端有一个参数replica.lag.time.max.ms, 该参数表示follower副本滞后与Leader副本的最长时间间隔,默认是10秒。这就意味着,只要follower副本落后于leader副本的时间间隔不超过10秒,就可以认为该follower副本与leader副本是同步的,所以哪怕当前follower副本落后于Leader副本几条消息,只要在10秒之内赶上Leader副本,就不会被踢出出局。

当 Leader 挂掉了,而且 unclean.leader.election.enable=false 的情况下,Kafka 会从 ISR 列表中选择第一个 follower 作为新的 Leader,因为这个分区拥有最新的已经 committed 的消息。通过这个可以保证已经 committed 的消息的数据可靠性。(在 Kafka 中,选举这种副本的过程可以通过Broker 端参数 **unclean.leader.election.enable **控制是否允许 Unclean 领导者选举。开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。)

对于一个包含3 个副本的主题分区,如果min.insync.replicas=2 ,那么至少要存在两个同步副本才能向分区写入数据。

如果进行了上面的配置,此时必须要保证ISR中至少存在两个副本,如果ISR中的副本个数小于2,那么Broker就会停止接受生产者的请求。尝试发送数据的生产者会收到NotEnoughReplicasException异常,消费者仍然可以继续读取已有的数据。

数据一致性

老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。那么 Kafka 是如何实现的呢

只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。

这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。

当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。

Rebalance机制分析

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。

例如:某 Group 下有 20 个 consumer 实例,它订阅了一个具有 100 个 partition 的 Topic 。正常情况下,kafka 会为每个 Consumer 平均的分配 5 个分区。这个分配的过程就是 Rebalance。

触发 Rebalance 的时机

  • 组成员个数发生变化。例如有新的 consumer 实例加入该消费组或者离开组。
  • 订阅的 Topic 个数发生变化。
  • 订阅 Topic 的分区数发生变化。

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被 “踢出”Group 而引发的。这种情况下我们可以设置 session.timeout.ms 和 heartbeat.interval.ms 的值,来尽量避免rebalance的出现。(以下的配置是在网上找到的最佳实践,暂时还没测试过

  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。
  • 要保证 Consumer 实例在被判定为 “dead” 之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。此时,max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。

消息到达分区策略

  • 如果在发消息的时候指定了分区,则消息投递到指定的分区
  • 如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区
  • 如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区