RocketMQ通过以下机制来保证消息的顺序性传递:
- 同一个Producer Group中的Producer,只有一个Producer可以发送消息,这样可以保证消息在发送时的顺序性。
- 消息在Commit Log中按发送顺序排列,Consumer消费时也是按Commit Log中的顺序消费,这样可以保证消息消费的顺序性。
- 对于异步复制的消息,Slave Broker在接到消息后,如果发现Commit Log中已有同一个Producer Group的消息,则会等待该消息被消费后才继续复制新的消息。这样也能保证消息消费的顺序性。
- 如果消费过程中Consumer宕机,在重启后会从上次消费到的位置继续消费。RocketMQ会找出该Consumer Group上次消费的位置,然后继续顺序消费。
- 消费者通过设置consumeMessageService的属性consumeFromWhere来确定消息消费的起始位置。有三个可选值:
- CONSUME_FROM_LAST_OFFSET:从上次消费的位置继续消费
- CONSUME_FROM_FIRST_OFFSET:从队列最开始的位置消费
- CONSUME_FROM_TIMESTAMP:从指定时间点开始消费 所以,RocketMQ主要是依靠Producer Group、Commit Log的顺序性以及Consumer的消费策略来保证消息的顺序传递。
举例:
- 同一Producer Group下的Producer发送顺序消息:
// 发送顺序消息
Message msg1 = new Message("TopicTest", "TagA", "OrderID001", "Hello".getBytes());
Message msg2 = new Message("TopicTest", "TagA", "OrderID002", "World".getBytes());
producer.send(msg1);
producer.send(msg2);
- 设置ConsumeFromWhere为CONSUME_FROM_LAST_OFFSET消费顺序消息:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup1");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
// 消息会按发送顺序消费
});
consumer.start();