ActiveMQ 是一个开源的消息代理和消息队列服务器。主要用于实现应用程序之间的异步通信和解耦。
ActiveMQ的主要特点有:
- 多语言客户端:支持Java、C、C++、C#、Ruby、Perl、Python、PHP等多种语言。
- 多种传输协议:支持TCP、SSL、STOMP、WS-Security、REST等多种协议。
- 多种消息传递模式:支持发布-订阅、点到点、请求-响应等模式。
- 基于JMS标准:支持JMS 1.1和J2EE 1.4规范。
- 高性能:能达到万级别的消息传输。
- 高可用:可以配置为Master-Slave高可用集群。
- 分布式:可以建立分布式消息网络。
- 易于管理:提供Web控制台用于消息管理和系统配置。
例如:
ActiveMQ 可以用于以下场景:
- 系统解耦:通过消息队列进行系统解耦,提高系统的灵活性和扩展性。
- 广播通知:通过发布-订阅模式实现广播通知功能。
- 应用集成:通过消息队列对不同语言和平台的应用程序进行集成。
- 削峰填谷:使用消息队列可以削峰填谷,解决应用间处理能力不匹配的问题。
- 数据同步:通过批量同步的方式达到数据同步的目的,实现应用间的数据共享。
一个简单的例子:
// 生产者
public class Producer {
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
// 设置连接参数,连接到ActiveMQ服务
connectionFactory.setBrokerURL("tcp://localhost:61616");
// 创建连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目标(topic或queue)
Destination destination = session.createQueue("my-queue");
// 创建生产者
MessageProducer producer = session.createProducer(destination);
// 创建并发送消息
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("Message " + i);
producer.send(message);
}
// 关闭连接
connection.close();
}
}
// 消费者
public class Consumer {
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
// 设置连接参数,连接到ActiveMQ服务
connectionFactory.setBrokerURL("tcp://localhost:61616");
// 创建连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目标(topic或queue)
Destination destination = session.createQueue("my-queue");
// 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
// 监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
System.out.println(((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 等待键盘输入
System.in.read();
// 关闭连接
connection.close();
}
}