顾乔芝士网

持续更新的前后端开发技术栈

kafka的消费过程分析(kafka消费情况)

1 消费模式

常见的消费模式有两种:

  • poll(拉):消费者主动向服务端拉取消息。
  • push(推):服务端主动推送消息给消费者。

由于推模式很难考虑到每个客户端不同的消费速率,导致消费者无法消费消息而宕机,因此kafka采用的是poll的模式,

优点:

  • 根据consumer的消费能力进行数据拉取,可以控制速率
  • 可以批量拉取,也可以单条
  • 可以设置不同的提交方式

该模式有个缺点,如果服务端没有消息,消费端就会一直空轮询。为了避免过多不必要的空轮询,kafka做了改进,如果没消息服务端就会暂时保持该请求,在一段时间内有消息再回应给客户端。

2 消费者消费规则

kafka是以消费者组进行消费,一个消费者组,由多个consumer组成,他们和topic的消费规则如下:

  • topic的一个分区只能被消费组中的一个消费者消费。
  • 消费者组中的一个消费者可以消费topic一个或者多个分区。

通过这种分组、分区的消费方式,可以提高消费者的吞吐量,同时也能够实现消息的发布/订阅模式和点对点两种模式。

3 消费者整体工作流程

消费者消费总体分为两个步骤,第一步是制定消费的方案,就是这个组下哪个消费者消费哪个分区,第二个是建立网络连接,获取消息数据。

一、制定消费方案

消费者中每个消费者到底消费哪一个主题分区,这个分配策略其实是由消费者的Leader决定的,这个Leader我们称为群主。群主是多个消费者中,第一个加入组中的消费者,其他的消费者我们称为Follower.

当消费者加入组的时候,会发送一个JoinGroup请求。群主负责给每一个消费者分配分区。

每个消费者只知道自己的分配的信息,只有群主知道组内所有消费者的分配信息。

指定分配策略的基本流程:

  1. 假设第一个消费者设定group.id为test,向当前负载最小的节点发送请求查找消费调度

