ActiveMQ 中如何处理消息的过期和死信?

ActiveMQ 中消息过期和死信的处理方式主要有:

  1. 消息过期:通过设置消息的时间戳和过期时间,Broker 会定期检查队列中消息的过期时间,过期的消息会被销毁。
  2. 死信队列:将过期消息或消费失败的消息转发到指定的死信队列中待重新消费。

例如:
消息过期配置:

// 生产者设置消息过期时间
TextMessage message = session.createTextMessage("Hello");
message.setJMSExpiration(5000);  // 5秒过期
producer.send(message);
<!-- Broker 检查过期消息时间,默认为帐户周期的10倍 --> 
<broker>
   <schedulerDirectoryScanPeriod>100000</schedulerDirectoryScanPeriod> 
</broker>

死信队列配置:

<!-- 将 test-queue 中过期消息转发到 dead-letter-queue -->
<broker>
   <destinationPolicy>
       <policyMap>
           <policyEntries>
                <policyEntry queue="test-queue" >                      
                     <deadLetterStrategy>
                          <queuePrefix>dead-letter-queue</queuePrefix>
                     </deadLetterStrategy>              
                </policyEntry> 
           </policyEntries>
       </policyMap>
   </destinationPolicy>
</broker>
// 消费者无法确认消息,超过重发次数后转发到死信队列
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
    Message message = consumer.receive(5000);
    if (message != null) {
        // 不确认消息
    } 
}

消息过期示例:

// 生产者
TextMessage message = session.createTextMessage("Hello");
message.setJMSExpiration(5000);  
producer.send(message);

// 5秒后消息过期被销毁

死信队列示例:

// 生产者  
producer.send(message);

// 消费者
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
    Message message = consumer.receive(5000);
    if (message != null) {
        // 不确认消息
    } 
} 

// 重发超过限制,消息转发到死信队列
MessageConsumer deadConsumer = session.createConsumer(deadQueue);
Message deadMessage = deadConsumer.receive();

ActiveMQ 通过上述机制可以很好的处理过期消息和死信,实现消息的可靠传输。