ActiveMQ 中消息过期和死信的处理方式主要有:
- 消息过期:通过设置消息的时间戳和过期时间,Broker 会定期检查队列中消息的过期时间,过期的消息会被销毁。
- 死信队列:将过期消息或消费失败的消息转发到指定的死信队列中待重新消费。
例如:
消息过期配置:
// 生产者设置消息过期时间
TextMessage message = session.createTextMessage("Hello");
message.setJMSExpiration(5000); // 5秒过期
producer.send(message);
<!-- Broker 检查过期消息时间,默认为帐户周期的10倍 -->
<broker>
<schedulerDirectoryScanPeriod>100000</schedulerDirectoryScanPeriod>
</broker>
死信队列配置:
<!-- 将 test-queue 中过期消息转发到 dead-letter-queue -->
<broker>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="test-queue" >
<deadLetterStrategy>
<queuePrefix>dead-letter-queue</queuePrefix>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
// 消费者无法确认消息,超过重发次数后转发到死信队列
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
Message message = consumer.receive(5000);
if (message != null) {
// 不确认消息
}
}
消息过期示例:
// 生产者
TextMessage message = session.createTextMessage("Hello");
message.setJMSExpiration(5000);
producer.send(message);
// 5秒后消息过期被销毁
死信队列示例:
// 生产者
producer.send(message);
// 消费者
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
Message message = consumer.receive(5000);
if (message != null) {
// 不确认消息
}
}
// 重发超过限制,消息转发到死信队列
MessageConsumer deadConsumer = session.createConsumer(deadQueue);
Message deadMessage = deadConsumer.receive();
ActiveMQ 通过上述机制可以很好的处理过期消息和死信,实现消息的可靠传输。