Kafka消费者实践
Kafka是分布式、多副本、基于Zookeeper的分布式消息流平台。它是基于发布订阅模式的消息引擎系统。
springboot 对 kafka 做了集成,消费消息只需要配置好参数,使用 @KafkaListener
注解即可。
kafka 的消费过程很简单,收到消息 → 处理业务 → 提交位移。
消费的两个重要原则
消息不丢失
对于消息队列的中的每一条消息,都要做到不会丢失的标准。kafka 是通过位移来确认消费者消费到分区的哪一条消息,一般消息丢失发生的场景是在自动提交位移或是批量提交位移的场景下,消费还没结束,但是位移已经提交了,同时消费者发生故障重启了,这条消息在重启后就不会被重新消费,被丢失了。
要保证消费者不丢失消息,最重要的就是保证位移的准确性,最常用的做法就是在消息消费完成后再手动提交位移。
消息不重复
即使采用了手动提交位移也不能保证消息不重复的问题。假设处理完业务逻辑,准备提交位移时程序挂了,导致消费位移没有提交,重启之后就会重新消费。所以对于 kafka,业务层面要保证的是幂等性,这个是由我们自己去处理的。
我们要做的就是从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。
如果业务不能设计成天然幂等性的话,可以考虑一种最为通用的幂等方式:自己记录并检查操作是否执行过。在消费消息时,检查是否已经消费过,如果没有才进入业务逻辑进行处理。在分布式的系统中,这样做会涉及到全局唯一 id、原子化操作,比较容易出问题。
手动提交消费位移
我们的系统中位移提交采用的是手动提交,这样的方式能够很好避免消息丢失的问题,如果是自动提交,在没处理完业务就把位移提交了,但同时服务崩了,重启后这条消息就不会重新消费,这条消息就丢失了。另外,如果是批量提交位移的话,手动提交并不能避免重复消费的问题,比如消费0~100的消息,消费到50条的时候业务处理出现异常导致消费失败,但是位移没有提交,下次消费还是从0开始。
对于业务层面的逻辑失败导致消费未消费成功 ,kafka 是无法处理的,不论重复消费多少次也是同样的结果,这时候就需要业务层自己处理异常,把对应的错误日志打好并把位移提交了。
未提交位移的影响
如果不提交位移,会发生什么?
- 首先可以明确的是,消费者并不会重复消费未提交位移的消息;
- 当消费者重启后,就会重新消费之前没有提交位移的消息;
- 如果后续消息消费后成功提交位移了,那么服务端就会更新到最新的位移,不会重新消费,就算消费者重启也不会重新消费;
相当于说,在消费者和服务器两边各自维护了一份位移记录,消费者在消费消息后自己记录了一个位移,并提交给服务器,更新分区上的位移记录。所以,当消费者重启时就会丢失自己保存的那份位移记录,重启后就以服务器上最新的位移作为自己消费位移的基准。
重平衡 Rebanlance
kafka 一大特点就是重平衡,能够在消费者组中有消费者实例新增/挂掉时,重新把分区分配给所有有效的消费者,保障了消息队列的高可用性。在重平衡的过程中,所有消费者实例共同参与,在协调者(coordinator)的帮助下,完成订阅主题分区的分配。
coordinator:专门为消费组服务,负责为 group 执行重平衡以及提供位移管理和组成员管理。
为什么要在消费者实践中提一下重平衡呢?是因为我发现在消息消费的过程中时,如果业务处理时间太长的话,是可能导致当前消费者被踢出消费组,重新分配分区的情况。而重平衡的过程是所有消费者停下手头所有的工作共同参与,如果数量过多的话会引起整体消费者有短时间无法消费任何消息,什么都处理不了。
引发重平衡的条件
- 消费组内成员数量的变动
- 订阅主题数发生变动
- 订阅主题的分区数发生变动
后两个条件是用户主动操作引起的,无法避免,在生产中应该重点关注第一个条件,消费者新增也是提高系统吞吐的要求,所以主要关注消费者减少的情况
消费者减少的情况
- 消费者没有及时发送心跳,导致 coordinator 误认为其已经失效。 需要合理设置 session.timeout.ms 和 heartbeat.interval.ms 参数。
- 消费时间过长。 消费者使用 max.poll.interval.ms 参数控制消费者实际消费能力对重平衡的影响。如果超过这个设定值还没有消费完 poll 方法返回的消息,消费者就会主动发起“离开组”的请求,coordinator 也会重新进行 rebanlance。设置批量消费的大小也会影响这部分,参数是 max.poll.records,如果一次性 poll 的消费数量过多,也会导致一次 poll 的处理无法在指定时间完成。
- 消费者内存泄漏 频繁的 Full GC 导致长时间的停顿,引发重平衡。