ActiveMQ 中可以通过以下方式实现消息的分组和顺序消费:
- 发送消息时设置 JMSXGroupID 属性分组:
TextMessage message = session.createTextMessage("Hello");
message.setStringProperty("JMSXGroupID", "Group1"); // 消息分组
producer.send(message);
- 消费消息时设置相同的消息选择器选择分组消息:
MessageConsumer consumer = session.createConsumer(queue, "JMSXGroupID = 'Group1'");
- 消费者关闭后,同组的其他消息会被新的消费者消费:实现负载均衡效果。
- 消费端设置 JMSXGroupSeq 属性实现顺序消费:
// 消费端1
MessageConsumer consumer1 = session.createConsumer(queue, "JMSXGroupID = 'Group1'");
// 消费端2
MessageConsumer consumer2 = session.createConsumer(queue,
"JMSXGroupID = 'Group1' AND JMSXGroupSeq > 1");
consumer2 的消息选择器选择了大于 1 的 JMSXGroupSeq 的消息,实现了顺序消费的效果。
- 消息生产者也可以设置 JMSXGroupSeq 属性发送顺序消息:
TextMessage message1 = session.createTextMessage("Hello");
message1.setStringProperty("JMSXGroupID", "Group1");
message1.setIntProperty("JMSXGroupSeq", 1); // 序列号1
TextMessage message2 = session.createTextMessage("Hello");
message2.setStringProperty("JMSXGroupID", "Group1");
message2.setIntProperty("JMSXGroupSeq", 2); // 序列号2
producer.send(message1);
producer.send(message2);