Skip to content

04. 消息模型与命名:别让消费者互相抢消息

第三章我们终于把下单主流程和旁路动作拆开了。

下单接口现在只做两件事:保存订单,发布 OrderCreated 事件。短信、积分、仓库这些动作,由消费者异步处理。

看起来已经清爽多了,对吧?

但是业务一涨,新的问题马上出来。

通知服务说:“我怎么有时候收不到订单消息?”

积分服务说:“我和通知服务是不是用了同一个 consumerGroup?”

仓库服务说:“我只想要 OrderPaid,为什么 OrderCreated 也过来了?”

线上排查时,客服丢给你一个订单号:“老板问这笔订单的消息到底发没发?”你打开日志,发现只有 RocketMQ 的 messageId,没有业务 key。那一刻你就会知道:消息发出去只是第一步,消息模型设计不好,后面全是账。

所以第四章先不继续堆消费者代码。我们先把 topictagkeyeventIdconsumerGroup 讲清楚。

这一章的目标是:你以后看到一个订单消息,就能立刻判断它属于哪类消息、是什么事件、怎么排查、哪些服务应该收到、哪些实例只是分摊处理。

上一章练习参考答案

先复盘第三章留下的问题。

  1. 第三章里,downstream=PUBLISHED 为什么不能写成 downstream=SUCCESS

因为异步以后,主流程成功和下游成功不是同一件事。

PUBLISHED 只表示订单服务已经把 OrderCreated 事件交出去了。它不代表短信已经发了,不代表积分已经加了,也不代表仓库已经处理完。

如果你把它写成 SUCCESS,调用方很容易误解成“所有旁路动作都成功”。以后出了问题,前端、客服、运营都会被这个字段误导。

更稳的表达是:

text
订单创建状态:SUCCESS
下游投递状态:PUBLISHED
下游处理状态:由消费者自己的日志、重试、死信、补偿来体现
  1. 通知服务、积分服务、仓库服务如果未来拆成三个独立服务,它们应该使用同一个 consumerGroup 吗?为什么?

不应该。

consumerGroup 表示一组消费者共同分摊同一份消费任务。同一个 group 里的多个实例,是“你处理一部分,我处理一部分”。

通知、积分、仓库是三个不同业务方。它们都需要收到订单事件,各自做自己的事。所以它们应该使用三个不同 consumerGroup。

如果它们共用一个 group,就会互相抢消息。结果可能是通知服务消费了这条消息,积分服务就收不到了。

  1. 如果短信消费者失败后 RocketMQ 重试,怎么避免重复发短信?

要做消费幂等。

比如用 eventId 或者 orderId + action 做幂等记录:

text
orderId=O10001, action=SEND_ORDER_CREATED_SMS

消费者开始处理前先查这个记录。如果已经成功处理过,就直接返回消费成功,不再重复发短信。如果没处理过,再发送短信,并在成功后写入处理记录。

注意,MQ 的重试机制不能替你保证“业务只执行一次”。它只负责在失败时再次投递。真正的业务去重,要靠消费者自己设计幂等。

  1. OrderCreatedEvent 里为什么要有 eventId,只有 orderId 行不行?

只有 orderId 不够。

orderId 表示业务对象,eventId 表示这一次发生的事件。

同一笔订单可能发生很多事件:OrderCreatedOrderPaidOrderCancelledOrderClosed。甚至某些补偿场景下,同一类事件也可能重新生成一次。没有 eventId,你很难区分“同一笔订单的不同事件”和“同一事件的重复投递”。

所以:

text
orderId:这是谁的订单
eventId:这是哪一次事件
messageId:这是 RocketMQ 里哪一条消息

三者都重要,但不能混用。

  1. 如果订单保存成功,但普通消息发送失败,本章方案会留下什么风险?你觉得后面应该用什么机制补上?

风险是:订单已经创建成功,但下游永远不知道这件事。

也就是本地业务状态和消息投递状态不一致。

这个问题不是第四章解决的。后面讲事务消息时,我们会专门处理“本地事务成功和消息发送成功如何保持一致”。在讲事务消息之前,你先记住这个坑:普通消息异步解耦解决的是主链路阻塞问题,不自动解决本地事务和消息投递一致性。

业务又长了一点,问题就变了

