切换主题
04. 消息模型与命名:别让消费者互相抢消息
第三章我们终于把下单主流程和旁路动作拆开了。
下单接口现在只做两件事:保存订单,发布 OrderCreated 事件。短信、积分、仓库这些动作,由消费者异步处理。
看起来已经清爽多了,对吧?
但是业务一涨,新的问题马上出来。
通知服务说:“我怎么有时候收不到订单消息?”
积分服务说:“我和通知服务是不是用了同一个 consumerGroup?”
仓库服务说:“我只想要 OrderPaid,为什么 OrderCreated 也过来了?”
线上排查时,客服丢给你一个订单号:“老板问这笔订单的消息到底发没发?”你打开日志,发现只有 RocketMQ 的 messageId,没有业务 key。那一刻你就会知道:消息发出去只是第一步,消息模型设计不好,后面全是账。
所以第四章先不继续堆消费者代码。我们先把 topic、tag、key、eventId、consumerGroup 讲清楚。
这一章的目标是:你以后看到一个订单消息,就能立刻判断它属于哪类消息、是什么事件、怎么排查、哪些服务应该收到、哪些实例只是分摊处理。
上一章练习参考答案
先复盘第三章留下的问题。
- 第三章里,
downstream=PUBLISHED为什么不能写成downstream=SUCCESS?
因为异步以后,主流程成功和下游成功不是同一件事。
PUBLISHED 只表示订单服务已经把 OrderCreated 事件交出去了。它不代表短信已经发了,不代表积分已经加了,也不代表仓库已经处理完。
如果你把它写成 SUCCESS,调用方很容易误解成“所有旁路动作都成功”。以后出了问题,前端、客服、运营都会被这个字段误导。
更稳的表达是:
text
订单创建状态:SUCCESS
下游投递状态:PUBLISHED
下游处理状态:由消费者自己的日志、重试、死信、补偿来体现- 通知服务、积分服务、仓库服务如果未来拆成三个独立服务,它们应该使用同一个 consumerGroup 吗?为什么?
不应该。
consumerGroup 表示一组消费者共同分摊同一份消费任务。同一个 group 里的多个实例,是“你处理一部分,我处理一部分”。
通知、积分、仓库是三个不同业务方。它们都需要收到订单事件,各自做自己的事。所以它们应该使用三个不同 consumerGroup。
如果它们共用一个 group,就会互相抢消息。结果可能是通知服务消费了这条消息,积分服务就收不到了。
- 如果短信消费者失败后 RocketMQ 重试,怎么避免重复发短信?
要做消费幂等。
比如用 eventId 或者 orderId + action 做幂等记录:
text
orderId=O10001, action=SEND_ORDER_CREATED_SMS消费者开始处理前先查这个记录。如果已经成功处理过,就直接返回消费成功,不再重复发短信。如果没处理过,再发送短信,并在成功后写入处理记录。
注意,MQ 的重试机制不能替你保证“业务只执行一次”。它只负责在失败时再次投递。真正的业务去重,要靠消费者自己设计幂等。
OrderCreatedEvent里为什么要有eventId,只有orderId行不行?
只有 orderId 不够。
orderId 表示业务对象,eventId 表示这一次发生的事件。
同一笔订单可能发生很多事件:OrderCreated、OrderPaid、OrderCancelled、OrderClosed。甚至某些补偿场景下,同一类事件也可能重新生成一次。没有 eventId,你很难区分“同一笔订单的不同事件”和“同一事件的重复投递”。
所以:
text
orderId:这是谁的订单
eventId:这是哪一次事件
messageId:这是 RocketMQ 里哪一条消息三者都重要,但不能混用。
- 如果订单保存成功,但普通消息发送失败,本章方案会留下什么风险?你觉得后面应该用什么机制补上?
风险是:订单已经创建成功,但下游永远不知道这件事。
也就是本地业务状态和消息投递状态不一致。
这个问题不是第四章解决的。后面讲事务消息时,我们会专门处理“本地事务成功和消息发送成功如何保持一致”。在讲事务消息之前,你先记住这个坑:普通消息异步解耦解决的是主链路阻塞问题,不自动解决本地事务和消息投递一致性。
业务又长了一点,问题就变了
第三章只有一个消费者,看起来没什么命名压力。
现在产品继续加需求:
下单创建后,通知服务要发短信和站内信;积分服务要加积分;仓库服务要创建履约任务。支付成功后,通知服务要通知用户,积分服务要确认积分,仓库服务要预占库存,财务服务还要记账。
你要是没有消息模型设计,很容易写成这样:
text
topic: order-topic
tag: created
group: order-group
key: 空刚开始能跑。
但出问题时你会很难受:
通知服务和积分服务用了同一个 group,它们互相抢消息;created 这个 tag 过于随意,别的系统看不出它是订单创建;没有 key,按订单号查消息很麻烦;测试环境和开发环境 group 不隔离,互相影响。
所以这一章要先定规则。
本章伴生工程
第四章模块是:
text
D:\idea_space\rocketmq-order-tutorial
chapter-04-message-model-naming/
pom.xml
src/main/java/com/example/rocketmqdemo/
TutorialMessageModelApplication.java
api/
DeliveryView.java
MessageModelDemoController.java
MessageModelDemoResponse.java
PublishOrderCreatedDemoRequest.java
config/
MessageModelDemoConfiguration.java
model/
ConsumerGroupSubscribers.java
DeliveryRecord.java
LocalMessageConsumer.java
LocalMessageModelBus.java
MessageEnvelope.java
MessageNamingPolicy.java
OrderCreatedEvent.java
RecordingConsumer.java
Subscriber.java
src/main/resources/
application.yml
src/test/java/com/example/rocketmqdemo/
api/MessageModelDemoControllerTest.java
model/LocalMessageModelBusTest.java
model/MessageNamingPolicyTest.java这一章的伴生工程不依赖真实 RocketMQ 容器。
原因很简单:我们要先把模型语义讲清楚。真实 RocketMQ 环境负责投递,本章的本地 bus 负责把“投递语义”演示出来:不同 group 都能收到一份;同一个 group 里的多个实例只分摊一份。
这不是替代 RocketMQ,而是把模型拆开讲。
先把五个概念说人话
topic 是消息的大类。
订单领域里,我们现在用:
text
order-event-topic它表示“订单事件”这个大类。OrderCreated、OrderPaid、OrderCancelled 都可以先放在这个 topic 里,再用 tag 区分。
tag 是 topic 里的事件类型。
本章用:
text
OrderCreatedtag 不要随便写成 created、create、order_create 混着来。建议直接用事件名,保持过去式:OrderCreated、OrderPaid、OrderClosed。
key 是排查用的业务关键字。
订单消息里,最自然的 key 是:
text
O10001也就是 orderId。线上排查时,别人通常给你订单号,不会给你 RocketMQ 的 messageId。
eventId 是事件自己的唯一编号。
它回答的是:这是哪一次业务事件?
consumerGroup 是消费分组。
它回答的是:谁和谁一起分摊同一份消费任务?
同一个服务的多个实例,用同一个 group。不同业务服务,要用不同 group。
命名策略写进代码
不要让每个开发各写一套字符串。
第四章用 MessageNamingPolicy 收口:
java
public class MessageNamingPolicy {
private final String orderEventTopic;
private final String environment;
public MessageEnvelope envelope(OrderCreatedEvent event) {
return new MessageEnvelope(
orderEventTopic,
event.tag(),
event.orderId(),
event.eventId(),
event.orderId(),
event
);
}
public String consumerGroup(String serviceName) {
return environment + "-" + normalize(serviceName) + "-order-events";
}
}测试里验证结果:
text
topic: order-event-topic
tag: OrderCreated
key: O10001
eventId: evt_10001
destination: order-event-topic:OrderCreated
consumerGroup:
dev-notification-service-order-events
dev-points-service-order-events
dev-warehouse-service-order-events这里有两个小心思。
第一,group 里带了环境:dev。
开发、测试、生产不要混用同一个 group。否则测试服务可能抢生产消息,这种事故很难看。
第二,group 里带了服务名。
notification-service 和 points-service 是不同业务方,就应该是不同 group。
本地 bus 演示投递语义
第四章的 LocalMessageModelBus 只做一件事:模拟 consumerGroup 的投递语义。
订阅代码长这样:
java
bus.subscribe("dev-notification-service-order-events", "notification-1", "OrderCreated", notificationA);
bus.subscribe("dev-notification-service-order-events", "notification-2", "OrderCreated", notificationB);
bus.subscribe("dev-points-service-order-events", "points-1", "OrderCreated", points);
bus.subscribe("dev-warehouse-service-order-events", "warehouse-1", "OrderCreated", warehouse);注意,通知服务有两个实例,但它们是同一个 group。
积分服务和仓库服务是另外两个 group。
发布第一条消息:
java
MessageEnvelope first = policy.envelope(newEvent("evt_10001", "O10001"));
List<DeliveryRecord> firstDeliveries = bus.publish(first);测试断言:
java
assertThat(firstDeliveries).extracting(DeliveryRecord::consumerGroup).containsExactlyInAnyOrder(
"dev-notification-service-order-events",
"dev-points-service-order-events",
"dev-warehouse-service-order-events"
);
assertThat(firstDeliveries)
.filteredOn(record -> record.consumerGroup().equals("dev-notification-service-order-events"))
.hasSize(1);这段测试要表达的不是本地 bus 多厉害,而是一个 RocketMQ 使用常识:
同一条消息,对不同 consumerGroup 来说,各有一份消费机会;对同一个 consumerGroup 里的多个实例来说,它们共同分摊这一份消费任务。
如果你把通知服务和积分服务放进同一个 group,它们就会互相抢消息。
HTTP 看一眼效果
启动第四章:
powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-04-message-model-naming spring-boot:run调用:
powershell
Invoke-RestMethod `
-Uri 'http://127.0.0.1:18084/api/message-model/order-created' `
-Method Post `
-ContentType 'application/json; charset=utf-8' `
-Body '{"orderId":"O10001","userId":"U10001","payAmount":19900}'真实响应:
json
{
"topic": "order-event-topic",
"tag": "OrderCreated",
"key": "O10001",
"eventId": "evt_efa39dcd-5c68-491e-8565-0c0e4a320731",
"destination": "order-event-topic:OrderCreated",
"deliveries": [
{
"consumerGroup": "dev-notification-service-order-events",
"instanceName": "notification-1",
"tag": "OrderCreated",
"key": "O10001",
"eventId": "evt_efa39dcd-5c68-491e-8565-0c0e4a320731",
"orderId": "O10001"
},
{
"consumerGroup": "dev-points-service-order-events",
"instanceName": "points-1",
"tag": "OrderCreated",
"key": "O10001",
"eventId": "evt_efa39dcd-5c68-491e-8565-0c0e4a320731",
"orderId": "O10001"
},
{
"consumerGroup": "dev-warehouse-service-order-events",
"instanceName": "warehouse-1",
"tag": "OrderCreated",
"key": "O10001",
"eventId": "evt_efa39dcd-5c68-491e-8565-0c0e4a320731",
"orderId": "O10001"
}
]
}这个结果说明:
通知服务收到了。积分服务也收到了。仓库服务也收到了。
通知服务虽然有两个实例,但这条消息只投给其中一个实例。因为它们属于同一个 group,是分摊关系,不是每个实例都各来一份。
这就是 consumerGroup 最容易被误解的地方。
什么时候拆 topic
新手容易两个极端。
一种是所有消息都塞一个 topic,最后 tag 爆炸,权限、保留时间、吞吐、治理边界都混在一起。
另一种是每个事件一个 topic,order-created-topic、order-paid-topic、order-cancelled-topic 到处飞,治理成本也很高。
我的建议是先按业务域建 topic:
text
order-event-topic订单领域内的事件先用 tag 区分:
text
OrderCreated
OrderPaid
OrderCancelled
OrderClosed当出现这些情况,再考虑拆 topic:
第一,保留时间差异很大。
比如财务相关消息要保留更久,普通通知消息保留短一点。
第二,吞吐差异很大。
某类事件量特别大,已经影响其他事件的治理和消费。
第三,权限和团队边界差异很大。
比如财务消息不希望普通业务服务随便订阅。
第四,消息类型不同。
普通消息、延时消息、顺序消息、事务消息在 RocketMQ 5.x 里有各自的行为约束。不要为了省事把不同类型混进同一个 topic。
tag 怎么命名
tag 用来做 topic 内的粗粒度过滤。
本教程推荐用事件名:
text
OrderCreated
OrderPaid
OrderCancelled
OrderClosed不要今天写 order-created,明天写 OrderCreated,后天写 created。格式一乱,排查和订阅配置都会乱。
如果你的团队习惯小写中划线,也可以统一成:
text
order-created
order-paid
order-cancelled
order-closed关键不是哪种格式最神圣,关键是全项目统一。
本教程因为 Java 事件类是 OrderCreatedEvent,所以 tag 使用 OrderCreated,和事件名保持一致。
key 放什么
key 优先放业务主键。
订单消息就是:
text
key = orderId比如:
text
key = O10001线上排查时,用户不会告诉你 messageId。他会说:“这笔订单 O10001 为什么没发短信?”
如果消息没有 key,你只能翻应用日志、查业务表、猜时间范围。key 不是为了业务逻辑执行,它是为了排查和检索。
后面讲源码入口和排错时,我们还会继续用这个思路。
eventId、key、messageId 不要混用
这三个东西很容易被混成一个。
messageId 是 RocketMQ 给消息的技术编号。
key 是你放进去的业务检索键,通常是 orderId。
eventId 是业务事件自己的唯一编号。
它们分别回答三个问题:
text
messageId:RocketMQ 里的哪条消息?
key:业务上按什么查?
eventId:这次业务事件是谁?如果一条消息消费失败后被重试,eventId 通常不变。消费者做幂等时,更关心 eventId 或 orderId + action,而不是只看 messageId。
consumerGroup 怎么命名
推荐格式:
text
<env>-<service-name>-<business-scope>比如:
text
dev-notification-service-order-events
dev-points-service-order-events
dev-warehouse-service-order-events生产环境可以是:
text
prod-notification-service-order-events
prod-points-service-order-events
prod-warehouse-service-order-events注意几个坑。
第一,同一个服务的多个实例,用同一个 group。
比如通知服务部署 3 个实例,都是:
text
prod-notification-service-order-events它们一起分摊通知任务。
第二,不同业务服务不要共用 group。
通知服务、积分服务、仓库服务都要收到订单事件,所以它们不能共用 group。
第三,测试环境和生产环境不要共用 group。
环境隔离是命名里最便宜的保险。
跑测试
执行:
powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-04-message-model-naming test真实结果:
text
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
BUILD SUCCESS这 3 个测试分别证明:
- 命名策略能生成稳定的 topic、tag、key、destination 和 consumerGroup。
- 不同 consumerGroup 各收到一份消息,同 group 多实例只分摊一份。
- HTTP demo 暴露出来的结果和模型规则一致。
这一章真正解决了什么
第四章解决的不是“怎么写更多消费者”。
它解决的是:消费者变多以后,消息还能不能被正确组织、正确订阅、正确排查。
你现在应该能回答:
- 为什么通知服务和积分服务不能共用 consumerGroup?
- 为什么 key 要放
orderId? - 为什么 eventId 不能被 orderId 或 messageId 替代?
- 为什么 tag 要统一命名?
- 为什么 topic 不能无限粗,也不能无限碎?
下一章我们正式进入 PushConsumer 自动消费。那一章会把“消费者启动、消费成功、消费失败、返回 ConsumeResult.FAILURE 后会发生什么”讲清楚。
练习题
先自己想。下一章开头我们会复盘这一组题,再进入 PushConsumer 自动消费。
OrderPaid应该继续放在order-event-topic里,还是新建order-paid-topic?你会根据什么判断?- 通知服务部署 3 个实例时,应该用几个 consumerGroup?为什么?
- 通知服务和积分服务都订阅
OrderCreated,它们能不能共用同一个 consumerGroup?会发生什么? - 线上只给你一个订单号
O10001,你希望消息里至少有哪些字段能帮助排查? - 如果测试环境和生产环境用了同一个 consumerGroup,可能出现什么问题?