ActiveMQ 中可以通过以下方式实现消息的回滚和重发:
- 使用事务的方式接收消息,若消费失败则回滚事务实现消息回滚:
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive();
try {
// 消息消费逻辑
session.commit(); // 消费成功,提交事务
} catch (Exception e) {
session.rollback(); // 消费失败,回滚事务,消息会被重新消费
}
- 消费端设置 CLIENT_ACKNOWLEDGE 确认模式,消费完成后手动确认消息,若确认失败消息会被重发:
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive();
try {
// 消息消费逻辑
message.acknowledge(); // 消费成功,确认消息
} catch (Exception e) {
// 消费失败,消息会被重发
}
- 消费者设置消息选择器进行过滤消费,将未消费的消息留给其他消费者消费:
MessageConsumer consumer1 = session.createConsumer(queue, "color = 'red'");
MessageConsumer consumer2 = session.createConsumer(queue, "color = 'blue'");
consumer1 只消费 color 为 red 的消息,blue 消息会被 consumer2 消费,实现了消息的重发。
- 手动设置消息重发次数和重发间隔:
<policyEntry topic=">">
<pendingDurableSubscriberPolicy>
<redeliveryDelay>5000</redeliveryDelay> <!-- 重发间隔5s -->
<maximumRedeliveries>3</maximumRedeliveries> <!-- 最大重发3次 -->
</pendingDurableSubscriberPolicy>
</policyEntry>