RocketMQ支持消息的拉取模式和推送模式消费,两种模式的主要区别如下:
- 拉取模式:消费者主动拉取Broker上的消息。消费者按照自己的消费速度拉取消息,Broker不主动推送消息给消费者。
- 推送模式:Broker主动将消息推送给消费者。Broker会根据消费者的消费速度,推送适量的消息给消费者。
- 拉取模式的延迟一般较高,但稳定性较高。推送模式的延迟较低但可能造成消息堆积。
- 消费进度由消费者维护(拉取模式)或由Broker维护(推送模式),容易造成死信或重复消费。
RocketMQ同时支持两种模式以满足不同场景的需求。实现代码示例:
// 拉取模式消费
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup("ConsumerGroup1");
pullRequest.setTopic("TopicTest");
pullRequest.setQueueId(0);
PullResult pullResult = pullConsumer.pull(pullRequest);
List<MessageExt> msgList = pullResult.getMsgFoundList();
// 推送模式消费
MessageQueueListener listener = new MessageQueueListener() {
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 消费消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
};
pushConsumer.registerMessageListener(listener);
pushConsumer.subscribe("TopicTest", "*");
pushConsumer.start(); // 启动消费