RocketMQ主要通过幂等生产者与消息去重来保证消息的幂等性。
- 幂等生产者:生产者设置producer.setRetryTimesWhenSendFailed(Integer.MAX_VALUE);以及producer.setRetryAnotherBrokerWhenNotStoreOK(true),这样在消息发送失败时,生产者会无限重试向其他Broker发送消息,保证消息最终到达Broker。
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
producer.setRetryTimesWhenSendFailed(Integer.MAX_VALUE);
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
// 发送消息
return producer.send(msg);
}
- 消息去重:Broker在接收到消息后,会先判断是否是重复消息。重复消息会直接返回成功响应,并丢弃消息,保证不进行重复消费。
RocketMQ主要通过两级去重来判断消息是否重复:
- 时间戳去重:如果接收消息的时间戳小于等于Broker已有消息的最大时间戳,则判断为重复消息。
- 唯一ID去重:对每个Topic,Broker会维护一个长整形变量记载已消费消息的最大offset,如果接收消息的offset小于等于此变量,则判断为重复消息。
去重相关代码:
- org.apache.rocketmq.broker.processor.SendMessageProcessor#isFilterMessage:实现时间戳去重判断。
- org.apache.rocketmq.broker.processor.SendMessageProcessor#isDuplicateMessage:实现唯一ID去重判断。