跳至主要內容

Kafka生产者实践

Milton原创大约 11 分钟开发后端

Kafka是分布式、多副本、基于Zookeeper的分布式消息流平台。它是基于发布订阅模式的消息引擎系统。

Spring 整合 Kafka

引入 kafka:

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

发送消息

在实际使用中,通过 KafkaTemplate 来进行生产者消息的发送。

private KafkaTemplate<String, Object> template;

public void sentMsg(){
	this.template.send("topic", "msg");
}

除了可以简单指定 topic 和消息内容,KafkaTemplate 还可以指定分区位置、消息的 key 等参数更加灵活地发送消息。

异步回调

Kafka 发送消息是一个异步的操作,发送完消息就立刻返回,不需要让发消息这个动作占据太多的系统时间。但是对于消息是否发送成功还是要知道的,就需要为发送添加回调方法来处理发送结果。

template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
	@Override
	public void onFailure(Throwable throwable) {
		......
	}

	@Override
	public void onSuccess(Object o) {
		....
	}
});

如何保证顺序性

kafka 仅在分区内保证有序性,所以要实现消息的有序消费,需要指定分区发送消息。默认的分区方式是轮询策略来发送。

指定 partition

简单粗暴的指定分区发送消息,指定发送到分区1就发送到分区1。

指定 key + DefaultPartitioner

更常用的做法是通过 key 来分区,这样的 key 一般是有业务含义的,比如用户 id,这样同一个用户的信息就会发送到同一个分区当中了。

可靠性保障

消息不丢失

先说说为什么会发送消息丢失。生产者丢数据,也就是发送的消息没有保存到 Broker 服务端上。导致这个的原因可能是网络抖动(消息没有发送)、消息本身内容不合法(消息体太大)导致 Broker 拒绝接收等等。

添加回调方法

要避免网络抖动,设置一个重试参数就可以解决。至于其他不可预知的消息丢失问题,最主要的就是建立回调反馈。消息丢失不可怕,但是可怕的是发送失败了不知道也没有做任何处理。简单的 send 方法通常都是异步发送,发送最后的结果是不知道的。所以要为 send 方法加上对应的 callback ,一旦出现消息提交失败的情况,我们就可以有针对性的进行处理(打错误日志等)。

发送消息到所有 Broker

我们知道 Broker 不会只有一个,多 Broker 就是为了高可用准备的。如果只满足于把消息发送到一台机器上就算成功,那么当这台机器损坏时,消息就必然会丢失。所以要想达到高可用的效果,就需要让所有的副本 Broker 也都要接收到消息,那么消息的提交才算是成功的。

要实现上述效果就要设置 acks 参数。为保证生产者发送的数据都能可靠地发送到指定的 Topic 上,Topic 的每个 partition 分区在收到数据后都要向生产者发送 ACK。只有在收到 ACK 后,生产者才会进行下一轮的发送,否则就会重新发送数据。

acks 参数指定了在集群中有多少个分区副本收到消息,生产者才会认定消息是被写入成功的。这个参数对消息丢失的可能性有很大影响。

  • acks = 0:不会等待任何来自服务器的响应,也就是说如果发送消息时出现错误,Broker 没有收到消息,生产者是不知道的,消息也就丢失了。这种方式最不安全但是吞吐量最高。
  • acks = 1:只要集群的 Leader 节点收到消息,生产者就会收到来自服务器成功的响应。这样可能会有问题,如果一个没有收到消息的节点后续成为新的 Leader 节点,消息就会丢失。
  • acks = all/-1:只有在集群所有的副本 Broker 都接收到消息后,生产者才会收到一个来自服务器的成功响应,这种方式最安全但是延迟也是最高的。

因此,如果从保证消息不丢失的角度来开,acks 应该要配置成 all。

消息不重复

Kafka 默认情况下,提供的是至少一次的可靠性保障。即 Broker 保障已提交的消息的发送,但是遇上某些意外情况,如:网络抖动,超时等问题,导致生产者没有收到 Broker 返回的数据 ACK,则生产者会继续重试发送消息,从而导致消息重复发送。