第三章只有一个消费者,看起来没什么命名压力。

现在产品继续加需求:

下单创建后,通知服务要发短信和站内信;积分服务要加积分;仓库服务要创建履约任务。支付成功后,通知服务要通知用户,积分服务要确认积分,仓库服务要预占库存,财务服务还要记账。

你要是没有消息模型设计,很容易写成这样:

text
topic: order-topic
tag: created
group: order-group
key: 空

刚开始能跑。

但出问题时你会很难受:

通知服务和积分服务用了同一个 group,它们互相抢消息;created 这个 tag 过于随意,别的系统看不出它是订单创建;没有 key,按订单号查消息很麻烦;测试环境和开发环境 group 不隔离,互相影响。

所以这一章要先定规则。

本章伴生工程

第四章模块是:

text
D:\idea_space\rocketmq-order-tutorial
  chapter-04-message-model-naming/
    pom.xml
    src/main/java/com/example/rocketmqdemo/
      TutorialMessageModelApplication.java
      api/
        DeliveryView.java
        MessageModelDemoController.java
        MessageModelDemoResponse.java
        PublishOrderCreatedDemoRequest.java
      config/
        MessageModelDemoConfiguration.java
      model/
        ConsumerGroupSubscribers.java
        DeliveryRecord.java
        LocalMessageConsumer.java
        LocalMessageModelBus.java
        MessageEnvelope.java
        MessageNamingPolicy.java
        OrderCreatedEvent.java
        RecordingConsumer.java
        Subscriber.java
    src/main/resources/
      application.yml
    src/test/java/com/example/rocketmqdemo/
      api/MessageModelDemoControllerTest.java
      model/LocalMessageModelBusTest.java
      model/MessageNamingPolicyTest.java

这一章的伴生工程不依赖真实 RocketMQ 容器。

原因很简单:我们要先把模型语义讲清楚。真实 RocketMQ 环境负责投递,本章的本地 bus 负责把“投递语义”演示出来:不同 group 都能收到一份;同一个 group 里的多个实例只分摊一份。

这不是替代 RocketMQ,而是把模型拆开讲。

先把五个概念说人话

topic 是消息的大类。

订单领域里,我们现在用:

text
order-event-topic

它表示“订单事件”这个大类。OrderCreatedOrderPaidOrderCancelled 都可以先放在这个 topic 里,再用 tag 区分。

tag 是 topic 里的事件类型。

本章用:

text
OrderCreated

tag 不要随便写成 createdcreateorder_create 混着来。建议直接用事件名,保持过去式:OrderCreatedOrderPaidOrderClosed

key 是排查用的业务关键字。

订单消息里,最自然的 key 是:

text
O10001

也就是 orderId。线上排查时,别人通常给你订单号,不会给你 RocketMQ 的 messageId。

eventId 是事件自己的唯一编号。

它回答的是:这是哪一次业务事件?

consumerGroup 是消费分组。

它回答的是:谁和谁一起分摊同一份消费任务?

同一个服务的多个实例,用同一个 group。不同业务服务,要用不同 group。

命名策略写进代码

不要让每个开发各写一套字符串。

第四章用 MessageNamingPolicy 收口:

java
public class MessageNamingPolicy {

    private final String orderEventTopic;
    private final String environment;

    public MessageEnvelope envelope(OrderCreatedEvent event) {
        return new MessageEnvelope(
                orderEventTopic,
                event.tag(),
                event.orderId(),
                event.eventId(),
                event.orderId(),
                event
        );
    }

    public String consumerGroup(String serviceName) {
        return environment + "-" + normalize(serviceName) + "-order-events";
    }
}

测试里验证结果:

text
topic: order-event-topic
tag: OrderCreated
key: O10001
eventId: evt_10001
destination: order-event-topic:OrderCreated
consumerGroup:
  dev-notification-service-order-events
  dev-points-service-order-events
  dev-warehouse-service-order-events

这里有两个小心思。

第一,group 里带了环境:dev

开发、测试、生产不要混用同一个 group。否则测试服务可能抢生产消息,这种事故很难看。

第二,group 里带了服务名。

notification-servicepoints-service 是不同业务方,就应该是不同 group。

本地 bus 演示投递语义

第四章的 LocalMessageModelBus 只做一件事:模拟 consumerGroup 的投递语义。

