ActiveMQ 中可以通过以下方式实现消息的去重和幂等性:
1、 消息去重:
- 在消息中设置 JMSMessageID,Broker 会对相同 JMSMessageID 的消息进行去重。
- 生产者设置消息的唯一 ID,并在发送前检查 Broker 中是否已存在此 ID 的消息,如果存在则不重复发送。
- 消息消费者在消费消息后,将消息 ID 记录数据库,消费下一个消息前查询此 ID 是否消费过,如果消费过则丢弃以去重。
- 示例代码:
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://broker:61616");
Connection conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sess.createProducer(sess.createQueue("queue1"));
// 设置唯一ID
String msgID = UUID.randomUUID().toString();
// 检查是否已发送过此ID的消息
if (!checkDupMsg(msgID)) {
TextMessage msg = sess.createTextMessage("Hello");
msg.setStringProperty("JMS_ID", msgID);
producer.send(msg);
}
2、 消息幂等性:
- 利用数据库事务和消息会话事务实现幂等产生消息或消费消息的操作。要么都成功,要么都失败。
- 重复发送消息时,消息内容带有唯一ID和版本号,消费者只消费版本号更高的消息。
- 消息消费后,消费者记录消费状态至数据库。重复消费时检查数据库消费状态,丢弃已经消费过的消息。
- 示例代码:
Connection con = ds.getConnection(); // 数据库连接
Session session = con.createSession(true, Session.SESSION_TRANSACTED); // 事务会话
MessageProducer producer = sess.createProducer(sess.createQueue("queue"));
con.setAutoCommit(false); // 开启事务
// 数据库操作
session.send(msg); // 发送消息
con.commit(); // 提交事务,两者成功或失败