ActiveMQ 中可以通过以下几种方式避免消息丢失:
- 使用持久化消息:将消息持久化到存储介质,防止服务重启消息丢失。
// 发送持久化消息
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(message);
- 开启消息 Broker 的持久化:将消息持久化到 KahaDB、JDBC 等存储方式。
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
- 设置消息过期时间:防止消息长期未消费堆积。
message.setJMSExpiration(3600000); // 1小时过期
producer.send(message);
- 设置消息存储上限:防止消息 consume 跟不上 produce 导致 OOM。
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
</systemUsage>
</systemUsage>
- 设置队列或主题的消息存储上限:
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingQueuePolicy>
<maxSize>10000</maxSize>
</pendingQueuePolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
- 消费端做重试机制:消费失败时进行重试,防止消息漏消费。