Skip to content

03. 普通消息:把下单后的旁路动作拆出去

上一章我们把 RocketMQ 5.x 的最小工程接进来了:Spring Boot 能识别 starter,producer 配置能加载,listener 也会尝试连 localhost:8081 的 Proxy。

但到现在为止,我们还没有真正解决第一章的业务问题。

第一章的问题是什么?

不是“我们不会发 MQ”。真正的问题是:下单接口里塞了短信、积分、仓库这些旁路动作。短信慢,下单慢;短信挂,下单失败;以后再加站内信、优惠券、履约、风控通知,代码还会继续往下单方法里堆。

所以第三章正式开始改业务。

产品这时候又来了一句很熟悉的话:“下单成功以后,短信、积分、仓库都要继续做,但是用户别再等了。短信挂了,也别影响下单。”

这句话翻译成工程目标就是:

  1. 下单主流程只保存订单,并发布 OrderCreated 事件。
  2. 短信、积分、仓库这些旁路动作,改成订阅 OrderCreated 后异步处理。
  3. 本章先用普通消息,不上顺序消息、延时消息、事务消息。

为什么先用普通消息?

因为现在的需求只有“订单创建后通知下游”,不要求严格顺序,不要求未来某个时间点再投递,也不要求本地事务和消息发送强一致。RocketMQ 官方对普通消息的定位就是基础消息能力,常用在微服务异步解耦、数据集成和事件驱动场景里。

上一章练习参考答案

先复盘第二章的题,再进入改造。

  1. 为什么本章先把消息设计成 OrderCreatedEvent,而不是直接把订单表所有字段都发出去?

因为消息不是数据库表的远程副本。消息表达的是“发生了什么事”。

OrderCreatedEvent 表示订单已经创建。消费者拿到这个事实后,决定自己要不要处理、怎么处理。如果把订单表所有字段都塞进去,下游会开始依赖订单表结构。以后订单表加字段、改字段、拆字段,消息契约也会跟着抖。

更好的做法是只放消费者判断和追踪所需的最小上下文,比如 eventIdorderIduserIdpayAmountoccurredAt。如果下游需要完整详情,再通过 orderId 查询。

  1. rocketmq.producer.topicOrderEventProperties.destination() 里的 topic 是什么关系?它们不一致时可能会发生什么?

rocketmq.producer.topic 是 starter 初始化 producer 时读取的 topic 配置,帮助 producer 准备路由信息。

OrderEventProperties.destination() 是业务发送时真正传给发送 API 的目标,格式是:

text
order-created-topic:OrderCreated

它们应该保持一致。如果 producer 初始化时配置的是 A topic,业务发送时发到 B topic,轻则排查困难,重则发送失败或路由信息不符合预期。

所以本教程把业务 topic 收在 tutorial.order-event.topic,再让 rocketmq.producer.topic 引用它,避免两处手写不一致。

  1. 如果应用启动时报 Connection refused: localhost:8081,你会按什么顺序排查?

先别改 Java。

我会按这条线排查:

  1. Docker daemon 是否启动:docker version
  2. Compose 是否可用:docker compose version
  3. RocketMQ 容器是否都在:docker ps --filter "name=rocketmq"
  4. Proxy 端口是否暴露:看 rocketmq-proxy 是否映射 8081
  5. 应用配置是否还是 tutorial.rocketmq.endpoints=localhost:8081
  6. listener 的 topic、tag、consumerGroup 是否写对

也就是说,先确认服务端,再确认客户端。不要看到连接拒绝就开始改消息体。

  1. 为什么本地环境要写 ssl-enabled: false

因为本地 RocketMQ Proxy 是明文连接。rocketmq-v5-client-spring-boot-starter 里 producer 的 SSL 默认值容易让初学者踩坑,所以本地单机环境要显式关掉:

yaml
rocketmq:
  producer:
    ssl-enabled: false

生产环境要不要开 SSL,取决于你的集群部署、安全要求和接入规范。本章只处理本地单机学习环境。

  1. 如果 HTTP 接口返回 SENT,但是消费者没有日志,你准备先看 topic/tag,还是先改消息体?为什么?

先看 topic、tag、consumerGroup、Proxy 和消费者是否启动。

因为“生产者返回成功”只能说明消息发送成功,不说明某个消费者一定拿到了。消费者没日志,最常见原因是订阅关系不匹配、消费者组配置错、tag 过滤不匹配、消费者根本没连上 Proxy。

消息体解析问题通常会表现为消费者拿到消息后报解析异常,而不是完全没有消费日志。

这次改造从哪里下手

第一章的同步写法是这样的:

