RabbitMQ 可以通过消息确认与补偿机制实现消息的重发。
工作原理是:
- 生产者在发送消息时开启确认机制 channel.confirmSelect()。
- 如果生产者在指定时间内未收到 RabbitMQ 的确认信息,即消息可能未正确投递,这时需要重发消息。
- 在消费端,如果消费者在处理消息时发生异常,可以将消息 nack 或 reject,RabbitMQ 会将消息重新入队,实现重发。
- 消息重发后可以被新的消费者消费,或者被投递到死信交换机进行异常处理。
- 通过消息确认与 nack 机制,在消息可能丢失时实现消息重投递的效果。
示例代码:
生产者:
// 开启确认模式
channel.confirmSelect();
channel.basicPublish(exchange, routingKey, null, message.getBytes());
// 如果长时间未收到确认,需要重发消息
if (!channel.waitForConfirms(5 * 1000)) {
channel.basicRecover();
channel.basicPublish(exchange, routingKey, null, message.getBytes());
}
消费者:
// 消费消息
channel.basicConsume(queue, false, "consumer");
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
try {
// 处理消息
} catch (Exception e) {
// 处理消息异常,nack消息
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
- 生产者开启确认模式,如果长时间未收到确认,调用 channel.basicRecover() 重发消息。
- 消费者消费消息时发生异常,调用 channel.basicNack() 方法 nack 消息。
- 被 nack 的消息会被 RabbitMQ 重新入队,等待新的消费者消费,实现消息重发。
所以总结来说,RabbitMQ 通过消息确认与 nack 机制实现了消息重发的效果。这需要我们在生产者定义时开启确认模式,并在未收到确认时调用 channel.basicRecover() 方法重发消息。在消费者中,出现异常时正确调用 channel.basicNack() 方法 nack 消息。