Kafka消费者消费消息(高级API)

1. 介绍

Kafka消费者有三种API,Scala实现的两种消费者API 叫做高级API和低级API,新版本的消费者采用Java重新实现叫做 新API。 但不管采用什么版本实现,消费者消费消息的主要工作没有太大变化 , 比如为消费者分配分区、拉取线程拉取消息客户端消费消息、更新拉取状态、提交偏移量 。 本文介绍内容主要是高级API

消费者拉取钱程拉取每个分区的数据,会将分区的消息集包装成一个数据块,放入分区信息的队列中 。 而每个队列都对应一个消息流,消费者客户端选代消息流,实际上是迭代每个数据块中消息集的每条消息 。
如图所示,一个队列包含多个数据块,每个数据块对应一个分区的消息集, 一个消息集包含
多条消息 。 消费者迭代器封装了迭代获取消息的逻辑,客户端不需要面向数据
块、消息集这些内部对象,只需要对消费者迭代器循环获取消息即可 。

kafka消费者消费流程

二、消费者迭代消费消息

​ 消费者迭代器生成包含消息的迭代器,首先弹出队列的每个数据块,然后获取数据块对应的消息集,最后迭代消息集中的每条消息 。 客户端迭代的消息是队列的所有数据块,而不是一个数据块。 所以在迭代过程中,要确保读取完一个数据块后,接着读取下一个数据块。 也就是说,消费者迭代器是:所有数据块通过消息集组成的消息迭代器 。

​ 消费者迭代器实现了 Java的 Iterator接口,必须重载hasNext ()和 next()方法。 hasNext ()方法会用来判断迭代器是杏结束, next ()方法每调用一次就指向迭代器的下一个元素 。 迭代的过程因为最上层数据结构是包含数据块的阻塞队列,所以从队列中弹出一个数据块就已经足够调用很多次next()了 。只有当前数据块的消息集都遍历完成后,才会从队列中弹出新的数据块

​ 在迭代的过程中,可能 多 次调用 next()方法都还是在同一数据块的同一个消息集 中, 所以迭代器妥保存当前的数据块、当前的 消息集。 如果当前消息 集没有下一个元素,则 需要同时更新这两个变量 。 因为一个数据块对应一个消息集 ,一旦当前消息集没有元素 了, 说明 这个数据块也.已经迭代完毕。

三、偏移量

3.1 两种偏移量