coordinator节点选择= groupid的hashcode值 % 50( __consumer_offsets内置主题位移的分区数量)例如: groupid的hashcode值 为1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的调度器。消费者组下的所有的消费者提交offset的时候就往这个分区去提交`offset

  1. 找到消费调度器后, 消费者向调度器节点发出JoinGroup请求,加入消费者组
  1. 当前消费者当选为群主后,根据消费者配置中分配策略设计分区分配方案,并将分配好的方案告知调度器
  1. 接着第二个消费者设定group.id为test,申请加入消费者
  1. 加入成功之后,kafka将消费者组状态切换到准备rebalance,关闭和消费者的所有链接,等到它们重新加入,客户端重新申请加入,kafka从消费者组中挑选一个作为Leader,其他的作为Follower(步骤和之前相同)
  1. Leader会按照分配策略对分区进行重分配,并将方案发送给调度器,由调度器通知所有的成员新的分配方案。组员会按照新的方案重新消费数据

4 消费者消费细节

现在已经初始化消费者组信息,知道哪个消费者消费哪个分区,接着我们来看看消费者细节。

  1. 消费者创建一个网络连接客户端ConsumerNetworkClient, 发送消费请求,可以进行如下配置:
fetch.min.bytes: 每批次最小抓取大小,默认1字节
fetch.max.bytes: 每批次最大抓取大小,默认50M
fetch.max.wait.ms:最大超时时间,默认500ms
  1. 发送请求到kafka集群
  2. 获取数据成功,会将数据保存到completedFetches队列中
  3. 消费者从队列中抓取数据,根据配置max.poll.records一次拉取数据返回消息的最大条数,默认500条。
  4. 获取到数据后,经过反序列化器、拦截器后,得到最终的消息。
  5. 最后一步是提交保存消费的位移offset,也就是这个消费者消费到什么位置了,这样下次重启也可以继续从这个位置开始消费,关于offset的管理后面详细介绍。 提交的offset保存在__consumer_offsets

5 消费者分区策略

前面简单提到了消费者组初始化的时候会对分区进行分配,那么具体的分配策略是什么呢,也就是哪个消费者消费哪个分区数据?

kafka有四种主流的分区分配策略: RangeRoundRobinStickyCooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。

  1. Range分区策略
  • Range分区 是对每个 topic 而言的。对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
  • 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。

如上图所示:有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。7/3 = 2 余 1 ,除不尽,那么 消费者 C0 便会多消费 1 个分区。如果是8个分区, 8/3=2余2,除不尽,那么C0和C1分别多消费一个。

这种方式容易造成数据倾斜!如果有 N 多个 topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。

  1. RoundRobin 分区策略

RoundRobin 针对集群中所有topic而言,RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后进行排序,最后通过轮询算法来分配 partition 给到各个消费者。

每个消费者组中的消费者都会有一个自动生产的UUID作为memberId,而消费者是按照memberId进行排序, 而主题是根据主题名称的hashCode进行排序的.

将主题分区轮询分配给对应的订阅用户,注意未订阅当前轮询主题的消费者会跳过.

从图中可以看出,轮询分配策略是存在缺点的,并不是那么均衡,如果testA-p2分配给消费者consumer2是不是就完美了

  1. Sticky 分区策略

Sticky是粘性的意思,它是从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,在rebalance会尽量保持原有分配的分区不变化,这样可以节省开销。

在第一次分配后,每个组员都保留分配给自己的分区信息。如果有消费者加入或者退出,那么在进行分区再分配时(一般情况下,消费者退出45s后,才会进行再分配,因为需要考虑可能又恢复的情况),尽可能保证消费者原有的分区不变,重新对加入或者退出消费者的分区进行分配。

  1. Cooperative Sticky分区策略

前面三种分配策略再进行重分配时使用的是EAGER协议,会让当前的所有消费者放弃当前分区,关闭连接,资源清理,重新加入组和等待分配策略。明显效率是比较低的,所以从Kafka2.4版本开始,在粘性分配策略的基础上,优化了重分配的过程,使用的是COOPERATIVE协议,特点是在整个再分配的过程中,粘性分区分配策略分配的会更加均匀和高效,COOPERATIVE协议将一次全局重平衡改成每次小规模重平衡,直至最终平衡的过程。

Kafka默认的分区策略:Range分区策略 + Cooperative Sticky分区策略

6 消费者位移offset管理

消费者需要保存当前消费到分区的什么位置了,这样哪怕消费者故障,重启后也能继续消费,这就是消费者的维护offset管理。

1.offset 的默认维护位置

Kafka 0.9 版本之前consumer默认将offset保存在Zookeeper中,从0.9版本之后consumer默认保存在Kafka一个内置的topic中,该topic为_consumer_offsets。

消费者提交的offset值维护在**__consumer_offsets这个Topic中,具体维护在哪个分区中,是由消费者所在的消费者组groupid**决定,计算方式是:groupid的hashCode值对50取余。当kafka环境正常而消费者不能消费时,有可能是对应的__consumer_offsets分区leader为none或-1,或者分区中的日志文件损坏导致。

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+ 分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行 compact,也就是每个 group.id+topic+分区号就保留最新数据。

一般情况下, 当集群中第一次有消费者消费消息时会自动创建主题_ consumer_ offsets, 不过它的副本因子还受offsets.topic .replication.factor参数的约束,这个参数的默认值为3 (下载安装的包中此值可能为1),分区数可以通过
offsets.topic.num.partitions参数设置,默认为50。

在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

cd /usr/local/kafka/kafka_2.13-3.8.1

bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server worker1:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

打印的数据:

[c1,test,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1742721080869, expireTimestamp=None)
[c1,test,2]::OffsetAndMetadata(offset=20, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1742721080869, expireTimestamp=None)
[c1,test,1]::OffsetAndMetadata(offset=10, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1742721080869, expireTimestamp=None)
[c1,test,3]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1742721080869, expireTimestamp=None)

消费者提交offset的方式有两种,自动提交手动提交

2. 自动提交

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。

相关的配置:

enable.auto.commit:是否开启自动提交offset功能,默认是true
auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

自动提交有可能出现消息消费失败,但是却提交了offset的情况,导致消息丢失。为了能够实现消息消费offset的精确控制,更推荐手动提交。

3. 手动提交

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因 此Kafka还提供了手动提交offset的API。手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

//同步提交:必须等待offset提交完毕,再去消费下一批数据。 阻塞线程,一直到提交到成功,会进行失败重试
commitSync()
//异步提交: 发送完提交offset请求后,就开始消费下一批数据了。没有失败重试机制,会提交失败   
commitAsync()

7 消费者的核心参数配置

  • heartbeat.interval.ms: consumer心跳时间间隔,必须得与coordinator保持心跳才能知道consumer是否故障,如果consumer发生故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作. 默认值: 3秒
  • session.timeout.ms: kafka多长时间感知不到一个consumer就认为他故障了,默认是45秒
  • max.poll.interval.ms: 如果在两次poll操作之间,超过这个时间,那么就会认为这个consumer处理能力太弱了,会被踢出消费组,分区分配给别人去消费, 默认值: 5分钟
  • fetch.max.bytes: 获取一条消息最大的字节数, 默认值: 50M
  • max.poll.records: 一次poll返回消息的最大条数,默认值: 500
  • connections.max.idle.ms:consumer跟broker的socket连接如果空闲超过了一定时间,此时就会自动回收连接,但是下次消费就要重新建立socket连接,默认值: 10分钟。这个建议设置为-1,不要去回收
  • enable.auto.commit: 开启自动提交偏移量, 默认值: true
  • auto.commit.interval.ms: 每隔多久提交一次偏移量, 默认值: 5000毫秒
  • auto.offset.reset: 指定消费位置, 默认值: latest
    • earliest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交offset时,就从头开始消费
    • latest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交offset时,消费新生产的该分区下的数据
    • none: 当各分区下有已提交的offset时,从提交的offset开始消费,如果未找到消费者组的先前偏移量,则向消费者抛出异常
控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言