RocketMQ提供了两种消息延时发送机制:
- 延时消息:在发送消息时,通过设置delayTimeLevel参数指定延时级别,消息将在指定时间后变为可消费状态。
- 生产者设置延时级别:DefaultMQProducer#send(Message msg, long delayTimeLevel)
- Broker在延时时间到后将消息状态变为可消费,并推送至消费者。
- 定时消息:生产者发送消息时不指定延时级别,而是在Broker设置定时规则,消息在规则触发时变为可消费状态并推送至消费者。
- 生产者 sends 普通消息:DefaultMQProducer#send(Message msg)
- 运维人员通过控制台或代码设置Broker定时规则:
sh mqadmin updateTopic -n localhost:9876 -t TopicTest -r "30s,0 0/3 * * * ? *"
此规则表示TopicTest的消息在30秒后变为可消费,并每隔3分钟推送一次。
延时消息与定时消息在Broker的处理流程上略有不同:
- 延时消息:Broker在接收消息后会将其持久化,并在DelayTimeQueue中设置延时时间。时间到后将消息状态变更为可消费并推送。
- 定时消息:Broker在接收消息后会直接持久化并标记为不可消费。定时触发线程会扫描消息并更改为可消费状态并推送。
相关代码:
- 延时消息
- org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage:接收延时消息并持久化。
- org.apache.rocketmq.broker.delay.DelayTimeQueue:实现延时消息队列与推送。
- 定时消息
- org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage:接收消息并持久化。
- org.apache.rocketmq.broker.schedule.ScheduleMessageService:实现定时消息扫描与推送。