ActiveMQ 中可以通过以下几种方式避免消息重复消费:
- 使用事务消费消息:消费者事务提交前,消息会被锁定。
// 开启事务消费
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(queue);
// 消息消费
Message message = consumer.receive();
session.commit(); // 提交事务,消息被真正消费
- 使用签收机制:消费者签收消息后,消息才会被真正消费。
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
// 消息消费
Message message = consumer.receive();
message.acknowledge(); // 签收消息
- 持久化消费行定位:记录消费位置,重启从上次位置继续消费。
<consumerPersistence>
<fileCursorPersistence usage="Consumer" />
</consumerPersistence>
- 幂等性消费:消费端做幂等检查,重复消息只消费一次。
while (true) {
Message message = consumer.receive();
if (processedMessages.contains(message.getMessageID())) {
continue; // 重复消息,跳过
}
// 消息消费逻辑
processedMessages.add(message.getMessageID());
}
- 消费端做重试机制:消费失败时进行重试,防止消息漏消费。