java
orderRepository.save(order);
sideEffects.executeAll(order);
return result;

这段代码最大的问题不是丑,而是边界错了。

保存订单是主流程。短信、积分、仓库是订单创建后的旁路动作。它们可以晚一点,可以失败后重试,可以各自扩展,但不应该继续堵住下单接口。

第三章把它改成:

java
orderRepository.save(order);
orderEventPublisher.publish(orderCreatedEvent);
return result;

旁路动作挪到消费者里:

java
public void handle(OrderCreatedEvent event) {
    Order order = new Order(event.orderId(), event.userId(), event.payAmount(), event.occurredAt());
    sideEffects.executeAll(order);
}

这就是普通消息最核心的价值:主流程发布一个事实,下游按自己的节奏处理这个事实。

本章伴生工程

第三章模块是:

text
D:\idea_space\rocketmq-order-tutorial
  chapter-03-normal-message-async-order/
    pom.xml
    src/main/java/com/example/rocketmqdemo/
      TutorialAsyncOrderApplication.java
      api/
        CreateOrderRequest.java
        OrderController.java
        OrderCreateHttpResponse.java
      config/
        AsyncOrderDemoConfiguration.java
        OrderEventProperties.java
        SideEffectProperties.java
      order/
        CreateOrderCommand.java
        InMemoryOrderRepository.java
        Order.java
        OrderApplicationService.java
        OrderCreateResult.java
        OrderEventPublisher.java
        OrderRepository.java
        OrderSideEffects.java
        SimulatedSideEffectClient.java
      mq/
        LocalAsyncOrderEventPublisher.java
        LocalDemoDispatchProbe.java
        OrderCreatedEvent.java
        OrderCreatedListener.java
        OrderCreatedMessageHandler.java
        OrderEventCodec.java
        PublishedOrderEvent.java
        RocketMqOrderEventPublisher.java
    src/main/resources/
      application.yml
      application-local-demo.yml
      application-rocketmq.yml
    src/test/java/com/example/rocketmqdemo/
      api/OrderControllerLocalDemoIntegrationTest.java
      mq/OrderEventCodecTest.java
      order/OrderApplicationServiceTest.java

这里有两个 profile:

local-demo:不依赖 RocketMQ 服务端,用本地异步分发器演示“接口先返回,旁路随后执行”的效果。这个 profile 是为了让你在 Docker Hub 拉镜像失败、公司网络限制、Proxy 没起来时,也能先看懂业务改造后的行为。

rocketmq:使用真实 RocketMQClientTemplate 发送普通消息,并用 @RocketMQMessageListener 消费消息。等第二章的 RocketMQ 单机环境可用,就跑这个 profile。

注意,local-demo 不是替代 RocketMQ。它只是教学时的一个可运行观察窗口。真正接入 RocketMQ 的代码在 rocketmq profile 里。

依赖不变

第三章继续使用同一个 starter:

xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
    <version>2.3.4</version>
</dependency>

端口换成 18083,避免和前两章冲突。

application.yml

yaml
server:
  port: 18083

spring:
  application:
    name: chapter-03-normal-message-async-order
  main:
    banner-mode: "off"

tutorial:
  rocketmq:
    endpoints: localhost:8081
  order-event:
    topic: order-created-topic
    tag: OrderCreated
    consumer-group: order-side-effects-demo-group
  side-effects:
    sms:
      delay-ms: 300
      fail: false
    points:
      delay-ms: 200
      fail: false
    warehouse:
      delay-ms: 500
      fail: false

application-rocketmq.yml

yaml
rocketmq:
  producer:
    endpoints: ${tutorial.rocketmq.endpoints}
    topic: ${tutorial.order-event.topic}
    request-timeout: 3
    ssl-enabled: false

先把 topic 建好

如果你要跑真实 RocketMQ profile,先启动第二章的单机环境:

powershell
cd D:\idea_space\rocketmq-order-tutorial
docker compose -f .\docker\rocketmq-standalone.yml up -d

然后创建普通消息 topic:

powershell
docker exec rocketmq-broker sh mqadmin updateTopic `
  -n namesrv:9876 `
  -c DefaultCluster `
  -t order-created-topic `
  -a +message.type=NORMAL

message.type=NORMAL 是 5.x 里很容易被旧教程漏掉的点。我们现在讲的是普通消息,topic 类型就应该按普通消息创建。

事件对象:只表达事实

第三章事件对象比第二章多了 payAmount,因为积分服务可能需要金额来计算积分。

java
package com.example.rocketmqdemo.mq;

import java.time.Instant;

public record OrderCreatedEvent(
        String eventId,
        String orderId,
        String userId,
        long payAmount,
        Instant occurredAt
) {
}

