ActiveMQ 中可以通过以下方式设置消息的最大并发数:
- Broker 端设置全局最大并发数:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost"
dataDirectory="${activemq.data}">
<destinationPolicy>
<policyEntry queue=">">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</destinationPolicy>
</broker>
此设置将限制 Broker 中所有队列的 pending 消息数量最大为 1000。也就是消息生产者最多只能有 1000 条消息等待消费。
- Broker 端设置指定队列最大并发数:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost"
dataDirectory="${activemq.data}">
<destinationPolicy>
<policyEntry queue="queue1">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="500"/>
</pendingMessageLimitStrategy>
</policyEntry>
</destinationPolicy>
</broker>
此设置将限制 queue1 这个队列的 pending 消息数量最大为 500。
- 生产者端限制发送消息的最大并发数:
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://broker:61616");
Connection conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sess.createProducer(sess.createQueue("queue1"));
producer.setSendTimeout(0); // 设置立即超时
int maxPending = 500; // 最多500条pending消息
producer.setSendBufferSize(maxPending);
producer.setUseAsyncSend(true); // 异步发送
for (int i=0; i<1000; i++) { // 发送1000条消息
producer.send(sess.createTextMessage(i+""));
}
设置 producer 的 sendBufferSize 即可限制其最多只能有 500 条 pending 消息。超过此数量的发送请求将超时失败。