ActiveMQ中如何实现消息的压缩和解压缩?

ActiveMQ 中可以通过以下方式实现消息的压缩和解压缩:
1、 消息压缩:

  • 设置消息类型为 BytesMessage,向其中写入压缩后的字节数据:
BytesMessage message = session.createBytesMessage();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(bos);
gzip.write("Hello".getBytes());
gzip.finish();
gzip.close();

message.writeBytes(bos.toByteArray());  //写入压缩数据
producer.send(message); 
  • 发送 TextMessage 消息时开启 Gzip 压缩:
<broker>
  <transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?compression=gzip"/>
  </transportConnectors>  
</broker> 

此设置会启用消息的 Gzip 压缩。

2、 消息解压缩:

  • 消费 BytesMessage 消息时读取字节数据并解压:
BytesMessage message = (BytesMessage)consumer.receive();
byte[] data = new byte[(int) message.getBodyLength()];
message.readBytes(data);   

GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data)); 
BufferedReader reader = new BufferedReader(new InputStreamReader(gzip));
String line;
while ((line = reader.readLine()) != null) {
    // 处理解压后数据
}
  • 消费端开启 auto-unpack,自动解压收到的 Gzip 压缩消息:
<broker>
  <transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?auto-unpack=true"/>
  </transportConnectors>
</broker>