这仍然是一个很小的事件。

它没有订单状态、收货地址、优惠明细、库存详情。因为本章的消费者只需要知道:哪笔订单创建了、谁创建的、金额是多少、什么时候发生的。

小技巧:事件名用过去式。

OrderCreated 表示“订单已经创建”。消费者不能阻止这件事,只能响应这件事。这个命名会逼你把“事件”和“命令”分清楚。

订单服务:保存订单后发布事件

第三章的 OrderApplicationService 变成这样:

java
@Service
public class OrderApplicationService {

    private final OrderRepository orderRepository;
    private final OrderEventPublisher orderEventPublisher;
    private final Clock clock;
    private final AtomicLong sequence = new AtomicLong(10000);

    public OrderCreateResult createOrder(CreateOrderCommand command) {
        String orderId = "O" + sequence.incrementAndGet();
        Instant createdAt = clock.instant();
        Order order = new Order(orderId, command.userId(), command.payAmount(), createdAt);
        orderRepository.save(order);
        log.info("order saved orderId={}, userId={}, payAmount={}", orderId, command.userId(), command.payAmount());

        OrderCreatedEvent event = new OrderCreatedEvent(
                "evt_" + UUID.randomUUID(),
                orderId,
                command.userId(),
                command.payAmount(),
                createdAt
        );
        PublishedOrderEvent published = orderEventPublisher.publish(event);
        log.info("order event published orderId={}, eventId={}, messageId={}",
                orderId, published.eventId(), published.messageId());

        return new OrderCreateResult(
                orderId,
                order.userId(),
                order.payAmount(),
                published.eventId(),
                published.messageId(),
                "PUBLISHED"
        );
    }
}

注意,OrderApplicationService 里已经没有:

java
sideEffects.executeAll(order);

这就是本章最重要的一刀。

下单服务不再知道“短信、积分、仓库具体怎么处理”。它只发布订单创建事件。以后产品再加站内信、优惠券、履约通知,不应该继续往下单方法里塞代码,而是新增消费者。

为什么还要有接口层

Controller 还是 /api/orders

java
@RestController
@RequestMapping("/api/orders")
public class OrderController {

    private final OrderApplicationService orderApplicationService;

    @PostMapping
    public OrderCreateHttpResponse createOrder(@RequestBody CreateOrderRequest request) {
        long startNanos = System.nanoTime();
        OrderCreateResult result = orderApplicationService.createOrder(
                new CreateOrderCommand(request.userId(), request.payAmount())
        );
        long costMs = Duration.ofNanos(System.nanoTime() - startNanos).toMillis();
        return new OrderCreateHttpResponse(
                "SUCCESS",
                result.orderId(),
                result.userId(),
                result.payAmount(),
                result.eventId(),
                result.messageId(),
                result.downstream(),
                costMs
        );
    }
}

响应里多了几个字段:

java
public record OrderCreateHttpResponse(
        String status,
        String orderId,
        String userId,
        long payAmount,
        String eventId,
        String messageId,
        String downstream,
        long costMs
) {
}

downstream=PUBLISHED 表示“下游任务已经通过事件交出去了”,不是“短信、积分、仓库都已经成功”。

这个语义一定要分清。

很多线上事故就是因为接口返回了“下单成功”,前端或运营以为所有旁路动作都成功了。异步以后,主流程成功和旁路成功是两件事。

发送接口先抽象出来

订单服务依赖的是接口:

java
package com.example.rocketmqdemo.order;

import com.example.rocketmqdemo.mq.OrderCreatedEvent;
import com.example.rocketmqdemo.mq.PublishedOrderEvent;

public interface OrderEventPublisher {

    PublishedOrderEvent publish(OrderCreatedEvent event);
}

为什么这里要抽接口?

不是为了炫技。它有两个实际好处:

  1. 单元测试可以直接验证订单服务“发了什么事件”,不需要启动 RocketMQ。
  2. 教程可以提供 local-demorocketmq 两种运行方式,而业务代码不用改。

这就是工程里有意义的抽象:它隔离外部中间件边界。

真实 RocketMQ 发送实现

rocketmq profile 下使用 RocketMqOrderEventPublisher

java
@Component
@Profile("rocketmq")
public class RocketMqOrderEventPublisher implements OrderEventPublisher {

    private final RocketMQClientTemplate rocketMQClientTemplate;
    private final OrderEventProperties properties;
    private final OrderEventCodec codec;