订阅代码长这样:

java
bus.subscribe("dev-notification-service-order-events", "notification-1", "OrderCreated", notificationA);
bus.subscribe("dev-notification-service-order-events", "notification-2", "OrderCreated", notificationB);
bus.subscribe("dev-points-service-order-events", "points-1", "OrderCreated", points);
bus.subscribe("dev-warehouse-service-order-events", "warehouse-1", "OrderCreated", warehouse);

注意,通知服务有两个实例,但它们是同一个 group。

积分服务和仓库服务是另外两个 group。

发布第一条消息:

java
MessageEnvelope first = policy.envelope(newEvent("evt_10001", "O10001"));
List<DeliveryRecord> firstDeliveries = bus.publish(first);

测试断言:

java
assertThat(firstDeliveries).extracting(DeliveryRecord::consumerGroup).containsExactlyInAnyOrder(
        "dev-notification-service-order-events",
        "dev-points-service-order-events",
        "dev-warehouse-service-order-events"
);

assertThat(firstDeliveries)
        .filteredOn(record -> record.consumerGroup().equals("dev-notification-service-order-events"))
        .hasSize(1);

这段测试要表达的不是本地 bus 多厉害,而是一个 RocketMQ 使用常识:

同一条消息,对不同 consumerGroup 来说,各有一份消费机会;对同一个 consumerGroup 里的多个实例来说,它们共同分摊这一份消费任务。

如果你把通知服务和积分服务放进同一个 group,它们就会互相抢消息。

HTTP 看一眼效果

启动第四章:

powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-04-message-model-naming spring-boot:run

调用:

powershell
Invoke-RestMethod `
  -Uri 'http://127.0.0.1:18084/api/message-model/order-created' `
  -Method Post `
  -ContentType 'application/json; charset=utf-8' `
  -Body '{"orderId":"O10001","userId":"U10001","payAmount":19900}'

真实响应:

json
{
  "topic": "order-event-topic",
  "tag": "OrderCreated",
  "key": "O10001",
  "eventId": "evt_efa39dcd-5c68-491e-8565-0c0e4a320731",
  "destination": "order-event-topic:OrderCreated",
  "deliveries": [
    {
      "consumerGroup": "dev-notification-service-order-events",
      "instanceName": "notification-1",
      "tag": "OrderCreated",
      "key": "O10001",
      "eventId": "evt_efa39dcd-5c68-491e-8565-0c0e4a320731",
      "orderId": "O10001"
    },
    {
      "consumerGroup": "dev-points-service-order-events",
      "instanceName": "points-1",
      "tag": "OrderCreated",
      "key": "O10001",
      "eventId": "evt_efa39dcd-5c68-491e-8565-0c0e4a320731",
      "orderId": "O10001"
    },
    {
      "consumerGroup": "dev-warehouse-service-order-events",
      "instanceName": "warehouse-1",
      "tag": "OrderCreated",
      "key": "O10001",
      "eventId": "evt_efa39dcd-5c68-491e-8565-0c0e4a320731",
      "orderId": "O10001"
    }
  ]
}

这个结果说明:

通知服务收到了。积分服务也收到了。仓库服务也收到了。

通知服务虽然有两个实例,但这条消息只投给其中一个实例。因为它们属于同一个 group,是分摊关系,不是每个实例都各来一份。

这就是 consumerGroup 最容易被误解的地方。

什么时候拆 topic

新手容易两个极端。

一种是所有消息都塞一个 topic,最后 tag 爆炸,权限、保留时间、吞吐、治理边界都混在一起。

另一种是每个事件一个 topic,order-created-topicorder-paid-topicorder-cancelled-topic 到处飞,治理成本也很高。

我的建议是先按业务域建 topic:

text
order-event-topic

订单领域内的事件先用 tag 区分:

text
OrderCreated
OrderPaid
OrderCancelled
OrderClosed

当出现这些情况,再考虑拆 topic:

第一,保留时间差异很大。

比如财务相关消息要保留更久,普通通知消息保留短一点。

第二,吞吐差异很大。

某类事件量特别大,已经影响其他事件的治理和消费。

第三,权限和团队边界差异很大。

比如财务消息不希望普通业务服务随便订阅。

第四,消息类型不同。

