在实际业务场景下,为保证消息从producer能够准确无误的送达kafka的broker,kafka提供了针对消息ACK的几种级别,即broker对producer消息应答级别
kafka提供了3种ACK的应答机制
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
下面用代码演示下消息的生产段的ACK配置代码
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerAck { public static void main(String[] args) throws Exception { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 设置 acks ACKS_CONFIG 可选择的值有: 0 1 all ,分别对应ACK的3种级别 properties.put(ProducerConfig.ACKS_CONFIG, "all"); // 重试次数 retries,默认是 int 最大值,2147483647 properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 3. 创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); System.out.println("开始发送数据"); // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("zcy222","congge " + i)); } // 5. 关闭资源 kafkaProducer.close(); } }
核心代码部分和之前正常发送消息没有太大区别,只需要在属性配置里面添加下面这句代码即可
properties.put(ProducerConfig.ACKS_CONFIG, "all");
同时,建议配合消息发送重试的配置一起使用,这个也是实际场景中常用的一种处理方式
运行上面的程序,消息可以正常发送到 ”zcy222“这个topic中