Kafka中的生产者是如何发送消息的?

Kafka中的生产者通过Producer API向broker发送消息。发送消息的主要流程是:

  1. 创建生产者对象,指定broker地址:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092"); 
Producer<String, String> producer = new KafkaProducer<>(props);
  1. 构建消息对象,指定主题和分区键:
ProducerRecord<String, String> record = 
    new ProducerRecord<>("topic1", "key1", "value1");
  1. 发送消息,异步接收回调方法:
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        } else {
            System.out.println(metadata.topic() + " " + metadata.partition());
        }
    }
});
  1. 关闭生产者对象:
producer.close();

发送消息的主要API是send方法,它接收ProducerRecord对象和回调方法。回调方法会在消息发送完成时被调用,返回消息的元数据或异常信息。

KafkaProducer有同步和异步发送API。异步API性能更高,并且在消息发送失败时自动重试,更适合高吞吐的生产环境。

生产者在发送消息时,会根据分区键或Round Robin算法选择分区。如果没有指定分区键,生产者会选取当前可用分区最少的分区发送。

生产者发送的消息会追加到所选分区的log文件末尾,并被分配连续的offset。offset是消息在分区中的位置标识。

以上就是Kafka生产者发送消息的主要流程和API,生产者通过简单的API可以轻松将消息发布到Kafka,并由Kafka提供持久化保证和高可靠消息传输。