Kafka中的生产者通过Producer API向broker发送消息。发送消息的主要流程是:
- 创建生产者对象,指定broker地址:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
Producer<String, String> producer = new KafkaProducer<>(props);
- 构建消息对象,指定主题和分区键:
ProducerRecord<String, String> record =
new ProducerRecord<>("topic1", "key1", "value1");
- 发送消息,异步接收回调方法:
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println(metadata.topic() + " " + metadata.partition());
}
}
});
- 关闭生产者对象:
producer.close();
发送消息的主要API是send方法,它接收ProducerRecord对象和回调方法。回调方法会在消息发送完成时被调用,返回消息的元数据或异常信息。
KafkaProducer有同步和异步发送API。异步API性能更高,并且在消息发送失败时自动重试,更适合高吞吐的生产环境。
生产者在发送消息时,会根据分区键或Round Robin算法选择分区。如果没有指定分区键,生产者会选取当前可用分区最少的分区发送。
生产者发送的消息会追加到所选分区的log文件末尾,并被分配连续的offset。offset是消息在分区中的位置标识。
以上就是Kafka生产者发送消息的主要流程和API,生产者通过简单的API可以轻松将消息发布到Kafka,并由Kafka提供持久化保证和高可靠消息传输。