Kafka生产者实践
Kafka是分布式、多副本、基于Zookeeper的分布式消息流平台。它是基于发布订阅模式的消息引擎系统。
Spring 整合 Kafka
引入 kafka:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
发送消息
在实际使用中,通过 KafkaTemplate
来进行生产者消息的发送。
private KafkaTemplate<String, Object> template;
public void sentMsg(){
this.template.send("topic", "msg");
}
除了可以简单指定 topic 和消息内容,KafkaTemplate
还可以指定分区位置、消息的 key
等参数更加灵活地发送消息。
异步回调
Kafka 发送消息是一个异步的操作,发送完消息就立刻返回,不需要让发消息这个动作占据太多的系统时间。但是对于消息是否发送成功还是要知道的,就需要为发送添加回调方法来处理发送结果。
template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override
public void onFailure(Throwable throwable) {
......
}
@Override
public void onSuccess(Object o) {
....
}
});
如何保证顺序性
kafka 仅在分区内保证有序性,所以要实现消息的有序消费,需要指定分区发送消息。默认的分区方式是轮询策略来发送。
指定 partition
简单粗暴的指定分区发送消息,指定发送到分区1就发送到分区1。
指定 key + DefaultPartitioner
更常用的做法是通过 key 来分区,这样的 key 一般是有业务含义的,比如用户 id,这样同一个用户的信息就会发送到同一个分区当中了。
可靠性保障
消息不丢失
先说说为什么会发送消息丢失。生产者丢数据,也就是发送的消息没有保存到 Broker 服务端上。导致这个的原因可能是网络抖动(消息没有发送)、消息本身内容不合法(消息体太大)导致 Broker 拒绝接收等等。
添加回调方法
要避免网络抖动,设置一个重试参数就可以解决。至于其他不可预知的消息丢失问题,最主要的就是建立回调反馈。消息丢失不可怕,但是可怕的是发送失败了不知道也没有做任何处理。简单的 send
方法通常都是异步发送,发送最后的结果是不知道的。所以要为 send
方法加上对应的 callback
,一旦出现消息提交失败的情况,我们就可以有针对性的进行处理(打错误日志等)。
发送消息到所有 Broker
我们知道 Broker 不会只有一个,多 Broker 就是为了高可用准备的。如果只满足于把消息发送到一台机器上就算成功,那么当这台机器损坏时,消息就必然会丢失。所以要想达到高可用的效果,就需要让所有的副本 Broker 也都要接收到消息,那么消息的提交才算是成功的。
要实现上述效果就要设置 acks 参数。为保证生产者发送的数据都能可靠地发送到指定的 Topic 上,Topic 的每个 partition 分区在收到数据后都要向生产者发送 ACK。只有在收到 ACK 后,生产者才会进行下一轮的发送,否则就会重新发送数据。
acks 参数指定了在集群中有多少个分区副本收到消息,生产者才会认定消息是被写入成功的。这个参数对消息丢失的可能性有很大影响。
- acks = 0:不会等待任何来自服务器的响应,也就是说如果发送消息时出现错误,Broker 没有收到消息,生产者是不知道的,消息也就丢失了。这种方式最不安全但是吞吐量最高。
- acks = 1:只要集群的 Leader 节点收到消息,生产者就会收到来自服务器成功的响应。这样可能会有问题,如果一个没有收到消息的节点后续成为新的 Leader 节点,消息就会丢失。
- acks = all/-1:只有在集群所有的副本 Broker 都接收到消息后,生产者才会收到一个来自服务器的成功响应,这种方式最安全但是延迟也是最高的。
因此,如果从保证消息不丢失的角度来开,acks 应该要配置成 all。
消息不重复
Kafka 默认情况下,提供的是至少一次的可靠性保障。即 Broker 保障已提交的消息的发送,但是遇上某些意外情况,如:网络抖动,超时等问题,导致生产者没有收到 Broker 返回的数据 ACK,则生产者会继续重试发送消息,从而导致消息重复发送。
相应的,如果我们禁止生产者的失败重试发送功能,或者说不用等待服务器响应(acks=0),消息要么写入成功,要么写入失败,但绝不会重复发送。这样就是最多一次的消息保障模式。
但对于消息组件,排除特殊业务场景,我们追求的一定是精确一次的消息保障模式。kafka通过幂等性(Idempotence)和事务(Transaction)的机制,提供了这种精确的消息保障。
幂等生产者
Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture)
,被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。
Kafka 自动帮你做消息的重复去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。
但你需要注意以下问题:
1、它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,无法实现多个分区的幂等性。
2、它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
那么如果我想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!
事务生产者
Kafka 的事务跟我们常见数据库事务概念差不多,也是提供经典的 ACID,即原子性(Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)。
事务 Producer 保证消息写入分区的原子性,即这批消息要么全部写入成功,要么全失败。此外,Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。事务特性的配置也很简单:
和幂等 Producer 一样,开启 enable.idempotence = true
,设置 Producer 端参数 transctional.id
事务 Producer 的代码稍微也有点不一样,需要调一些事务处理的 API。数据的发送需要放在 beginTransaction 和 commitTransaction 之间。示例代码:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。
因此消费者的代码也需要加上 isolation.level
参数,用以处理事务提交的数据。 值为 read_committed
:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
事务Producer虽然在多分区的数据处理上保证了幂等,但是处理性能上相应的是会有一些下降的。
两种生产者(同步和异步)
官方的 api 只提供了 send
方法,并没有明说是同步方法还是异步方法。但其实对于同步还是异步,官方是交给使用者来实现的。
异步的发送代码参考简单 Demo 异步无回调,以及保证消息不丢失中说的异步有回调。
那同步生产者怎么实现呢?我们看看 send 方法的返回是什么?Future!这是一个在并发编程中经常使用的对象。
Future 的核心思想是:一个方法 M(有返回值),在计算的过程中可能非常耗时,其他线程一直阻塞等待 M 的返回,这显然不可取。可以在调用 M 的时候,立马返回一个 Future,后续再通过 Future 这个数据结构去控制方法 M 的结果。这也是为什么叫做 Future 的意思吧,代表未来。
Future 中的 get 方法:该方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已经完成,那么 get 会立即返回或者抛出一个 Exception(如果任务执行过程中发生异常),如果任务没有完成,那么 get 将阻塞并直到任务完成,这是必须的。如果任务抛出了异常,那么 get 将该异常封装为 ExecutionException 并重新抛出。如果任务被中断,那么 get 将抛出 InterruptedException。
说白了,只要你调用 Futrue 对象的 get() 方法,假如没有结果就必须阻塞。直到客户端(生产者)收到服务器的响应。这就是同步的实现方式,但也不能一味死等,所以要设置一个时间参数,最大的等待时间。
SpringBoot+Kafka最佳实践
发送消息
SpringBoot 中向 Kafka 发送消息的对象是 KafkaTemplate
,其自带了多个 send
方法来发送消息。
# 向 topic 发送消息,同时提供消息的 key 和指定分区 → 发送给指定分区
ListenableFuture<SendResult<K,V>> send(String topic, Integer partition, K key, V data);
# 向 topic 发送消息,同时提供消息的 key,但不指定分区 → 根据 key 散列结果分发给不同分区
ListenableFuture<SendResult<K,V>> send(String topic, K key, V data);
# 向 topic 发送消息,不提供消息的 key,也不指定分区 → 轮询发送给不同分区
ListenableFuture<SendResult<K,V>> send(String topic, V data);
一个简单的生产者可以写为:
private KafkaTemplate<String, Object> template;
public void send(String message) {
this.template.send("topic1", message);
}
但这样写在发送完消息就结束了,我们并不知道消息是否发送成功,所以加上回调方法来执行发送完成后的操作。
public String send(String message) {
this.template.send(topic, message).addCallback(
success ->{
String topic = success.getRecordMetadata().topic();
int partition = success.getRecordMetadata().partition();
long offset = success.getRecordMetadata().offset();
System.out.println("topic:" + topic + " partition:"
+ partition + " offset:" + offset);
},
failure ->{
String message1 = failure.getMessage();
System.out.println(message1);
}
);
return "success";
}
回调方法还可以使用嵌套类来写,但是对于发送后的数据需要用强转来获取。
public String send(String message) {
this.template.send(topic, message).addCallback(new SuccessAction(), new FailureAction());
return "success";
}
class SuccessAction implement SuccessCallback {
@Override
public void onSuccess(Object o) {
RecordMetadata recordMetadata = (SendRedult<String, Object> o).getRecordMetadata();
String topic = recordMetadata.topic();
int partition = recordMetadata.partition();
long offset = recordMetadata.offset();
System.out.println("topic:" + topic + " partition:"
+ partition + " offset:" + offset);
}
}
class FailureAction implement FailureCallback {
@Override
public void onFailure(Throwabld e) {
log.error("消息发送失败: {}, {}", e.getMessage(), e)
}
}
根据 key 选择分区
Kafka 只保证了分区内消息的有序性,但是在分区之间并不保证消息是否有序。在默认情况下 Kafka 是轮询发送消息的,如果业务上有顺序处理的需求,可以指定消息的 key,kafka 会对 key 值进行 hash 计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 key 的所有消息都进入到相同的分区。
public String send(String message) {
String key = "key";
this.template.send(topic,key, message).addCallback(
success ->{
String topic = success.getRecordMetadata().topic();
int partition = success.getRecordMetadata().partition();
long offset = success.getRecordMetadata().offset();
System.out.println("topic:" + topic + " partition:" + partition + " offset:" + offset);
},
failure ->{
String message1 = failure.getMessage();
System.out.println(message1);
}
);
return "success";
}
自定义分区策略
要实现自定义分区策略,只要实现Partitioner接口,重写它的方法就好了,用的不多,只做相关的摘录。
@Component
public class CustomizePartitioner implements Partitioner {
/**
* 自定义分区策略
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 获取topic的分区列表
List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
int partitionCount = partitionInfoList.size();
int auditPartition = partitionCount - 1;
return auditPartition;
}
/**
* 在分区程序关闭时调用
*/
@Override
public void close() {
System.out.println("colse ...");
}
/**
* 做必要的初始化工作
*/
@Override
public void configure(Map<String, ?> configs) {
System.out.println("init ...");
}
}
定义完分区策略,需要配置下分区策略,在KafkaProducerConfig中producerConfigs方法里追加如下一行代码即可: props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.example.springbootkafka.config.CustomizePartitioner");
新增测试方法:
public String send(String message) {
kafkaTemplate.send(topic, message).addCallback(
success ->{
String topic = success.getRecordMetadata().topic();
int partition = success.getRecordMetadata().partition();
long offset = success.getRecordMetadata().offset();
System.out.println("topic:" + topic + " partition:" + partition + " offset:" + offset);
},
failure ->{
String message1 = failure.getMessage();
System.out.println(message1);
}
);
return "success";
}