切换主题
02. 环境与最小闭环:先让 RocketMQ 真正接进工程
上一章我们已经把问题看清楚了:下单成功以后,短信、积分、仓库这些旁路动作不应该继续堵在主链路里。
但是先别急着上来就讲“事务消息”“顺序消息”“死信队列”。这时候你在公司里真正会遇到的情况一般是这样的:
产品还没提新需求,老板也没说要你三天内做完一个分布式架构升级。真正的问题是:你上周说“这里应该上 MQ”,这周技术负责人问你:“那你先跑一条消息给我看看。环境怎么起?Spring Boot 怎么接?发出去以后怎么证明真的到 RocketMQ 了?”
这就是第二章要解决的事。
这一章的目标很朴素:用 rocketmq-v5-client-spring-boot-starter:2.3.4 在 Spring Boot 工程里发出一条“订单已创建”消息,并且把本地 RocketMQ 5.x 单机环境准备好。
上一章练习参考答案
先把上一章留的几个问题复盘掉。这个复盘很重要,因为它决定我们为什么先从普通消息开始,而不是一上来就堆高级特性。
- 把
warehouse.delay-ms从500改成3000,下单接口会发生什么变化?
接口会明显变慢。上一章的同步写法里,仓库预占库存在下单主链路里执行,所以仓库多慢,下单接口就多慢。即使订单本身已经能创建成功,用户也要等仓库调用结束才能拿到响应。
这就是同步链路最直观的问题:旁路动作慢,主流程也跟着慢。
- 如果短信失败,订单到底应不应该回滚?
默认不应该。
前提是短信只是通知,不是订单成立的必要条件。用户付款、订单创建、库存策略这些才是订单主流程。短信失败最多影响“用户有没有及时收到通知”,不应该影响“订单是不是成立”。
当然,如果你的业务假设是“必须短信验证码确认后才能创建订单”,那短信就不是旁路了。它应该放在下单之前,而不是下单之后。
所以判断一个动作能不能异步,先别问技术,先问业务:没有它,主流程还成不成立?
- 如果同一个用户重复提交同一笔业务请求,你准备怎么识别“重复”?
不能只靠“同一个用户 + 同一个金额”判断,这太粗糙。
更稳的方式是让客户端或上游生成一个业务请求号,比如 requestId、clientOrderNo、tradeNo。服务端用这个唯一号做幂等判断:同一个请求号只能成功创建一次订单。
以后我们讲“消费幂等”和“重试”时,还会继续用这个思路。MQ 不是用来替你消灭重复的,MQ 是让你必须认真面对重复。
- 下单后发优惠券,应该放在主链路还是旁路?
多数情况下放旁路。
因为优惠券发晚一点,用户还能下单;优惠券发失败,可以补偿;但因为优惠券服务挂了导致全站下不了单,这个代价通常不能接受。
不过也有例外。如果这是“买一赠一券包”,并且券包是交易承诺的一部分,那它可能就不是普通旁路。业务边界要先定义清楚。
- 现在还没有 MQ,能不能先用本地线程池异步处理?这样会有什么风险?
可以临时用,但不要把它当成最终方案。
本地线程池只能解决“别阻塞当前请求线程”这个小问题,解决不了服务重启、进程崩溃、机器宕机、任务堆积、跨服务消费、失败重试、消费进度可观测这些问题。
比如订单创建成功后,你把发短信任务丢进本地线程池。刚丢进去,应用重启了,这个任务可能就没了。MQ 的价值不只是“异步”,更是把任务变成一个可以被持久化、被消费、被重试、被观测的消息。
这一章先不改下单主流程
上一章我们已经知道要把旁路动作拆出去。但这一章先不急着重构下单接口。
为什么?
因为一个工程里引入中间件,第一步不是把业务代码改得满天飞。第一步是先把最小闭环跑通:
- RocketMQ 服务端能不能起来?
- Spring Boot 能不能创建 RocketMQ producer?
- topic 和 tag 怎么配置?
- 发送一条消息时,代码到底调用哪个 API?
- 消费者启动时,能不能连上 Proxy?
- 如果连不上,错误长什么样?
这些都跑通以后,下一章再把第一章的下单流程真正改成“订单创建成功 -> 发送消息 -> 旁路服务消费消息”。
本章伴生工程
本章代码在伴生工程的第二个模块:
text
D:\idea_space\rocketmq-order-tutorial
pom.xml
docker/
rocketmq-standalone.yml
chapter-02-rocketmq-first-message/
pom.xml
src/main/java/com/example/rocketmqdemo/
TutorialRocketMqApplication.java
api/
OrderMessageController.java
PublishOrderMessageRequest.java
PublishOrderMessageResponse.java
config/
RocketMqDemoConfiguration.java
mq/
MessageSendGateway.java
MessageSendResult.java
OrderCreatedEvent.java
OrderCreatedListener.java
OrderEventProperties.java
OrderEventPublisher.java
PublishedOrderEvent.java
RocketMqMessageSendGateway.java
src/main/resources/
application.yml
src/test/java/com/example/rocketmqdemo/mq/
OrderEventPublisherTest.java
RecordingMessageSendGateway.java你先记住这个结构:api 负责 HTTP 入口,mq 负责消息发送和消费,config 放工程配置。后面章节会继续沿着这个工程往前长,不是每章散落一堆孤立代码片段。
先把 RocketMQ 单机环境准备好
RocketMQ 5.x 本地学习最容易跑偏的点,是把 4.x 的 NameServer 写法直接套过来。
RocketMQ 5.x Java SDK 走的是 endpoint。我们本地单机环境里,应用连接的是 Proxy 暴露出来的地址,默认按 localhost:8081 来配置。
本章用 Docker Compose 起一个最小单机环境:NameServer、Broker、Proxy。
docker/rocketmq-standalone.yml:
yaml
services:
namesrv:
image: apache/rocketmq:5.3.2
container_name: rocketmq-namesrv
command: sh mqnamesrv
ports:
- "9876:9876"
broker:
image: apache/rocketmq:5.3.2
container_name: rocketmq-broker
depends_on:
- namesrv
command: sh mqbroker -n namesrv:9876
ports:
- "10909:10909"
- "10911:10911"
proxy:
image: apache/rocketmq:5.3.2
container_name: rocketmq-proxy
depends_on:
- broker
- namesrv
command: sh mqproxy -n namesrv:9876
ports:
- "8080:8080"
- "8081:8081"先看 Docker 是否可用:
powershell
docker version
docker compose version正常情况下,你应该至少能看到 Docker Client、Docker Server 和 Compose 版本。
我这里真实跑到的 Docker 版本是:
text
Client:
Version: 29.4.2
Context: desktop-linux
Server: Docker Desktop 4.72.0 (225998)
Engine:
Version: 29.4.2
OS/Arch: linux/amd64
Docker Compose version v5.1.3然后启动 RocketMQ:
powershell
cd D:\idea_space\rocketmq-order-tutorial
docker compose -f .\docker\rocketmq-standalone.yml up -d如果你的 Docker 可以正常访问 Docker Hub,它会拉取 apache/rocketmq:5.3.2,然后启动三个容器。启动后检查:
powershell
docker ps --filter "name=rocketmq"你应该看到类似这样的结果:
text
rocketmq-namesrv Up
rocketmq-broker Up
rocketmq-proxy Up如果你像我这次一样,Docker Desktop 配了一个当前不可用的代理,就会卡在拉镜像这一步。真实错误长这样:
text
failed to resolve reference "docker.io/apache/rocketmq:5.3.2"
connecting to host.docker.internal:7897
A connection attempt failed because the connected party did not properly respond这个错误说明什么?
它说明 Docker daemon 已经起来了,Compose 文件也能解析,但 Docker Hub 镜像拉取被本机代理阻断了。这个时候不要去改 Java 代码,也不要怀疑 topic 配置。先把 Docker Desktop 的代理关掉,或者把代理端口恢复正常,再重新执行 docker compose up -d。
这就是工程思维:你要先判断问题在哪一层。CLI 不可用、daemon 没启动、镜像拉不下来、容器没启动、应用连不上 Proxy,是五个不同问题。
创建 topic
RocketMQ 容器启动后,创建本章使用的 topic:
powershell
docker exec rocketmq-broker sh mqadmin updateTopic `
-n namesrv:9876 `
-c DefaultCluster `
-t order-created-topic `
-a +message.type=NORMAL本章只用一个 topic:order-created-topic。
message.type=NORMAL 表示这个 topic 用来发送普通消息。后面我们讲延时消息、顺序消息、事务消息时,会分别解释为什么 topic 类型不能随便混用。
tag 用 OrderCreated。也就是说,这条消息的发送目标不是只写 topic,而是:
text
order-created-topic:OrderCreated这个写法后面会非常重要。topic 是大类,tag 是这个 topic 下的业务事件类型。你可以先把它理解成:
text
订单事件这个信箱里,有一封类型叫 OrderCreated 的信Spring Boot 依赖
第二章模块的 pom.xml 里引入 starter:
xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
<version>2.3.4</version>
</dependency>本章用 Spring Boot 2.7.18、JDK 17。父工程里统一设置 UTF-8:
xml
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>这不是形式主义。你后面会发 JSON、打日志、看中文报错,编码混乱会让教程和排障体验都很差。
Spring Boot 配置
application.yml:
yaml
server:
port: 18082
spring:
application:
name: chapter-02-rocketmq-first-message
main:
banner-mode: "off"
tutorial:
rocketmq:
endpoints: localhost:8081
order-event:
topic: order-created-topic
tag: OrderCreated
consumer-group: order-created-demo-group
rocketmq:
producer:
endpoints: ${tutorial.rocketmq.endpoints}
topic: ${tutorial.order-event.topic}
request-timeout: 3
ssl-enabled: false这里分成两层配置:
第一层是我们自己的业务配置:
yaml
tutorial:
rocketmq:
endpoints: localhost:8081
order-event:
topic: order-created-topic
tag: OrderCreated
consumer-group: order-created-demo-group它表达的是业务语言:订单事件发到哪个 topic,用哪个 tag,消费者组叫什么。
第二层是 RocketMQ starter 真正要读的 producer 配置:
yaml
rocketmq:
producer:
endpoints: ${tutorial.rocketmq.endpoints}
topic: ${tutorial.order-event.topic}
request-timeout: 3
ssl-enabled: falsessl-enabled: false 在本地很关键。本地 Proxy 是明文连接,不关掉 SSL,你很容易遇到连不上但又看不懂的握手问题。
事件对象
订单创建以后,我们先不把整个订单对象扔进 MQ。消息越大,耦合越重。
本章只发最小事件:
java
package com.example.rocketmqdemo.mq;
import java.time.Instant;
public record OrderCreatedEvent(
String eventId,
String orderId,
String userId,
Instant occurredAt
) {
}eventId 是事件自己的唯一编号。它不是 orderId。
为什么要单独有 eventId?
因为一笔订单后面可能产生多种事件:订单创建、订单支付、订单取消、订单发货。orderId 表示业务对象,eventId 表示这一次发生的事实。
这也是一个小技巧:别把消息设计成“我把数据库表整行复制过去”。消息应该表达“发生了什么事”。
配置对象
把 topic 和 tag 收进配置对象:
java
package com.example.rocketmqdemo.mq;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "tutorial.order-event")
public record OrderEventProperties(
String topic,
String tag
) {
public String destination() {
return topic + ":" + tag;
}
}这里的 destination() 很小,但它能避免你在业务代码里到处拼 topic + ":" + tag。
以后如果我们要统一规范 tag 命名,也只需要收口到这里。
发送网关
我没有让业务类直接依赖 RocketMQClientTemplate,而是先包一层 MessageSendGateway:
java
package com.example.rocketmqdemo.mq;
public interface MessageSendGateway {
MessageSendResult sendNormal(String destination, Object payload);
}返回结果也收成自己的对象:
java
package com.example.rocketmqdemo.mq;
public record MessageSendResult(String messageId) {
}RocketMQ 的实现类长这样:
java
package com.example.rocketmqdemo.mq;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.core.RocketMQClientTemplate;
import org.springframework.stereotype.Component;
@Component
public class RocketMqMessageSendGateway implements MessageSendGateway {
private final RocketMQClientTemplate rocketMQClientTemplate;
public RocketMqMessageSendGateway(RocketMQClientTemplate rocketMQClientTemplate) {
this.rocketMQClientTemplate = rocketMQClientTemplate;
}
@Override
public MessageSendResult sendNormal(String destination, Object payload) {
SendReceipt receipt = rocketMQClientTemplate.syncSendNormalMessage(destination, payload);
return new MessageSendResult(String.valueOf(receipt.getMessageId()));
}
}为什么要多这一层?
因为第二章我们要先测试“业务代码到底会发到哪个 destination”。如果业务类直接绑死 RocketMQClientTemplate,单测就会变得很重。包一层之后,测试里可以用一个记录型 fake gateway,看清楚它到底发了什么。
这不是过度设计。它只隔离了一个外部中间件边界。
发布订单事件
OrderEventPublisher 负责把业务入参变成事件,再交给发送网关:
java
package com.example.rocketmqdemo.mq;
import org.springframework.stereotype.Service;
import java.time.Clock;
import java.util.UUID;
@Service
public class OrderEventPublisher {
private final MessageSendGateway messageSendGateway;
private final OrderEventProperties properties;
private final Clock clock;
public OrderEventPublisher(
MessageSendGateway messageSendGateway,
OrderEventProperties properties,
Clock clock
) {
this.messageSendGateway = messageSendGateway;
this.properties = properties;
this.clock = clock;
}
public PublishedOrderEvent publishOrderCreated(String orderId, String userId) {
OrderCreatedEvent event = new OrderCreatedEvent(
"evt_" + UUID.randomUUID(),
orderId,
userId,
clock.instant()
);
MessageSendResult result = messageSendGateway.sendNormal(properties.destination(), event);
return new PublishedOrderEvent(event.eventId(), event.orderId(), result.messageId());
}
}返回给 Controller 的对象:
java
package com.example.rocketmqdemo.mq;
public record PublishedOrderEvent(
String eventId,
String orderId,
String messageId
) {
}注意这里有一个边界:OrderEventPublisher 不关心短信、不关心积分、不关心仓库。它只负责发布“订单已创建”这个事实。
下一章我们才会把第一章的下单主流程接到这里。
HTTP 入口
为了让读者能用真实 HTTP 请求看到效果,本章加一个很薄的 Controller:
java
package com.example.rocketmqdemo.api;
public record PublishOrderMessageRequest(
String orderId,
String userId
) {
}java
package com.example.rocketmqdemo.api;
public record PublishOrderMessageResponse(
String status,
String eventId,
String orderId,
String messageId
) {
}java
package com.example.rocketmqdemo.api;
import com.example.rocketmqdemo.mq.OrderEventPublisher;
import com.example.rocketmqdemo.mq.PublishedOrderEvent;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/order-events")
public class OrderMessageController {
private final OrderEventPublisher orderEventPublisher;
public OrderMessageController(OrderEventPublisher orderEventPublisher) {
this.orderEventPublisher = orderEventPublisher;
}
@PostMapping
public PublishOrderMessageResponse publishOrderCreated(@RequestBody PublishOrderMessageRequest request) {
PublishedOrderEvent event = orderEventPublisher.publishOrderCreated(request.orderId(), request.userId());
return new PublishOrderMessageResponse("SENT", event.eventId(), event.orderId(), event.messageId());
}
}这个接口不是最终业务接口。它只是本章的最小验证入口。
真正的业务改造会在下一章发生:用户请求下单接口,下单成功以后,订单服务发消息。
最小消费者
本章消费者先只确认“消息能被消费到”,不做复杂业务处理。
java
package com.example.rocketmqdemo.mq;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
endpoints = "${tutorial.rocketmq.endpoints}",
topic = "${tutorial.order-event.topic}",
tag = "*",
consumerGroup = "${tutorial.order-event.consumer-group}"
)
public class OrderCreatedListener implements RocketMQListener {
private static final Logger log = LoggerFactory.getLogger(OrderCreatedListener.class);
@Override
public ConsumeResult consume(MessageView messageView) {
log.info("consume order event messageId={}", messageView.getMessageId());
return ConsumeResult.SUCCESS;
}
}第一次跑通消费时,不要急着做复杂反序列化。
你先把 messageId 打出来,确认消费者真的启动了、真的收到消息了、真的返回 ConsumeResult.SUCCESS。链路通了,再慢慢加业务字段解析。
这是排障时很省命的小技巧:先证明链路,再证明业务。
先跑单元测试
启动 RocketMQ 之前,先验证我们的业务发送逻辑。
执行:
powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-02-rocketmq-first-message test真实运行结果:
text
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
BUILD SUCCESS这个测试证明了三件事:
OrderEventPublisher会生成OrderCreatedEvent。- 发送目标是
order-created-topic:OrderCreated。 - 发送结果里的
messageId会返回给调用方。
测试代码里用了一个记录型 gateway:
java
class RecordingMessageSendGateway implements MessageSendGateway {
private final String messageId;
private String destination;
private Object payload;
RecordingMessageSendGateway(String messageId) {
this.messageId = messageId;
}
@Override
public MessageSendResult sendNormal(String destination, Object payload) {
this.destination = destination;
this.payload = payload;
return new MessageSendResult(messageId);
}
String destination() {
return destination;
}
Object payload() {
return payload;
}
}这个测试不需要 RocketMQ 服务端。它先保证“我们准备发什么”是对的。
等 RocketMQ 容器启动以后,再验证“消息能不能真的发出去、被消费到”。
启动应用
RocketMQ 容器和 topic 准备好以后,启动第二章应用:
powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-02-rocketmq-first-message spring-boot:run如果 RocketMQ Proxy 没有启动,应用会失败,而且失败得很明确。
我这里在 Proxy 不存在时真实看到的日志是:
text
Init Producer Args: Producer{endpoints='localhost:8081', topic='order-created-topic', requestTimeout=3, sslEnabled=false}
a producer init on proxy localhost:8081
Register the listener to container, listenerBeanName:orderCreatedListener
java.lang.IllegalStateException: Failed to start RocketMQ push consumer
Connection refused: getsockopt: localhost/[0:0:0:0:0:0:0:1]:8081这段日志很有价值。
它说明 starter 已经生效了,producer 配置也被读取了,listener 也注册了。失败点不是 Spring Boot 没扫到 Bean,而是消费者启动时连不上 localhost:8081 的 RocketMQ Proxy。
所以看到这个错误时,不要先改 Java。先检查:
powershell
docker ps --filter "name=rocketmq-proxy"如果 proxy 容器没起来,先把 RocketMQ 环境修好。
发送一条订单创建消息
当 RocketMQ 容器都启动,并且应用启动成功后,执行:
powershell
Invoke-RestMethod `
-Uri 'http://127.0.0.1:18082/api/order-events' `
-Method Post `
-ContentType 'application/json; charset=utf-8' `
-Body '{"orderId":"O10001","userId":"U10001"}'你应该看到类似响应:
text
status eventId orderId messageId
------ ------- ------- ---------
SENT evt_8d5f0f5c-4b1c-4c58-b2c3-... O10001 01F...与此同时,应用控制台应该看到消费者日志:
text
consume order event messageId=01F...看到这两个结果,就说明本章最小闭环成立:
- HTTP 请求进入 Spring Boot。
- Controller 调用
OrderEventPublisher。 OrderEventPublisher生成OrderCreatedEvent。RocketMQClientTemplate把消息发到order-created-topic:OrderCreated。- PushConsumer 收到消息并返回消费成功。
如果你只看到 HTTP 响应,没有看到消费日志,先检查消费者组、topic、tag 和 Proxy。不要一上来怀疑消息体。
这一章真正学到什么
这一章不是为了显摆“我会发一条 MQ 消息”。
它真正训练的是工程接入中间件的顺序:
先准备服务端环境,再确认客户端配置;先发普通消息,再讲高级特性;先用最小事件跑通链路,再把它接回真实业务。
你现在已经有了三样东西:
- RocketMQ 5.x 本地单机环境的启动方式。
- Spring Boot starter 2.3.4 的最小 producer、consumer 配置。
- 一个能被测试、能被 HTTP 调用、能继续演进的伴生工程模块。
下一章,我们就回到第一章那个慢吞吞的下单接口。
产品又会来一句很熟悉的话:“下单成功以后,短信、积分、仓库都要继续做,但你别再让用户等了。”
这时候我们就把第一章的同步旁路动作,正式改成普通消息异步解耦。
练习题
先自己想。下一章开头我们会复盘这一组题,再进入普通消息改造。
- 为什么本章先把消息设计成
OrderCreatedEvent,而不是直接把订单表所有字段都发出去? rocketmq.producer.topic和OrderEventProperties.destination()里的 topic 是什么关系?它们不一致时可能会发生什么?- 如果应用启动时报
Connection refused: localhost:8081,你会按什么顺序排查? - 为什么本地环境要写
ssl-enabled: false? - 如果 HTTP 接口返回
SENT,但是消费者没有日志,你准备先看 topic/tag,还是先改消息体?为什么?