切换主题
03. 普通消息:把下单后的旁路动作拆出去
上一章我们把 RocketMQ 5.x 的最小工程接进来了:Spring Boot 能识别 starter,producer 配置能加载,listener 也会尝试连 localhost:8081 的 Proxy。
但到现在为止,我们还没有真正解决第一章的业务问题。
第一章的问题是什么?
不是“我们不会发 MQ”。真正的问题是:下单接口里塞了短信、积分、仓库这些旁路动作。短信慢,下单慢;短信挂,下单失败;以后再加站内信、优惠券、履约、风控通知,代码还会继续往下单方法里堆。
所以第三章正式开始改业务。
产品这时候又来了一句很熟悉的话:“下单成功以后,短信、积分、仓库都要继续做,但是用户别再等了。短信挂了,也别影响下单。”
这句话翻译成工程目标就是:
- 下单主流程只保存订单,并发布
OrderCreated事件。 - 短信、积分、仓库这些旁路动作,改成订阅
OrderCreated后异步处理。 - 本章先用普通消息,不上顺序消息、延时消息、事务消息。
为什么先用普通消息?
因为现在的需求只有“订单创建后通知下游”,不要求严格顺序,不要求未来某个时间点再投递,也不要求本地事务和消息发送强一致。RocketMQ 官方对普通消息的定位就是基础消息能力,常用在微服务异步解耦、数据集成和事件驱动场景里。
上一章练习参考答案
先复盘第二章的题,再进入改造。
- 为什么本章先把消息设计成
OrderCreatedEvent,而不是直接把订单表所有字段都发出去?
因为消息不是数据库表的远程副本。消息表达的是“发生了什么事”。
OrderCreatedEvent 表示订单已经创建。消费者拿到这个事实后,决定自己要不要处理、怎么处理。如果把订单表所有字段都塞进去,下游会开始依赖订单表结构。以后订单表加字段、改字段、拆字段,消息契约也会跟着抖。
更好的做法是只放消费者判断和追踪所需的最小上下文,比如 eventId、orderId、userId、payAmount、occurredAt。如果下游需要完整详情,再通过 orderId 查询。
rocketmq.producer.topic和OrderEventProperties.destination()里的 topic 是什么关系?它们不一致时可能会发生什么?
rocketmq.producer.topic 是 starter 初始化 producer 时读取的 topic 配置,帮助 producer 准备路由信息。
OrderEventProperties.destination() 是业务发送时真正传给发送 API 的目标,格式是:
text
order-created-topic:OrderCreated它们应该保持一致。如果 producer 初始化时配置的是 A topic,业务发送时发到 B topic,轻则排查困难,重则发送失败或路由信息不符合预期。
所以本教程把业务 topic 收在 tutorial.order-event.topic,再让 rocketmq.producer.topic 引用它,避免两处手写不一致。
- 如果应用启动时报
Connection refused: localhost:8081,你会按什么顺序排查?
先别改 Java。
我会按这条线排查:
- Docker daemon 是否启动:
docker version - Compose 是否可用:
docker compose version - RocketMQ 容器是否都在:
docker ps --filter "name=rocketmq" - Proxy 端口是否暴露:看
rocketmq-proxy是否映射8081 - 应用配置是否还是
tutorial.rocketmq.endpoints=localhost:8081 - listener 的 topic、tag、consumerGroup 是否写对
也就是说,先确认服务端,再确认客户端。不要看到连接拒绝就开始改消息体。
- 为什么本地环境要写
ssl-enabled: false?
因为本地 RocketMQ Proxy 是明文连接。rocketmq-v5-client-spring-boot-starter 里 producer 的 SSL 默认值容易让初学者踩坑,所以本地单机环境要显式关掉:
yaml
rocketmq:
producer:
ssl-enabled: false生产环境要不要开 SSL,取决于你的集群部署、安全要求和接入规范。本章只处理本地单机学习环境。
- 如果 HTTP 接口返回
SENT,但是消费者没有日志,你准备先看 topic/tag,还是先改消息体?为什么?
先看 topic、tag、consumerGroup、Proxy 和消费者是否启动。
因为“生产者返回成功”只能说明消息发送成功,不说明某个消费者一定拿到了。消费者没日志,最常见原因是订阅关系不匹配、消费者组配置错、tag 过滤不匹配、消费者根本没连上 Proxy。
消息体解析问题通常会表现为消费者拿到消息后报解析异常,而不是完全没有消费日志。
这次改造从哪里下手
第一章的同步写法是这样的:
java
orderRepository.save(order);
sideEffects.executeAll(order);
return result;这段代码最大的问题不是丑,而是边界错了。
保存订单是主流程。短信、积分、仓库是订单创建后的旁路动作。它们可以晚一点,可以失败后重试,可以各自扩展,但不应该继续堵住下单接口。
第三章把它改成:
java
orderRepository.save(order);
orderEventPublisher.publish(orderCreatedEvent);
return result;旁路动作挪到消费者里:
java
public void handle(OrderCreatedEvent event) {
Order order = new Order(event.orderId(), event.userId(), event.payAmount(), event.occurredAt());
sideEffects.executeAll(order);
}这就是普通消息最核心的价值:主流程发布一个事实,下游按自己的节奏处理这个事实。
本章伴生工程
第三章模块是:
text
D:\idea_space\rocketmq-order-tutorial
chapter-03-normal-message-async-order/
pom.xml
src/main/java/com/example/rocketmqdemo/
TutorialAsyncOrderApplication.java
api/
CreateOrderRequest.java
OrderController.java
OrderCreateHttpResponse.java
config/
AsyncOrderDemoConfiguration.java
OrderEventProperties.java
SideEffectProperties.java
order/
CreateOrderCommand.java
InMemoryOrderRepository.java
Order.java
OrderApplicationService.java
OrderCreateResult.java
OrderEventPublisher.java
OrderRepository.java
OrderSideEffects.java
SimulatedSideEffectClient.java
mq/
LocalAsyncOrderEventPublisher.java
LocalDemoDispatchProbe.java
OrderCreatedEvent.java
OrderCreatedListener.java
OrderCreatedMessageHandler.java
OrderEventCodec.java
PublishedOrderEvent.java
RocketMqOrderEventPublisher.java
src/main/resources/
application.yml
application-local-demo.yml
application-rocketmq.yml
src/test/java/com/example/rocketmqdemo/
api/OrderControllerLocalDemoIntegrationTest.java
mq/OrderEventCodecTest.java
order/OrderApplicationServiceTest.java这里有两个 profile:
local-demo:不依赖 RocketMQ 服务端,用本地异步分发器演示“接口先返回,旁路随后执行”的效果。这个 profile 是为了让你在 Docker Hub 拉镜像失败、公司网络限制、Proxy 没起来时,也能先看懂业务改造后的行为。
rocketmq:使用真实 RocketMQClientTemplate 发送普通消息,并用 @RocketMQMessageListener 消费消息。等第二章的 RocketMQ 单机环境可用,就跑这个 profile。
注意,local-demo 不是替代 RocketMQ。它只是教学时的一个可运行观察窗口。真正接入 RocketMQ 的代码在 rocketmq profile 里。
依赖不变
第三章继续使用同一个 starter:
xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
<version>2.3.4</version>
</dependency>端口换成 18083,避免和前两章冲突。
application.yml:
yaml
server:
port: 18083
spring:
application:
name: chapter-03-normal-message-async-order
main:
banner-mode: "off"
tutorial:
rocketmq:
endpoints: localhost:8081
order-event:
topic: order-created-topic
tag: OrderCreated
consumer-group: order-side-effects-demo-group
side-effects:
sms:
delay-ms: 300
fail: false
points:
delay-ms: 200
fail: false
warehouse:
delay-ms: 500
fail: falseapplication-rocketmq.yml:
yaml
rocketmq:
producer:
endpoints: ${tutorial.rocketmq.endpoints}
topic: ${tutorial.order-event.topic}
request-timeout: 3
ssl-enabled: false先把 topic 建好
如果你要跑真实 RocketMQ profile,先启动第二章的单机环境:
powershell
cd D:\idea_space\rocketmq-order-tutorial
docker compose -f .\docker\rocketmq-standalone.yml up -d然后创建普通消息 topic:
powershell
docker exec rocketmq-broker sh mqadmin updateTopic `
-n namesrv:9876 `
-c DefaultCluster `
-t order-created-topic `
-a +message.type=NORMALmessage.type=NORMAL 是 5.x 里很容易被旧教程漏掉的点。我们现在讲的是普通消息,topic 类型就应该按普通消息创建。
事件对象:只表达事实
第三章事件对象比第二章多了 payAmount,因为积分服务可能需要金额来计算积分。
java
package com.example.rocketmqdemo.mq;
import java.time.Instant;
public record OrderCreatedEvent(
String eventId,
String orderId,
String userId,
long payAmount,
Instant occurredAt
) {
}这仍然是一个很小的事件。
它没有订单状态、收货地址、优惠明细、库存详情。因为本章的消费者只需要知道:哪笔订单创建了、谁创建的、金额是多少、什么时候发生的。
小技巧:事件名用过去式。
OrderCreated 表示“订单已经创建”。消费者不能阻止这件事,只能响应这件事。这个命名会逼你把“事件”和“命令”分清楚。
订单服务:保存订单后发布事件
第三章的 OrderApplicationService 变成这样:
java
@Service
public class OrderApplicationService {
private final OrderRepository orderRepository;
private final OrderEventPublisher orderEventPublisher;
private final Clock clock;
private final AtomicLong sequence = new AtomicLong(10000);
public OrderCreateResult createOrder(CreateOrderCommand command) {
String orderId = "O" + sequence.incrementAndGet();
Instant createdAt = clock.instant();
Order order = new Order(orderId, command.userId(), command.payAmount(), createdAt);
orderRepository.save(order);
log.info("order saved orderId={}, userId={}, payAmount={}", orderId, command.userId(), command.payAmount());
OrderCreatedEvent event = new OrderCreatedEvent(
"evt_" + UUID.randomUUID(),
orderId,
command.userId(),
command.payAmount(),
createdAt
);
PublishedOrderEvent published = orderEventPublisher.publish(event);
log.info("order event published orderId={}, eventId={}, messageId={}",
orderId, published.eventId(), published.messageId());
return new OrderCreateResult(
orderId,
order.userId(),
order.payAmount(),
published.eventId(),
published.messageId(),
"PUBLISHED"
);
}
}注意,OrderApplicationService 里已经没有:
java
sideEffects.executeAll(order);这就是本章最重要的一刀。
下单服务不再知道“短信、积分、仓库具体怎么处理”。它只发布订单创建事件。以后产品再加站内信、优惠券、履约通知,不应该继续往下单方法里塞代码,而是新增消费者。
为什么还要有接口层
Controller 还是 /api/orders:
java
@RestController
@RequestMapping("/api/orders")
public class OrderController {
private final OrderApplicationService orderApplicationService;
@PostMapping
public OrderCreateHttpResponse createOrder(@RequestBody CreateOrderRequest request) {
long startNanos = System.nanoTime();
OrderCreateResult result = orderApplicationService.createOrder(
new CreateOrderCommand(request.userId(), request.payAmount())
);
long costMs = Duration.ofNanos(System.nanoTime() - startNanos).toMillis();
return new OrderCreateHttpResponse(
"SUCCESS",
result.orderId(),
result.userId(),
result.payAmount(),
result.eventId(),
result.messageId(),
result.downstream(),
costMs
);
}
}响应里多了几个字段:
java
public record OrderCreateHttpResponse(
String status,
String orderId,
String userId,
long payAmount,
String eventId,
String messageId,
String downstream,
long costMs
) {
}downstream=PUBLISHED 表示“下游任务已经通过事件交出去了”,不是“短信、积分、仓库都已经成功”。
这个语义一定要分清。
很多线上事故就是因为接口返回了“下单成功”,前端或运营以为所有旁路动作都成功了。异步以后,主流程成功和旁路成功是两件事。
发送接口先抽象出来
订单服务依赖的是接口:
java
package com.example.rocketmqdemo.order;
import com.example.rocketmqdemo.mq.OrderCreatedEvent;
import com.example.rocketmqdemo.mq.PublishedOrderEvent;
public interface OrderEventPublisher {
PublishedOrderEvent publish(OrderCreatedEvent event);
}为什么这里要抽接口?
不是为了炫技。它有两个实际好处:
- 单元测试可以直接验证订单服务“发了什么事件”,不需要启动 RocketMQ。
- 教程可以提供
local-demo和rocketmq两种运行方式,而业务代码不用改。
这就是工程里有意义的抽象:它隔离外部中间件边界。
真实 RocketMQ 发送实现
rocketmq profile 下使用 RocketMqOrderEventPublisher:
java
@Component
@Profile("rocketmq")
public class RocketMqOrderEventPublisher implements OrderEventPublisher {
private final RocketMQClientTemplate rocketMQClientTemplate;
private final OrderEventProperties properties;
private final OrderEventCodec codec;
@Override
public PublishedOrderEvent publish(OrderCreatedEvent event) {
SendReceipt receipt = rocketMQClientTemplate.syncSendNormalMessage(
properties.destination(),
codec.encode(event)
);
return new PublishedOrderEvent(event.eventId(), event.orderId(), String.valueOf(receipt.getMessageId()));
}
}这里发送的是 JSON 字符串,而不是直接把 Java 对象丢进去。
为什么?
因为我们想让生产者和消费者之间的消息格式更明确。消费者拿到 MessageView 后,可以按 UTF-8 JSON 解码成 OrderCreatedEvent。这比“依赖 starter 默认怎么序列化 Object”更适合教程和排障。
编码器很简单:
java
@Component
public class OrderEventCodec {
private final ObjectMapper objectMapper;
public String encode(OrderCreatedEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException exception) {
throw new IllegalArgumentException("encode order event failed", exception);
}
}
public OrderCreatedEvent decode(ByteBuffer body) {
ByteBuffer duplicate = body.asReadOnlyBuffer();
byte[] bytes = new byte[duplicate.remaining()];
duplicate.get(bytes);
return decode(bytes);
}
}小技巧:不要用 body.array() 偷懒。ByteBuffer 不一定有可访问的底层数组。复制 remaining bytes 更稳。
真实 RocketMQ 消费实现
消费者监听同一个 topic:
java
@Component
@Profile("rocketmq")
@RocketMQMessageListener(
endpoints = "${tutorial.rocketmq.endpoints}",
topic = "${tutorial.order-event.topic}",
tag = "*",
consumerGroup = "${tutorial.order-event.consumer-group}"
)
public class OrderCreatedListener implements RocketMQListener {
private final OrderCreatedMessageHandler handler;
private final OrderEventCodec codec;
@Override
public ConsumeResult consume(MessageView messageView) {
try {
OrderCreatedEvent event = codec.decode(messageView.getBody());
log.info("consume order event messageId={}, eventId={}, orderId={}",
messageView.getMessageId(), event.eventId(), event.orderId());
handler.handle(event);
return ConsumeResult.SUCCESS;
} catch (RuntimeException exception) {
log.warn("consume order event failed messageId={}, reason={}",
messageView.getMessageId(), exception.getMessage());
return ConsumeResult.FAILURE;
}
}
}这里有一个关键点:消费者失败时返回 ConsumeResult.FAILURE。
这不是为了让接口失败。接口早就返回了。
这是告诉 RocketMQ:这条消息我这次没处理好,你后面可以按消费重试策略再投递。重试、死信和幂等,我们后面会专门讲。现在你只要先知道:消费者失败不再等于下单接口失败。
旁路处理器
真正执行短信、积分、仓库的是:
java
@Component
public class OrderCreatedMessageHandler {
private final OrderSideEffects sideEffects;
public void handle(OrderCreatedEvent event) {
Order order = new Order(event.orderId(), event.userId(), event.payAmount(), event.occurredAt());
log.info("handle order created event eventId={}, orderId={}", event.eventId(), event.orderId());
sideEffects.executeAll(order);
}
}它的位置很重要。
下单服务不直接调用它。消费者调用它。这样主流程和旁路动作之间隔了一层消息。
你以后要把短信拆到通知服务、积分拆到会员服务、仓库拆到履约服务,也是在这个边界继续演进。
先跑测试
执行:
powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-03-normal-message-async-order test真实结果:
text
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
BUILD SUCCESS这 3 个测试分别证明:
- 订单服务保存订单后发布
OrderCreatedEvent,不再内联执行旁路动作。 - 本地异步演示里,HTTP 响应会先返回,延迟 3 秒的短信失败发生在异步线程。
OrderEventCodec可以把事件编码成 UTF-8 JSON,再解码回来。
这个测试组合比只测 Controller 更有价值。它同时覆盖了业务边界、运行效果和消息格式。
先用 local-demo 看效果
因为我当前机器的 Docker Desktop 可以启动,但拉取 apache/rocketmq:5.3.2 时被本机代理阻断,所以这一章先用 local-demo 跑出可观察效果。
启动:
powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-03-normal-message-async-order spring-boot:run `
-Dspring-boot.run.profiles=local-demo `
"-Dspring-boot.run.arguments=--tutorial.side-effects.sms.delay-ms=3000 --tutorial.side-effects.sms.fail=true --tutorial.side-effects.points.delay-ms=0 --tutorial.side-effects.warehouse.delay-ms=0"这里故意让短信 3 秒后失败。
然后调用下单接口:
powershell
Invoke-RestMethod `
-Uri 'http://127.0.0.1:18083/api/orders' `
-Method Post `
-ContentType 'application/json; charset=utf-8' `
-Body '{"userId":"U10002","payAmount":29900}'真实响应:
text
status : SUCCESS
orderId : O10002
downstream : PUBLISHED
messageId : LOCAL-10002
serverCostMs : 3
clientCostMs : 148看到没有?
短信明明被设置成 3 秒后失败,但接口 148ms 就返回了。服务端统计的业务耗时只有 3ms。
再看日志:
text
order saved orderId=O10002, userId=U10002, payAmount=29900
order event published orderId=O10002, eventId=evt_016c068f-4127-4258-a7ec-533eff438870, messageId=LOCAL-10002
handle order created event eventId=evt_016c068f-4127-4258-a7ec-533eff438870, orderId=O10002
side-effect start name=sms, orderId=O10002, delayMs=3000, fail=true
side-effect failed name=sms
local async downstream failed orderId=O10002, reason=side effect failed: sms这就是第一章和第三章最直观的区别。
第一章:短信失败,下单接口失败。
第三章:下单接口先成功返回,短信失败留在异步处理链路里。
当然,真实项目里不能只打印失败日志就完事。你还需要重试、死信、告警、幂等和补偿。但这些是后面的章节要解决的问题。本章先把主链路和旁路解开。
再跑真实 RocketMQ profile
当第二章的 RocketMQ 单机环境可用后,切到真实 RocketMQ:
powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-03-normal-message-async-order spring-boot:run `
-Dspring-boot.run.profiles=rocketmq调用还是同一个接口:
powershell
Invoke-RestMethod `
-Uri 'http://127.0.0.1:18083/api/orders' `
-Method Post `
-ContentType 'application/json; charset=utf-8' `
-Body '{"userId":"U10003","payAmount":39900}'正常时,你会看到响应里的 messageId 变成 RocketMQ 返回的真实 messageId。控制台也会出现类似:
text
consume order event messageId=..., eventId=..., orderId=O10003
handle order created event eventId=..., orderId=O10003
side-effect success name=sms, orderId=O10003
side-effect success name=points, orderId=O10003
side-effect success name=warehouse, orderId=O10003如果启动时报 Connection refused: localhost:8081,回到第二章的环境排查。这个错误说明应用连不上 Proxy,不是普通消息代码写错。
这章还没有解决什么
这里一定要诚实。
第三章只解决了“旁路动作不堵主链路”。它没有解决所有问题。
它还没有解决:
- 订单保存成功,但消息发送失败怎么办?
- 消费者失败后重试,会不会重复发短信、重复加积分?
- 积分和仓库要不要使用同一个消费者组?
- 消费堆积了怎么发现?
- 消息发出去了,但下游处理很慢,怎么告警?
这些不是本章该硬塞的内容。
如果现在就讲事务消息、死信、幂等、堆积调优,你会觉得“每个概念都听过,但不知道为什么要用”。所以我们按业务问题继续往前走。
下一章先讲消息模型与命名。因为一旦消费者变多,topic、tag、consumerGroup、eventId、key 怎么命名,会直接影响后面排查和扩展。
小技巧
第一,事件用过去式。
OrderCreated 是事件,表示已经发生。CreateOrder 是命令,表示要求别人做事。广播给多个下游时,优先用事件。
第二,消息体不要太大。
消息里放最小上下文。下游需要完整详情时,用 orderId 去查。别把订单表整行复制成消息契约。
第三,异步以后,接口成功不等于下游成功。
接口只能告诉你“订单创建成功,事件已发布”。短信、积分、仓库成功与否,要看消费者处理结果。
第四,先把链路跑通,再谈高级特性。
普通消息能解决“旁路不堵主流程”这个问题。等你真实遇到重复、失败、堆积、一致性问题,再引入重试、死信、幂等、事务消息,学习才会有抓手。
练习题
先自己想。下一章开头我们会复盘这一组题,再进入消息模型和命名。
- 第三章里,
downstream=PUBLISHED为什么不能写成downstream=SUCCESS? - 通知服务、积分服务、仓库服务如果未来拆成三个独立服务,它们应该使用同一个 consumerGroup 吗?为什么?
- 如果短信消费者失败后 RocketMQ 重试,怎么避免重复发短信?
OrderCreatedEvent里为什么要有eventId,只有orderId行不行?- 如果订单保存成功,但普通消息发送失败,本章方案会留下什么风险?你觉得后面应该用什么机制补上?