    @Override
    public PublishedOrderEvent publish(OrderCreatedEvent event) {
        SendReceipt receipt = rocketMQClientTemplate.syncSendNormalMessage(
                properties.destination(),
                codec.encode(event)
        );
        return new PublishedOrderEvent(event.eventId(), event.orderId(), String.valueOf(receipt.getMessageId()));
    }
}

这里发送的是 JSON 字符串,而不是直接把 Java 对象丢进去。

为什么?

因为我们想让生产者和消费者之间的消息格式更明确。消费者拿到 MessageView 后,可以按 UTF-8 JSON 解码成 OrderCreatedEvent。这比“依赖 starter 默认怎么序列化 Object”更适合教程和排障。

编码器很简单:

java
@Component
public class OrderEventCodec {

    private final ObjectMapper objectMapper;

    public String encode(OrderCreatedEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException exception) {
            throw new IllegalArgumentException("encode order event failed", exception);
        }
    }

    public OrderCreatedEvent decode(ByteBuffer body) {
        ByteBuffer duplicate = body.asReadOnlyBuffer();
        byte[] bytes = new byte[duplicate.remaining()];
        duplicate.get(bytes);
        return decode(bytes);
    }
}

小技巧:不要用 body.array() 偷懒。ByteBuffer 不一定有可访问的底层数组。复制 remaining bytes 更稳。

真实 RocketMQ 消费实现

消费者监听同一个 topic:

java
@Component
@Profile("rocketmq")
@RocketMQMessageListener(
        endpoints = "${tutorial.rocketmq.endpoints}",
        topic = "${tutorial.order-event.topic}",
        tag = "*",
        consumerGroup = "${tutorial.order-event.consumer-group}"
)
public class OrderCreatedListener implements RocketMQListener {

    private final OrderCreatedMessageHandler handler;
    private final OrderEventCodec codec;

    @Override
    public ConsumeResult consume(MessageView messageView) {
        try {
            OrderCreatedEvent event = codec.decode(messageView.getBody());
            log.info("consume order event messageId={}, eventId={}, orderId={}",
                    messageView.getMessageId(), event.eventId(), event.orderId());
            handler.handle(event);
            return ConsumeResult.SUCCESS;
        } catch (RuntimeException exception) {
            log.warn("consume order event failed messageId={}, reason={}",
                    messageView.getMessageId(), exception.getMessage());
            return ConsumeResult.FAILURE;
        }
    }
}

这里有一个关键点:消费者失败时返回 ConsumeResult.FAILURE

这不是为了让接口失败。接口早就返回了。

这是告诉 RocketMQ:这条消息我这次没处理好,你后面可以按消费重试策略再投递。重试、死信和幂等,我们后面会专门讲。现在你只要先知道:消费者失败不再等于下单接口失败。

旁路处理器

真正执行短信、积分、仓库的是:

java
@Component
public class OrderCreatedMessageHandler {

    private final OrderSideEffects sideEffects;

    public void handle(OrderCreatedEvent event) {
        Order order = new Order(event.orderId(), event.userId(), event.payAmount(), event.occurredAt());
        log.info("handle order created event eventId={}, orderId={}", event.eventId(), event.orderId());
        sideEffects.executeAll(order);
    }
}

它的位置很重要。

下单服务不直接调用它。消费者调用它。这样主流程和旁路动作之间隔了一层消息。

你以后要把短信拆到通知服务、积分拆到会员服务、仓库拆到履约服务,也是在这个边界继续演进。

先跑测试

执行:

powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-03-normal-message-async-order test

真实结果:

text
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
BUILD SUCCESS

这 3 个测试分别证明:

  1. 订单服务保存订单后发布 OrderCreatedEvent,不再内联执行旁路动作。
  2. 本地异步演示里,HTTP 响应会先返回,延迟 3 秒的短信失败发生在异步线程。
  3. OrderEventCodec 可以把事件编码成 UTF-8 JSON,再解码回来。

这个测试组合比只测 Controller 更有价值。它同时覆盖了业务边界、运行效果和消息格式。

先用 local-demo 看效果

因为我当前机器的 Docker Desktop 可以启动,但拉取 apache/rocketmq:5.3.2 时被本机代理阻断,所以这一章先用 local-demo 跑出可观察效果。

启动:

powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-03-normal-message-async-order spring-boot:run `
  -Dspring-boot.run.profiles=local-demo `
  "-Dspring-boot.run.arguments=--tutorial.side-effects.sms.delay-ms=3000 --tutorial.side-effects.sms.fail=true --tutorial.side-effects.points.delay-ms=0 --tutorial.side-effects.warehouse.delay-ms=0"

这里故意让短信 3 秒后失败。

然后调用下单接口:

