kafka生产者发送消息可靠性

首页 / 新闻资讯 / 正文

在实际业务场景下,为保证消息从producer能够准确无误的送达kafka的broker,kafka提供了针对消息ACK的几种级别,即broker对producer消息应答级别

kafka提供了3种ACK的应答机制

  • acks=0,生产者发送过来数据就不管了,可靠性较差,效率高;
  • acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
  • acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;

在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

下面用代码演示下消息的生产段的ACK配置代码

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord;  import java.util.Properties;  public class ProducerAck {      public static void main(String[] args) throws Exception {          // 1. 创建 kafka 生产者的配置对象         Properties properties = new Properties();         // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");          properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");          // 设置 acks ACKS_CONFIG 可选择的值有: 0 1 all ,分别对应ACK的3种级别         properties.put(ProducerConfig.ACKS_CONFIG, "all");         // 重试次数 retries,默认是 int 最大值,2147483647         properties.put(ProducerConfig.RETRIES_CONFIG, 3);          // 3. 创建 kafka 生产者对象         KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);         System.out.println("开始发送数据");         // 4. 调用 send 方法,发送消息         for (int i = 0; i < 5; i++) {             kafkaProducer.send(new ProducerRecord<>("zcy222","congge " + i));         }         // 5. 关闭资源         kafkaProducer.close();     }   }

核心代码部分和之前正常发送消息没有太大区别,只需要在属性配置里面添加下面这句代码即可

properties.put(ProducerConfig.ACKS_CONFIG, "all");

同时,建议配合消息发送重试的配置一起使用,这个也是实际场景中常用的一种处理方式

运行上面的程序,消息可以正常发送到 ”zcy222“这个topic中