ActiveMQ 中可以通过以下方式设置消息的最大等待时间:
- Broker 端设置全局消息超时时间:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost"
dataDirectory="${activemq.data}">
<destinationPolicy>
<policyEntry queue=">">
<pendingMessageLimitStrategy>
<timeBeforeDispatchStartsInPendingMessagesStrategy
startAfterInactivity="60000" />
</pendingMessageLimitStrategy>
</policyEntry>
</destinationPolicy>
</broker>
此设置将限制 Broker 中所有队列的消息在被分发(发送)给消费者之前最大等待时间为 60 秒。如果超过此时间消息还未被消费,将视为超时失败。
- Broker 端设置指定队列消息超时时间:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost"
dataDirectory="${activemq.data}">
<destinationPolicy>
<policyEntry queue="queue1">
<pendingMessageLimitStrategy>
<timeBeforeDispatchStartsInPendingMessagesStrategy
startAfterInactivity="30000" />
</pendingMessageLimitStrategy>
</policyEntry>
</destinationPolicy>
</broker>
此设置将限制 queue1 这个队列的消息在被分发给消费者之前最大等待时间为 30 秒。
- 生产者端限制发送消息的最大等待时间:
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://broker:61616");
Connection conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sess.createProducer(sess.createQueue("queue1"));
producer.setSendTimeout(10000); // 10秒超时
producer.setSendBufferSize(500); // 最多500条pending消息
producer.setUseAsyncSend(true); // 异步发送
for (int i=0; i<1000; i++) {
producer.send(sess.createTextMessage(i+""));
}
设置 producer 的 sendTimeout 即可限制其发送的消息如果在 10 秒内未被消费,将超时失败。