RocketMQ通过以下几个机制来保证消息的可靠性传递:
- 消息持久化:将消息写入Commit Log和Consume Queue,以防止消息丢失。
- 主备Broker:Broker支持主备模式,一个Master Broker和多个Slave Broker,Master宕机会自动切换到Slave,保证服务可用。
- 同步双写:MasterBroker将消息写到本地和Slave Broker,Slave Broker写成功后响应给Master Broker,Master Broker再响应给Producer,保证消息同步写入主备Broker。
- 异步复制:Master Broker将消息写到本地后立即响应给Producer,然后异步复制到Slave Broker,以提高消息发送性能,但可能丢失部分消息。
- 最小数据副本数:可配置主备Broker的最小数据副本数,如设置为2,则至少要有一主一备,否则Producer无法发送消息。这保证了即使一个Broker宕机,消息仍然可以正常读写。
- 消息重试机制:当Broker响应发送失败时,Producer会在一定时间后重试发送消息,直到成功或者超过最大重试次数。
- consumer端记录位点:Consumer在消费消息时会实时记录消费位点,消费Crash后从最后记录的位点继续消费,保证不重复消费消息。
举例:
- 同步双写:
// 同步发送消息
SendResult sendResult = producer.send(msg);
// 响应逻辑
if (sendResult != null) {
// Master将消息写入本地和Slave成功,才会返回响应
handleSendSuccess();
} else {
handleSendFail();
}
- 消息重试:
producer.setRetryTimesWhenSendFailed(3); // 发送失败重试3次
for (int i = 0; i < 3; i++) { // 最多重试3次
try {
SendResult sendResult = producer.send(msg);
handleSendSuccess(sendResult);
} catch (Exception e) {
handleSendFail(e);
}
}
RocketMQ通过上述机制保证了消息的高可靠性发送与消费,理解这些机制的原理和实现,可以让我们在使用RocketMQ时更加妥善地处理消息,构建高可用的分布式应用系统。