powershell
Invoke-RestMethod `
  -Uri 'http://127.0.0.1:18083/api/orders' `
  -Method Post `
  -ContentType 'application/json; charset=utf-8' `
  -Body '{"userId":"U10002","payAmount":29900}'

真实响应:

text
status       : SUCCESS
orderId      : O10002
downstream   : PUBLISHED
messageId    : LOCAL-10002
serverCostMs : 3
clientCostMs : 148

看到没有?

短信明明被设置成 3 秒后失败,但接口 148ms 就返回了。服务端统计的业务耗时只有 3ms。

再看日志:

text
order saved orderId=O10002, userId=U10002, payAmount=29900
order event published orderId=O10002, eventId=evt_016c068f-4127-4258-a7ec-533eff438870, messageId=LOCAL-10002
handle order created event eventId=evt_016c068f-4127-4258-a7ec-533eff438870, orderId=O10002
side-effect start name=sms, orderId=O10002, delayMs=3000, fail=true
side-effect failed name=sms
local async downstream failed orderId=O10002, reason=side effect failed: sms

这就是第一章和第三章最直观的区别。

第一章:短信失败,下单接口失败。

第三章:下单接口先成功返回,短信失败留在异步处理链路里。

当然,真实项目里不能只打印失败日志就完事。你还需要重试、死信、告警、幂等和补偿。但这些是后面的章节要解决的问题。本章先把主链路和旁路解开。

再跑真实 RocketMQ profile

当第二章的 RocketMQ 单机环境可用后,切到真实 RocketMQ:

powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-03-normal-message-async-order spring-boot:run `
  -Dspring-boot.run.profiles=rocketmq

调用还是同一个接口:

powershell
Invoke-RestMethod `
  -Uri 'http://127.0.0.1:18083/api/orders' `
  -Method Post `
  -ContentType 'application/json; charset=utf-8' `
  -Body '{"userId":"U10003","payAmount":39900}'

正常时,你会看到响应里的 messageId 变成 RocketMQ 返回的真实 messageId。控制台也会出现类似:

text
consume order event messageId=..., eventId=..., orderId=O10003
handle order created event eventId=..., orderId=O10003
side-effect success name=sms, orderId=O10003
side-effect success name=points, orderId=O10003
side-effect success name=warehouse, orderId=O10003

如果启动时报 Connection refused: localhost:8081,回到第二章的环境排查。这个错误说明应用连不上 Proxy,不是普通消息代码写错。

这章还没有解决什么

这里一定要诚实。

第三章只解决了“旁路动作不堵主链路”。它没有解决所有问题。

它还没有解决:

  1. 订单保存成功,但消息发送失败怎么办?
  2. 消费者失败后重试,会不会重复发短信、重复加积分?
  3. 积分和仓库要不要使用同一个消费者组?
  4. 消费堆积了怎么发现?
  5. 消息发出去了,但下游处理很慢,怎么告警?

这些不是本章该硬塞的内容。

如果现在就讲事务消息、死信、幂等、堆积调优,你会觉得“每个概念都听过,但不知道为什么要用”。所以我们按业务问题继续往前走。

下一章先讲消息模型与命名。因为一旦消费者变多,topic、tag、consumerGroup、eventId、key 怎么命名,会直接影响后面排查和扩展。

小技巧

第一,事件用过去式。

OrderCreated 是事件,表示已经发生。CreateOrder 是命令,表示要求别人做事。广播给多个下游时,优先用事件。

第二,消息体不要太大。

消息里放最小上下文。下游需要完整详情时,用 orderId 去查。别把订单表整行复制成消息契约。

第三,异步以后,接口成功不等于下游成功。

接口只能告诉你“订单创建成功,事件已发布”。短信、积分、仓库成功与否,要看消费者处理结果。

第四,先把链路跑通,再谈高级特性。

普通消息能解决“旁路不堵主流程”这个问题。等你真实遇到重复、失败、堆积、一致性问题,再引入重试、死信、幂等、事务消息,学习才会有抓手。

练习题

先自己想。下一章开头我们会复盘这一组题,再进入消息模型和命名。

  1. 第三章里,downstream=PUBLISHED 为什么不能写成 downstream=SUCCESS
  2. 通知服务、积分服务、仓库服务如果未来拆成三个独立服务,它们应该使用同一个 consumerGroup 吗?为什么?
  3. 如果短信消费者失败后 RocketMQ 重试,怎么避免重复发短信?
  4. OrderCreatedEvent 里为什么要有 eventId,只有 orderId 行不行?
  5. 如果订单保存成功,但普通消息发送失败,本章方案会留下什么风险?你觉得后面应该用什么机制补上?

Built with VitePress. Deployed on Cloudflare Pages.