ActiveMQ 中的事务处理主要依靠 JMS 事务和 XA 事务两种方式实现。
- JMS 事务:基于 JMS 的本地事务,生产者或消费者与 Broker 之间的事务。
// 开启事务
session.beginTransaction();
// 发送消息
messageProducer.send(message);
// 提交事务
session.commit();
// 回滚事务
session.rollback()
- XA 事务:分布式事务,与外部数据源之间的事务,ActiveMQ 基于 JTA 实现。
生产者:
// 获取 XA 连接工厂
XAConnectionFactory xaConnectionFactory = new ActiveMQXAConnectionFactory(url);
// 获取 XA 连接
XAConnection xaConnection = xaConnectionFactory.createXAConnection();
// 获取 XA 会话
XASession xaSession = xaConnection.createXASession();
// 发送消息
xaSession.getXAManager().getTransaction().commit();
消费者:
// 获取 XA 连接工厂
XAConnectionFactory xaConnectionFactory = new ActiveMQXAConnectionFactory(url);
// 获取 XA 连接
XAConnection xaConnection = xaConnectionFactory.createXAConnection();
// 获取 XA 会话
XASession xaSession = xaConnection.createXASession();
// 消费消息
xaSession.getXAResources().isSameRM(resource);
XA 功能主要用于:
- 与外部数据源(数据库)同时消费和产生数据,要么全部成功,要么全部失败。
- 多个消息中间件(ActiveMQ Server)之间的分布式事务。