ActiveMQ 中的延迟投递主要有以下几种方式实现:
- 使用定时器:生产者发送消息后,启动一个定时器,定时器触发时将消息发送到Broker。
// 发送消息
producer.send(message);
// 启动定时器
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
// 延迟时间到,发送消息到Broker
producer.send(message);
}
}, delayTime);
- 使用 Scheduler:将消息发送到 Broker 的 schedule:// 代理地址,Broker根据调度策略投递消息。
producer.send(queue, message, DeliveryMode.NON_PERSISTENT,
message.getJMSPriority(), (long) delayTime);
- 在消息头中设置 JMSExpiration 属性:Broker 检查每个消息的过期时间,过期时将其投递。
message.setJMSExpiration(System.currentTimeMillis() + delayTime);
producer.send(queue, message);
- 使用 intermediary queue:生产者发送到临时队列,启动Listener监听该队列,延迟时间到时从临时队列取出消息发送到目标队列。
监听器:
Queue tempQueue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(tempQueue);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
// 延迟时间到,发送消息到目标队列
producer.send(queue, message);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
生产者:
producer.send(tempQueue, message);