RocketMQ消息推送性能优化主要从以下几个方面进行:
- 生产者批量发送:生产者可以将多条消息批量发送到Broker,减少网络交互次数,提高发送性能。
- 合理设置线程数:生产者和消费者可以设置多个发送线程和消费线程,提高并发度,增强消息处理能力。但线程数过多也会影响性能,需要根据机器配置适当设置。
- 压缩消息体:生产者可以启用消息体压缩,减小消息大小,加快网络传输速度。
- 选择高效协议:生产者和消费者可以选择高效的UDP协议进行消息发送和消费,提高网络IO性能。
- 启用无SYNC FLUSH:生产者可以关闭消息同步刷盘,采用异步刷盘方式,减少磁盘IO,提高发送性能。但可能造成消息丢失,需要权衡。
- 合理设置BatchSize:生产者可以适当增大BatchSize,将更多消息批量发送,减少网络 round trip,提高发送性能。但BatchSize太大会增加消息延迟。
- 过滤不必要Tag:消费者可以过滤不需要消费的Tag,减少不必要的消息流转和处理,提高消费性能。
代码示例:
// 生产者批量发送消息
List<Message> msgList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
msgList.add(new Message("TopicTest", "TagA", "KEY" + i, ("Hello RocketMQ " + i).getBytes()));
}
SendResult sendResult = producer.send(msgList);
// 消费者和生产者设置多个线程
producer.setSendMsgTimeout(1000 * 30);
producer.setRetryTimesWhenSendFailed(0);
producer.setRetryAnotherBrokerWhenNotStoreOK(false);
producer.setSendLatencyFaultEnable(false);
producer.setCompressMsgBodyOverHowmuch(1024 * 4); // 4K
producer.setCompressLevel(5); //
producer.setOnsChannel("udp"); //使用UDP协议
producer.setBatchSize(20); //批量发送
pullConsumer.setConsumeThreadMin(5);
pullConsumer.setConsumeThreadMax(20);
pullConsumer.setPullInterval(0);
pullConsumer.setBatchSize(20); //批量消费
pullConsumer.setAutoCommit(true);
RocketMQ通过批量发送、设置线程数、压缩消息、选择高效协议、关闭SYNC FLUSH、设置大BatchSize和过滤Tag等方式优化消息推送性能。