相应的,如果我们禁止生产者的失败重试发送功能,或者说不用等待服务器响应(acks=0),消息要么写入成功,要么写入失败,但绝不会重复发送。这样就是最多一次的消息保障模式。

但对于消息组件,排除特殊业务场景,我们追求的一定是精确一次的消息保障模式。kafka通过幂等性(Idempotence)和事务(Transaction)的机制,提供了这种精确的消息保障。

幂等生产者

Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture) ,被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。

Kafka 自动帮你做消息的重复去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。

但你需要注意以下问题:

1、它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,无法实现多个分区的幂等性。

2、它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。

那么如果我想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!

事务生产者

Kafka 的事务跟我们常见数据库事务概念差不多,也是提供经典的 ACID,即原子性(Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)。

事务 Producer 保证消息写入分区的原子性,即这批消息要么全部写入成功,要么全失败。此外,Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。事务特性的配置也很简单:

和幂等 Producer 一样,开启 enable.idempotence = true ,设置 Producer 端参数 transctional.id

事务 Producer 的代码稍微也有点不一样,需要调一些事务处理的 API。数据的发送需要放在 beginTransaction 和 commitTransaction 之间。示例代码:

producer.initTransactions();
try {
     producer.beginTransaction();
     producer.send(record1);
     producer.send(record2);
     producer.commitTransaction();
} catch (KafkaException e) {
     producer.abortTransaction();
}

这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。

因此消费者的代码也需要加上 isolation.level 参数,用以处理事务提交的数据。 值为 read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

事务Producer虽然在多分区的数据处理上保证了幂等,但是处理性能上相应的是会有一些下降的。

两种生产者(同步和异步)

官方的 api 只提供了 send 方法,并没有明说是同步方法还是异步方法。但其实对于同步还是异步,官方是交给使用者来实现的。

异步的发送代码参考简单 Demo 异步无回调,以及保证消息不丢失中说的异步有回调。

那同步生产者怎么实现呢?我们看看 send 方法的返回是什么?Future!这是一个在并发编程中经常使用的对象。

Future 的核心思想是:一个方法 M(有返回值),在计算的过程中可能非常耗时,其他线程一直阻塞等待 M 的返回,这显然不可取。可以在调用 M 的时候,立马返回一个 Future,后续再通过 Future 这个数据结构去控制方法 M 的结果。这也是为什么叫做 Future 的意思吧,代表未来。

Future 中的 get 方法:该方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已经完成,那么 get 会立即返回或者抛出一个 Exception(如果任务执行过程中发生异常),如果任务没有完成,那么 get 将阻塞并直到任务完成,这是必须的。如果任务抛出了异常,那么 get 将该异常封装为 ExecutionException 并重新抛出。如果任务被中断,那么 get 将抛出 InterruptedException。

说白了,只要你调用 Futrue 对象的 get() 方法,假如没有结果就必须阻塞。直到客户端(生产者)收到服务器的响应。这就是同步的实现方式,但也不能一味死等,所以要设置一个时间参数,最大的等待时间。

SpringBoot+Kafka最佳实践

发送消息

SpringBoot 中向 Kafka 发送消息的对象是 KafkaTemplate ,其自带了多个 send 方法来发送消息。


# 向 topic 发送消息,同时提供消息的 key 和指定分区  →  发送给指定分区
ListenableFuture<SendResult<K,V>> send(String topic, Integer partition, K key, V data);

# 向 topic 发送消息,同时提供消息的 key,但不指定分区  →  根据 key 散列结果分发给不同分区
ListenableFuture<SendResult<K,V>> send(String topic, K key, V data);

# 向 topic 发送消息,不提供消息的 key,也不指定分区 → 轮询发送给不同分区
ListenableFuture<SendResult<K,V>> send(String topic, V data);

一个简单的生产者可以写为:


