Kafka中的消费者通过Consumer API从broker接收消息。
接收消息的主要流程是:
- 创建消费者对象,指定broker地址和订阅主题:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "group1");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1"));
- 指定消费位置,从该位置开始消费:
consumer.seek(new TopicPartition("topic1", 0), 10);
- 消费消息,并在回调方法中处理:
consumer.poll(Duration.ofMillis(100))
.records(new ConsumerRecords<String, String>() {
@Override
public void forEach(ConsumerRecord<String, String> record) {
System.out.println(record.key() + " " + record.value());
}
});
- 定期提交消费进度offset:
consumer.commitSync();
- 关闭消费者对象:
consumer.close();
消费者主要通过subscribe方法订阅主题,poll方法消费消息,commitSync方法提交offset。
消费者会拉取订阅主题未消费的消息,并在poll方法的回调中处理。处理完成后需要提交offset,通知broker已消费到哪个位置。
消费者还需要指定组ID,具有相同组ID的消费者会负载消费主题中的消息,形成一个消费组。
以上就是Kafka消费者接收消息的主要流程和API。消费者可以轻松订阅消息并处理,Kafka保证消息的有序传输和exactly-once语义。