在RocketMQ中使用了模板方法模式定义框架来处理消息。
具体来说:
RocketMQ消息处理流程
RocketMQ在接收到消息后,会执行下面的流程:
- 检查消息格式是否正确
- 进行消息空间检测(没有超过限制)
- 将消息存入内存队列
- 根据配置进行持久化
- 根据路由规则发送消息
- 触发消费者消费消息
- 记录消费情况等
模板方法
RocketMQ定义了一个模板方法用来处理消息:
public abstract class MessageTemplate {
public final void processMessage(Message msg) {
validate(msg); // 对消息格式校验
spaceCheck(msg);
putInQueue(msg);
persist(msg);
route(msg);
triggerConsumer(msg);
recordStats(msg);
}
// 具体方法
protected abstract void validate(Message msg);
protected abstract void spaceCheck(Message msg);
// ...
}
这里定义了消息处理的整个流程,但具体逻辑留给子类实现。
子类实现
有不同的实现类:
public class DefaultMessageTemplate extends MessageTemplate{
protected void validate(Message msg) {
// 默认实现
}
protected void spaceCheck(msg) {
// 检查空间大小
}
// ...
}
public class RTMessageTemplate extends MessageTemplate {
// 实现实时消息的校验和检查逻辑
}
使用
MessageTemplate template = new DefaultMessageTemplate();
template.processMessage(msg);
这样就使用了默认处理消息的框架。
作用
RocketMQ通过模板方法模式定义消息处理模板:
- 提供了一个算法的框架
- 子类只需要实现具体步骤
- 提供了相关的钩子方法
- 算法的结构不变
- 简化了子类的实现
符合模板方法模式原理:定义一个操作中的算法骨架,由子类实现具体步骤。
总的来说,RocketMQ通过模板方法定义消息处理的抽象框架。可以方便地扩展具体实现。