RocketMQ通过本地事务和消息事务两阶段来支持分布式事务。
- 本地事务:生产者在发送事务消息前,需要首先执行本地事务(本地数据库事务)。只有本地事务执行成功,生产者才会向Broker发送事务消息。
- 消息事务:
- 发送阶段:生产者发送事务消息到Broker,Broker会返回事务ID并暂存消息。
- 执行阶段:生产者向Broker发送提交/回滚请求,Broker根据请求来提交或回滚消息。
- 回查阶段:Broker定期回查生产者的本地事务状态。如果回滚,Broker也会回滚消息。
这两个阶段配合来保证消息与本地事务的强一致性。RocketMQ的事务消息实现了最大努力通知,可以保证至少一次或最多一次的消息投递语义。
代码示例:
// 本地事务执行
public boolean executeLocalTransaction(Message msg, Object arg) {
if (executeSuccess) {
return LocalTransactionState.UNKNOW;
} else {
return LocalTransactionState.ROLLBACK;
}
}
// 执行阶段提交/回滚
public TransactionListenerImpl() implements TransactionListener {
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
}
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
}
}
producer.setTransactionListener(transactionListener);
// 发送事务消息
Message msg = new Message("TopicTest", "TagA", "KEY", "Hello".getBytes());
msg.setTransactionId("TX_001"); // 设置事务ID
producer.sendMessageInTransaction(msg, null);
// 提交/回滚事务
producer.commitMessage(msg.getTransactionId());