private KafkaTemplate<String, Object> template;

public void send(String message) {
	this.template.send("topic1", message);
}

但这样写在发送完消息就结束了,我们并不知道消息是否发送成功,所以加上回调方法来执行发送完成后的操作。


public String send(String message) {
	this.template.send(topic, message).addCallback(
		success ->{
			String topic = success.getRecordMetadata().topic();
			int partition = success.getRecordMetadata().partition();
			long offset = success.getRecordMetadata().offset();
			System.out.println("topic:" + topic + " partition:" 
				+ partition + " offset:" + offset);
		},
		failure ->{
			String message1 = failure.getMessage();
			System.out.println(message1);
			}
);
	return "success";
}

回调方法还可以使用嵌套类来写,但是对于发送后的数据需要用强转来获取。

public String send(String message) {
	this.template.send(topic, message).addCallback(new SuccessAction(), new FailureAction());

	return "success";
}

class SuccessAction implement SuccessCallback {
	@Override
	public void onSuccess(Object o) {
		RecordMetadata recordMetadata = (SendRedult<String, Object> o).getRecordMetadata();
		String topic = recordMetadata.topic();
			int partition = recordMetadata.partition();
			long offset = recordMetadata.offset();
			System.out.println("topic:" + topic + " partition:" 
				+ partition + " offset:" + offset);
	}
}

class FailureAction implement FailureCallback {
	@Override
	public void onFailure(Throwabld e) {
		log.error("消息发送失败: {}, {}", e.getMessage(), e)
	}
}

根据 key 选择分区

Kafka 只保证了分区内消息的有序性,但是在分区之间并不保证消息是否有序。在默认情况下 Kafka 是轮询发送消息的,如果业务上有顺序处理的需求,可以指定消息的 key,kafka 会对 key 值进行 hash 计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 key 的所有消息都进入到相同的分区。


public String send(String message) {
	String key = "key";
	this.template.send(topic,key, message).addCallback(
		success ->{
			String topic = success.getRecordMetadata().topic();
			int partition = success.getRecordMetadata().partition();
			long offset = success.getRecordMetadata().offset();
			System.out.println("topic:" + topic + " partition:" + partition + " offset:" + offset);
			},
			failure ->{
				String message1 = failure.getMessage();
				System.out.println(message1);
			}
	);
	return "success";
}

自定义分区策略

要实现自定义分区策略,只要实现Partitioner接口,重写它的方法就好了,用的不多,只做相关的摘录。

@Component
public class CustomizePartitioner implements Partitioner {
	/**
     * 自定义分区策略
     */
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, 
	          Object value, byte[] valueBytes, Cluster cluster) {
        // 获取topic的分区列表
        List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);

        int partitionCount = partitionInfoList.size();
        int auditPartition = partitionCount - 1;
        return auditPartition;
    }

    /**
     * 在分区程序关闭时调用
     */
    @Override
    public void close() {
	    System.out.println("colse ...");
	}
	
    /**
     * 做必要的初始化工作
     */
    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("init ...");
    }
}

定义完分区策略,需要配置下分区策略,在KafkaProducerConfig中producerConfigs方法里追加如下一行代码即可: props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.example.springbootkafka.config.CustomizePartitioner");

新增测试方法:


public String send(String message) {
	kafkaTemplate.send(topic, message).addCallback(
		success ->{
			String topic = success.getRecordMetadata().topic();
			int partition = success.getRecordMetadata().partition();
			long offset = success.getRecordMetadata().offset();
			System.out.println("topic:" + topic + " partition:" + partition + " offset:" + offset);
		},
		failure ->{
			String message1 = failure.getMessage();
			System.out.println(message1);
		}
	);
	
	return "success";
}

参考

  1. 别网上找个demo就以为掌握了Kafka生产者open in new window
  2. SpringBoot整合Kafka消息队列(生产者和消费者)open in new window
  3. springboot集成整合kafka-自定义分区策略、将消息发送到指定的分区partitionopen in new window