RocketMQ的消息重试机制主要在Push Consumer模式下使用,目的是保证消息能被消费者正常消费。
消息重试主要通过Broker端的定期消息重推和Consumer端的消息消费重试两部分实现。
Broker端消息重推实现主要在org.apache.rocketmq.broker.processor.ConsumeMessageProcessor类中:
public void run() {
while (!this.isStopped() && this.brokerController.getBrokerConfig().isBrokerConsumeEnable()) {
try {
// 遍历所有订阅关系并重试
for (SubscriptionData subscriptionData : this.brokerController.getConsumerManager().getAllSubscriptionList()) {
if (System.currentTimeMillis() >= subscriptionData.getLastConsumeTimestamp() + subscriptionData.getConsumeTimeoutMillis()) {
// 如果Consumer超过一定时间未消费,则重推消息
Set<MessageExt> messageExtSet = this.consumeMessageService.queryMessage(
subscriptionData.getTopic(), subscriptionData.getSubString(), 48, 0
);
for (MessageExt msg : messageExtSet) {
try {
this.brokerController.getBroker2Client().sendMessage(
msg.getTopic(), msg.getTags(), subscriptionData.getConsumerGroup(), msg
);
} catch (Exception e) {
//Ignore
}
}
}
}
} catch (Exception e) {
log.error("ConsumeMessageProcessor, run method exception", e);
}
}
}
Consumer端消息消费重试实现主要在org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl类中:
public void registerMessageListener(MessageListenerConcurrently messageListener) {
executorService.submit(new Runnable() {
@Override
public void run() {
while (!serviceState.isStopped()) {
try {
// 拉取订阅消息
PullResult pullResult = pullMessage(messageListener);
this.processQueue.put(pullResult);
// 消费消息,如果消费失败则重试
DefaultMQPushConsumerImpl.this.consumeMessageService.consumeMessage(pullResult, ConsumeConcurrentlyStatus.RECONSUME_LATER);
} catch (InterruptedException e) {
log.error("InterruptedException occurred during consume message! " + e.getMessage());
} catch (Exception e) {
// 如果是ComsumeConcurrentlyStatus.CONSUME_SUCCESS类型则继续等待下条消息
if (e instanceof MQClientException) {
DefaultMQPushConsumerImpl.this.consumeMessageService.setLastConsumeTimestamp();
DefaultMQPushConsumerImpl.this.consumeMessageService.setConsumeOffset(pullResult.getNextBeginOffset());
}
}
}
}
});
}
RocketMQ主要通过Broker定期对超过一定时间未消费的消息进行重推,以及Consumer消费失败的消息进行重试消费,来实现消息的可靠推送与消费。