消费者的“拉取线程” 拉取消息后会更新“拉取状态 ” ,对应的 “消费线程”获取消息后也要更新相关的“消费状态” 。 (准确地说,消费消息的对象是一个迭代器而不是钱程。 这里为了和拉取线程相对应,故叫作消费线程。 拉取状态对应分区信息、对象的拉取偏移量 , 表示消费者已经拉取的分区位置 ; 消费状态对应了消费偏移量,表示消费者已经消费完成的偏移量 。
如 图 3-23所示 , 拉取消息的线程和消费消息 的线程是两个独立的工作模块,前者通过分区信息对象的阻塞队列将消息传给消费消息的线程完成数据的传输 。 消息拉取后只有被消费线程真正消后,才会更新消费状态 。 也就是说,“拉取线程更新拉取偏移量,消费线程更新消费偏移量”,

具体步骤如下 :
(1) 消费者 的拉取线程从服务端拉取分区 的消息 。
(2)拉取到分区消息后 ,就更新分区信息对象的拉取偏移量 。
(3)将分区数据的消 息集封装成数据块 。
(4) 客户端循环迭代数据块的消息集 。
(5) 消费完一条消息后,就更新分区信息对象的消费偏移量
(6)消息流中的每一条消息返回给消费者客户端应用程序 。

拉取线程和消费线程分别更新分区信息的状态

3.2 提交偏移量

在旧版本中每个分区的偏移量都保存到ZK中 ,分区信息的拉取偏移初始时从ZK读取,然后在拉取消息后更新 。 同样,消费偏移盘初始时也是从ZK读取,然后在消费消息后更新。 消费者消费了新的消息后,还应该及时地将消费进度(即分区信息的消费偏移盘)保存到ZK中 。 每个分区都要和ZK产生一次交互,况且还要周期性地写人,这对ZK来说是个不小的负担。 在新版本中把偏移量像普通消息一样写入Kafka集群的内部主题。 而且正像消息会源源不断地写到集群一样,记录偏移量也是周期性的 。 Kafka支持高吞吐量的消息写入,对于偏移量的记录当然也不在话下 。

消费者提交偏移量是为了保存分区的消费进度 。 因为Kafka保证同一个分区只会分配给消费组中的唯一消费者,所以即使发生再平衡后,分区和消费者的所有权关系发生变化,新消费者也可以接着上一个消费者记录的偏移盘位置继续消费消息 。
但是消费者即使记录了分区的偏移量,仍然无法解决消息被重复消费的问题。 例如,消费者每隔 10秒提交一次偏移量,在 10秒时提交的偏移量是 100 ,下一次提交的可点是20秒。 在20秒之前,消费者又消费了 30条消息,然后消费者突然挂掉了 。 由于偏移量现在仍然停留在 10。这个位置,因此新的消费者2也只会从 10。这个位置继续消费,从而会重复处理偏移量为 100之后的30条消息 。通常消息被重复处理是可以接受的,至少不会出现消息丢失这种不可接受的问题。 定时提交偏移量的周期时间越长,消息被重复消费的数据量就越多。 客户端可以将这个周期时间设置得更短,来减少重复消费的消息量。 当然也不能太短,否则会导致客户端和保存偏移量的存储系统产生大量的网请求 。

消费者提交偏移量到Kafka的 内部主题,首先要确定连接哪个或者哪些服务端节点 。 回顾一下 ,
生产者发送消息时会根据分区的主副本分组 , 和多个节点者建立连接 ; 消费者分配多个分区 ,也要根
据分区的主副本分组, 和多个节点建立连接。 而消费者提交所有分区 的偏移量时, 实际上只和-个服
务端节点建立连接。 同样要处理多个分区,为什么普通消息需要多个连接,而偏移量只需要一个连接?
如图 3-24所示,目标节点指的是分区的主副本节点,我们给 出 了偏移量的多种连接方案 。
(1)如果不同分区的偏移盐写到了不同的节点,消费者分配了多个分区,当要读取不同分区的偏
移盘时,就得连接不同的节点才可以获得完整的数据。
(2)如果能让所有分区的偏移草’数据只保存在一个节点,消费者就只需要 同一个节点通信 。 但因
为消费者和分区的关系是变化的, 即使保证这一次分区在一个节点上, 也无法保证下一次仍然在同一
个节点 。
(3)如果消费组所有消费者所有分区的偏移量都保存在一个节点,就可以解决第二种方式的 问题 。

实际上 , 消费者 的分区偏移盏’要保存在哪个节点,跟消费者所属 的消费组有关系 。 只要保证
消费组级别的偏移i量在一个节点上, 即使消费者和分区的关系发生变化 , 也能够保证消费者访问新分
配的分区时 , 只需要访问一个节点 。

同一个消费组的所有消费者,以内部主题形式提交所有分区的偏移盘到一个目标节点,这个内部主题和普通消息的主题一样也会有多个分区。 如果只有一个分区 ,所有消费组都只能提交到唯一的节点,将所有读写请求都压到一个节点的相同问题。 而如果有多个分区,并且以消费组作为分区的分布条件,不同消费组提交到的偏移量有可能是不同的节点,就可以分散偏移盘读写的压力 。

总结下客户端需要确定服务端节点的几个场景。

  • 生产者发送消息时,直接在客户端决定消息要发送给哪个分区,这一步不向服务端发送请求 。
  • 消费者拉取管理器的线程向服务端发送主题元数据请求,获取包含了主副本等信息的所有分区元数据,消费者拉取线程才能确定要连接哪些服务端节点 。
  • 提交偏移量虽然有点像生产者的发送消息,都是写数据,但也需要和消费者一样,获取分区的主副本作为偏移管理器,才能确定提交到哪个节点 。

生产者、消费者、偏移量和Kafka集群的网络连接

四、连接偏移管理器

前面我们分析的拉取偏移方法和提交偏移量方法,都需要和偏移主-管理器通信。 在这之前,消费者需要通过 channelToOffsetManage ()方法向服务端任意一个节点发送“消费组的协调者请求”,来获取消费组对应的协调节点 ,即偏移量管理器节点 。服务端处理消费组的协调者请求,实际上也是通过查询主题的元数据来完成的 。 不过和 中返回主题元数据,然后还要在客户端继续处理( 比如获取存在主副本的分区)不同,这里在服务端完成“选择消费组对应内部主题的分区的主副本节点”,然后直接返回这个协调节点给客户端。 也就是说客户端发送消费组的协调者请求,服务端返回的就是消费组的协调节点 。

五、服务端处理提交偏移量的请求

协调节点会将消费者的偏移量提交请GroupCoordlnator类的 handleCommitOffsets ()方法处理,其中参数offsetMetadata表示分配给消费者的所有分区消费进度 。

写入偏移消息会调用 RepllcaManager .appendMessages ()方法,将消息集追加到本地日志文件,
并且会把分区和对应的偏移量保存在协调节点的缓存中 。 目的是:再平衡后如果其他消费者需要读取
分区的偏移毡 , 在连接上协调节点后,可以直接读取缓存 , 而不需要从日志文件中读取。

六、再平衡和分区分配

使用高级APl ,每个消费者进程启动时都会创建一个消费者连接器,并在ZK中注册消费者成员变
化、分区变化的监昕器。三旦监昕器注册的事件被触发,就会调用ZKRebalanceListener的再平衡方
法,为消费组的所有消费者重新分配分区 。 为了保证整个消费组分区分配算法的一致性,当一个消费
者触发再平衡时,该消费组内的所有消费者会同时触发再平衡。 如图(左)所示,第一个消费者
加入消费组触发再平衡,这时消费组只有一个消费者,所有的分区都分配给第一个消费者 。 如图
(右)所示,第二个消费者力n人同一个消费组,会触发所有消费者的再平衡,即第一个消费者和第二
个消费者都会再平衡。

消费组的消费者再平衡

由于每个消费者的再平衡都是独立的进程,消费者之间并不知道其他消费者的再平衡最后有没有
成功。 可能有些消费者的再平衡成功了,有些却失败了,就会导致本来分配给这个消费者的分区,因为
再平衡失败而无法被消费,但是其他消费者又都无法知 l斑。 解决这个问题的方法是:在服务端为每个消
费组都选举一个协调节点,让它负责某个消费组中所有消费者的协调了作。 另外,消费者提交分区的偏
移i卫;也是写到协调节点上的 。 实际上,消费者客户端发送给服务端的请求“只要和消费组相关,都会被
协调节点处理” 。 如图所示,消费者执行再平衡和提交偏移ill都直接和协调者交互,具体步骤如下。
(I )每个消费者触发再平衡时都和协调者联系,由协调者执行全局的分区分配 。
(2)协调者分配完成后,将分区分配给每个消费者 。
(3)每个消费者收到任务列表后,启动拉取钱程,拉取对应分区的消息,并更新拉取状态 。
(4)消费者周期性提交分区的偏移量给协调者,协调者将分区偏移写到内部主题 。

消费者消费分区和提交偏移量都经过协调者