普通消息、延时消息、顺序消息、事务消息在 RocketMQ 5.x 里有各自的行为约束。不要为了省事把不同类型混进同一个 topic。

tag 怎么命名

tag 用来做 topic 内的粗粒度过滤。

本教程推荐用事件名:

text
OrderCreated
OrderPaid
OrderCancelled
OrderClosed

不要今天写 order-created,明天写 OrderCreated,后天写 created。格式一乱,排查和订阅配置都会乱。

如果你的团队习惯小写中划线,也可以统一成:

text
order-created
order-paid
order-cancelled
order-closed

关键不是哪种格式最神圣,关键是全项目统一。

本教程因为 Java 事件类是 OrderCreatedEvent,所以 tag 使用 OrderCreated,和事件名保持一致。

key 放什么

key 优先放业务主键。

订单消息就是:

text
key = orderId

比如:

text
key = O10001

线上排查时,用户不会告诉你 messageId。他会说:“这笔订单 O10001 为什么没发短信?”

如果消息没有 key,你只能翻应用日志、查业务表、猜时间范围。key 不是为了业务逻辑执行,它是为了排查和检索。

后面讲源码入口和排错时,我们还会继续用这个思路。

eventId、key、messageId 不要混用

这三个东西很容易被混成一个。

messageId 是 RocketMQ 给消息的技术编号。

key 是你放进去的业务检索键,通常是 orderId

eventId 是业务事件自己的唯一编号。

它们分别回答三个问题:

text
messageId:RocketMQ 里的哪条消息?
key:业务上按什么查?
eventId:这次业务事件是谁?

如果一条消息消费失败后被重试,eventId 通常不变。消费者做幂等时,更关心 eventIdorderId + action,而不是只看 messageId。

consumerGroup 怎么命名

推荐格式:

text
<env>-<service-name>-<business-scope>

比如:

text
dev-notification-service-order-events
dev-points-service-order-events
dev-warehouse-service-order-events

生产环境可以是:

text
prod-notification-service-order-events
prod-points-service-order-events
prod-warehouse-service-order-events

注意几个坑。

第一,同一个服务的多个实例,用同一个 group。

比如通知服务部署 3 个实例,都是:

text
prod-notification-service-order-events

它们一起分摊通知任务。

第二,不同业务服务不要共用 group。

通知服务、积分服务、仓库服务都要收到订单事件,所以它们不能共用 group。

第三,测试环境和生产环境不要共用 group。

环境隔离是命名里最便宜的保险。

跑测试

执行:

powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-04-message-model-naming test

真实结果:

text
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
BUILD SUCCESS

这 3 个测试分别证明:

  1. 命名策略能生成稳定的 topic、tag、key、destination 和 consumerGroup。
  2. 不同 consumerGroup 各收到一份消息,同 group 多实例只分摊一份。
  3. HTTP demo 暴露出来的结果和模型规则一致。

这一章真正解决了什么

第四章解决的不是“怎么写更多消费者”。

它解决的是:消费者变多以后,消息还能不能被正确组织、正确订阅、正确排查。

你现在应该能回答:

  1. 为什么通知服务和积分服务不能共用 consumerGroup?
  2. 为什么 key 要放 orderId
  3. 为什么 eventId 不能被 orderId 或 messageId 替代?
  4. 为什么 tag 要统一命名?
  5. 为什么 topic 不能无限粗,也不能无限碎?

下一章我们正式进入 PushConsumer 自动消费。那一章会把“消费者启动、消费成功、消费失败、返回 ConsumeResult.FAILURE 后会发生什么”讲清楚。

练习题

先自己想。下一章开头我们会复盘这一组题,再进入 PushConsumer 自动消费。

  1. OrderPaid 应该继续放在 order-event-topic 里,还是新建 order-paid-topic?你会根据什么判断?
  2. 通知服务部署 3 个实例时,应该用几个 consumerGroup?为什么?
  3. 通知服务和积分服务都订阅 OrderCreated,它们能不能共用同一个 consumerGroup?会发生什么?
  4. 线上只给你一个订单号 O10001,你希望消息里至少有哪些字段能帮助排查?
  5. 如果测试环境和生产环境用了同一个 consumerGroup,可能出现什么问题?

Built with VitePress. Deployed on Cloudflare Pages.