在ActiveMQ中使用了责任链模式来处理消息。
具体来说:
ActiveMQ支持拦截器
ActiveMQ允许开发者定义消息拦截器(Interceptor),对消息进行处理:
public class SampleInterceptor implements MessageInterceptor {
public SampleInterceptor next;
public void intercept(Message message){
// 处理消息
if(next != null){
next.intercept(message);
}
}
}
每个拦截器可以有下一个拦截器。
拦截器链接成链
ActiveMQ会将多个拦截器串连成一个链:
SampleInterceptor i1 = new SampleInterceptor();
CompressionInterceptor i2 = new CompressionInterceptor();
i1.next = i2;
i2.next = null; // 最后一个
broker.addInterceptor(i1);
这就形成了一个拦截器链。
处理消息
当发布消息时,会依次调用拦截器链上的拦截器:
producer.send(message);
// 首先调用 i1
i1.intercept(message);
// 再调用 i2
i2.intercept(message);
责任链模式
由于拦截器链的存在,ActiveMQ使用了责任链模式来处理消息:
- Handler: 消息拦截器
- Client: 发布消息者
- Next:下一个拦截器
- 拦截器链正好模拟了责任链。
优点是:
- 解耦不同拦截器
- 方便添加/删除拦截器
- 未处理的请求传给下一个
ActiveMQ通过责任链,对消息进行多次处理。