ActiveMQ 中可以通过以下方式实现消息的压缩和解压缩:
1、 消息压缩:
- 设置消息类型为 BytesMessage,向其中写入压缩后的字节数据:
BytesMessage message = session.createBytesMessage();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(bos);
gzip.write("Hello".getBytes());
gzip.finish();
gzip.close();
message.writeBytes(bos.toByteArray()); //写入压缩数据
producer.send(message);
- 发送 TextMessage 消息时开启 Gzip 压缩:
<broker>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?compression=gzip"/>
</transportConnectors>
</broker>
此设置会启用消息的 Gzip 压缩。
2、 消息解压缩:
- 消费 BytesMessage 消息时读取字节数据并解压:
BytesMessage message = (BytesMessage)consumer.receive();
byte[] data = new byte[(int) message.getBodyLength()];
message.readBytes(data);
GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data));
BufferedReader reader = new BufferedReader(new InputStreamReader(gzip));
String line;
while ((line = reader.readLine()) != null) {
// 处理解压后数据
}
- 消费端开启 auto-unpack,自动解压收到的 Gzip 压缩消息:
<broker>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?auto-unpack=true"/>
</transportConnectors>
</broker>