Skip to content

02. 环境与最小闭环:先让 RocketMQ 真正接进工程

上一章我们已经把问题看清楚了:下单成功以后,短信、积分、仓库这些旁路动作不应该继续堵在主链路里。

但是先别急着上来就讲“事务消息”“顺序消息”“死信队列”。这时候你在公司里真正会遇到的情况一般是这样的:

产品还没提新需求,老板也没说要你三天内做完一个分布式架构升级。真正的问题是:你上周说“这里应该上 MQ”,这周技术负责人问你:“那你先跑一条消息给我看看。环境怎么起?Spring Boot 怎么接?发出去以后怎么证明真的到 RocketMQ 了?”

这就是第二章要解决的事。

这一章的目标很朴素:用 rocketmq-v5-client-spring-boot-starter:2.3.4 在 Spring Boot 工程里发出一条“订单已创建”消息,并且把本地 RocketMQ 5.x 单机环境准备好。

上一章练习参考答案

先把上一章留的几个问题复盘掉。这个复盘很重要,因为它决定我们为什么先从普通消息开始,而不是一上来就堆高级特性。

  1. warehouse.delay-ms500 改成 3000,下单接口会发生什么变化?

接口会明显变慢。上一章的同步写法里,仓库预占库存在下单主链路里执行,所以仓库多慢,下单接口就多慢。即使订单本身已经能创建成功,用户也要等仓库调用结束才能拿到响应。

这就是同步链路最直观的问题:旁路动作慢,主流程也跟着慢。

  1. 如果短信失败,订单到底应不应该回滚?

默认不应该。

前提是短信只是通知,不是订单成立的必要条件。用户付款、订单创建、库存策略这些才是订单主流程。短信失败最多影响“用户有没有及时收到通知”,不应该影响“订单是不是成立”。

当然,如果你的业务假设是“必须短信验证码确认后才能创建订单”,那短信就不是旁路了。它应该放在下单之前,而不是下单之后。

所以判断一个动作能不能异步,先别问技术,先问业务:没有它,主流程还成不成立?

  1. 如果同一个用户重复提交同一笔业务请求,你准备怎么识别“重复”?

不能只靠“同一个用户 + 同一个金额”判断,这太粗糙。

更稳的方式是让客户端或上游生成一个业务请求号,比如 requestIdclientOrderNotradeNo。服务端用这个唯一号做幂等判断:同一个请求号只能成功创建一次订单。

以后我们讲“消费幂等”和“重试”时,还会继续用这个思路。MQ 不是用来替你消灭重复的,MQ 是让你必须认真面对重复。

  1. 下单后发优惠券,应该放在主链路还是旁路?

多数情况下放旁路。

因为优惠券发晚一点,用户还能下单;优惠券发失败,可以补偿;但因为优惠券服务挂了导致全站下不了单,这个代价通常不能接受。

不过也有例外。如果这是“买一赠一券包”,并且券包是交易承诺的一部分,那它可能就不是普通旁路。业务边界要先定义清楚。

  1. 现在还没有 MQ,能不能先用本地线程池异步处理?这样会有什么风险?

可以临时用,但不要把它当成最终方案。

本地线程池只能解决“别阻塞当前请求线程”这个小问题,解决不了服务重启、进程崩溃、机器宕机、任务堆积、跨服务消费、失败重试、消费进度可观测这些问题。

比如订单创建成功后,你把发短信任务丢进本地线程池。刚丢进去,应用重启了,这个任务可能就没了。MQ 的价值不只是“异步”,更是把任务变成一个可以被持久化、被消费、被重试、被观测的消息。

这一章先不改下单主流程

上一章我们已经知道要把旁路动作拆出去。但这一章先不急着重构下单接口。

为什么?

因为一个工程里引入中间件,第一步不是把业务代码改得满天飞。第一步是先把最小闭环跑通:

  • RocketMQ 服务端能不能起来?
  • Spring Boot 能不能创建 RocketMQ producer?
  • topic 和 tag 怎么配置?
  • 发送一条消息时,代码到底调用哪个 API?
  • 消费者启动时,能不能连上 Proxy?
  • 如果连不上,错误长什么样?

这些都跑通以后,下一章再把第一章的下单流程真正改成“订单创建成功 -> 发送消息 -> 旁路服务消费消息”。

本章伴生工程

本章代码在伴生工程的第二个模块:

text
D:\idea_space\rocketmq-order-tutorial
  pom.xml
  docker/
    rocketmq-standalone.yml
  chapter-02-rocketmq-first-message/
    pom.xml
    src/main/java/com/example/rocketmqdemo/
      TutorialRocketMqApplication.java
      api/
        OrderMessageController.java
        PublishOrderMessageRequest.java
        PublishOrderMessageResponse.java
      config/
        RocketMqDemoConfiguration.java
      mq/
        MessageSendGateway.java
        MessageSendResult.java
        OrderCreatedEvent.java
        OrderCreatedListener.java
        OrderEventProperties.java
        OrderEventPublisher.java
        PublishedOrderEvent.java
        RocketMqMessageSendGateway.java
    src/main/resources/
      application.yml
    src/test/java/com/example/rocketmqdemo/mq/
      OrderEventPublisherTest.java
      RecordingMessageSendGateway.java

你先记住这个结构:api 负责 HTTP 入口,mq 负责消息发送和消费,config 放工程配置。后面章节会继续沿着这个工程往前长,不是每章散落一堆孤立代码片段。

先把 RocketMQ 单机环境准备好

RocketMQ 5.x 本地学习最容易跑偏的点,是把 4.x 的 NameServer 写法直接套过来。

RocketMQ 5.x Java SDK 走的是 endpoint。我们本地单机环境里,应用连接的是 Proxy 暴露出来的地址,默认按 localhost:8081 来配置。

本章用 Docker Compose 起一个最小单机环境:NameServer、Broker、Proxy。

docker/rocketmq-standalone.yml

yaml
services:
  namesrv:
    image: apache/rocketmq:5.3.2
    container_name: rocketmq-namesrv
    command: sh mqnamesrv
    ports:
      - "9876:9876"

  broker:
    image: apache/rocketmq:5.3.2
    container_name: rocketmq-broker
    depends_on:
      - namesrv
    command: sh mqbroker -n namesrv:9876
    ports:
      - "10909:10909"
      - "10911:10911"

  proxy:
    image: apache/rocketmq:5.3.2
    container_name: rocketmq-proxy
    depends_on:
      - broker
      - namesrv
    command: sh mqproxy -n namesrv:9876
    ports:
      - "8080:8080"
      - "8081:8081"

先看 Docker 是否可用:

powershell
docker version
docker compose version

正常情况下,你应该至少能看到 Docker Client、Docker Server 和 Compose 版本。

我这里真实跑到的 Docker 版本是:

text
Client:
 Version:           29.4.2
 Context:           desktop-linux

Server: Docker Desktop 4.72.0 (225998)
 Engine:
  Version:          29.4.2
  OS/Arch:          linux/amd64

Docker Compose version v5.1.3

然后启动 RocketMQ:

powershell
cd D:\idea_space\rocketmq-order-tutorial
docker compose -f .\docker\rocketmq-standalone.yml up -d

如果你的 Docker 可以正常访问 Docker Hub,它会拉取 apache/rocketmq:5.3.2,然后启动三个容器。启动后检查:

powershell
docker ps --filter "name=rocketmq"

你应该看到类似这样的结果:

text
rocketmq-namesrv   Up
rocketmq-broker    Up
rocketmq-proxy     Up

如果你像我这次一样,Docker Desktop 配了一个当前不可用的代理,就会卡在拉镜像这一步。真实错误长这样:

text
failed to resolve reference "docker.io/apache/rocketmq:5.3.2"
connecting to host.docker.internal:7897
A connection attempt failed because the connected party did not properly respond

这个错误说明什么?

它说明 Docker daemon 已经起来了,Compose 文件也能解析,但 Docker Hub 镜像拉取被本机代理阻断了。这个时候不要去改 Java 代码,也不要怀疑 topic 配置。先把 Docker Desktop 的代理关掉,或者把代理端口恢复正常,再重新执行 docker compose up -d

这就是工程思维:你要先判断问题在哪一层。CLI 不可用、daemon 没启动、镜像拉不下来、容器没启动、应用连不上 Proxy,是五个不同问题。

创建 topic

RocketMQ 容器启动后,创建本章使用的 topic:

powershell
docker exec rocketmq-broker sh mqadmin updateTopic `
  -n namesrv:9876 `
  -c DefaultCluster `
  -t order-created-topic `
  -a +message.type=NORMAL

本章只用一个 topic:order-created-topic

message.type=NORMAL 表示这个 topic 用来发送普通消息。后面我们讲延时消息、顺序消息、事务消息时,会分别解释为什么 topic 类型不能随便混用。

tag 用 OrderCreated。也就是说,这条消息的发送目标不是只写 topic,而是:

text
order-created-topic:OrderCreated

这个写法后面会非常重要。topic 是大类,tag 是这个 topic 下的业务事件类型。你可以先把它理解成:

text
订单事件这个信箱里,有一封类型叫 OrderCreated 的信

Spring Boot 依赖

第二章模块的 pom.xml 里引入 starter:

xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
    <version>2.3.4</version>
</dependency>

本章用 Spring Boot 2.7.18、JDK 17。父工程里统一设置 UTF-8:

xml
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

这不是形式主义。你后面会发 JSON、打日志、看中文报错,编码混乱会让教程和排障体验都很差。

Spring Boot 配置

application.yml

yaml
server:
  port: 18082

spring:
  application:
    name: chapter-02-rocketmq-first-message
  main:
    banner-mode: "off"

tutorial:
  rocketmq:
    endpoints: localhost:8081
  order-event:
    topic: order-created-topic
    tag: OrderCreated
    consumer-group: order-created-demo-group

rocketmq:
  producer:
    endpoints: ${tutorial.rocketmq.endpoints}
    topic: ${tutorial.order-event.topic}
    request-timeout: 3
    ssl-enabled: false

这里分成两层配置:

第一层是我们自己的业务配置:

yaml
tutorial:
  rocketmq:
    endpoints: localhost:8081
  order-event:
    topic: order-created-topic
    tag: OrderCreated
    consumer-group: order-created-demo-group

它表达的是业务语言:订单事件发到哪个 topic,用哪个 tag,消费者组叫什么。

第二层是 RocketMQ starter 真正要读的 producer 配置:

yaml
rocketmq:
  producer:
    endpoints: ${tutorial.rocketmq.endpoints}
    topic: ${tutorial.order-event.topic}
    request-timeout: 3
    ssl-enabled: false

ssl-enabled: false 在本地很关键。本地 Proxy 是明文连接,不关掉 SSL,你很容易遇到连不上但又看不懂的握手问题。

事件对象

订单创建以后,我们先不把整个订单对象扔进 MQ。消息越大,耦合越重。

本章只发最小事件:

java
package com.example.rocketmqdemo.mq;

import java.time.Instant;

public record OrderCreatedEvent(
        String eventId,
        String orderId,
        String userId,
        Instant occurredAt
) {
}

eventId 是事件自己的唯一编号。它不是 orderId

为什么要单独有 eventId

因为一笔订单后面可能产生多种事件:订单创建、订单支付、订单取消、订单发货。orderId 表示业务对象,eventId 表示这一次发生的事实。

这也是一个小技巧:别把消息设计成“我把数据库表整行复制过去”。消息应该表达“发生了什么事”。

配置对象

把 topic 和 tag 收进配置对象:

java
package com.example.rocketmqdemo.mq;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "tutorial.order-event")
public record OrderEventProperties(
        String topic,
        String tag
) {

    public String destination() {
        return topic + ":" + tag;
    }
}

这里的 destination() 很小,但它能避免你在业务代码里到处拼 topic + ":" + tag

以后如果我们要统一规范 tag 命名,也只需要收口到这里。

发送网关

我没有让业务类直接依赖 RocketMQClientTemplate,而是先包一层 MessageSendGateway

java
package com.example.rocketmqdemo.mq;

public interface MessageSendGateway {

    MessageSendResult sendNormal(String destination, Object payload);
}

返回结果也收成自己的对象:

java
package com.example.rocketmqdemo.mq;

public record MessageSendResult(String messageId) {
}

RocketMQ 的实现类长这样:

java
package com.example.rocketmqdemo.mq;

import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.core.RocketMQClientTemplate;
import org.springframework.stereotype.Component;

@Component
public class RocketMqMessageSendGateway implements MessageSendGateway {

    private final RocketMQClientTemplate rocketMQClientTemplate;

    public RocketMqMessageSendGateway(RocketMQClientTemplate rocketMQClientTemplate) {
        this.rocketMQClientTemplate = rocketMQClientTemplate;
    }

    @Override
    public MessageSendResult sendNormal(String destination, Object payload) {
        SendReceipt receipt = rocketMQClientTemplate.syncSendNormalMessage(destination, payload);
        return new MessageSendResult(String.valueOf(receipt.getMessageId()));
    }
}

为什么要多这一层?

因为第二章我们要先测试“业务代码到底会发到哪个 destination”。如果业务类直接绑死 RocketMQClientTemplate,单测就会变得很重。包一层之后,测试里可以用一个记录型 fake gateway,看清楚它到底发了什么。

这不是过度设计。它只隔离了一个外部中间件边界。

发布订单事件

OrderEventPublisher 负责把业务入参变成事件,再交给发送网关:

java
package com.example.rocketmqdemo.mq;

import org.springframework.stereotype.Service;

import java.time.Clock;
import java.util.UUID;

@Service
public class OrderEventPublisher {

    private final MessageSendGateway messageSendGateway;
    private final OrderEventProperties properties;
    private final Clock clock;

    public OrderEventPublisher(
            MessageSendGateway messageSendGateway,
            OrderEventProperties properties,
            Clock clock
    ) {
        this.messageSendGateway = messageSendGateway;
        this.properties = properties;
        this.clock = clock;
    }

    public PublishedOrderEvent publishOrderCreated(String orderId, String userId) {
        OrderCreatedEvent event = new OrderCreatedEvent(
                "evt_" + UUID.randomUUID(),
                orderId,
                userId,
                clock.instant()
        );
        MessageSendResult result = messageSendGateway.sendNormal(properties.destination(), event);
        return new PublishedOrderEvent(event.eventId(), event.orderId(), result.messageId());
    }
}

返回给 Controller 的对象:

java
package com.example.rocketmqdemo.mq;

public record PublishedOrderEvent(
        String eventId,
        String orderId,
        String messageId
) {
}

注意这里有一个边界:OrderEventPublisher 不关心短信、不关心积分、不关心仓库。它只负责发布“订单已创建”这个事实。

下一章我们才会把第一章的下单主流程接到这里。

HTTP 入口

为了让读者能用真实 HTTP 请求看到效果,本章加一个很薄的 Controller:

java
package com.example.rocketmqdemo.api;

public record PublishOrderMessageRequest(
        String orderId,
        String userId
) {
}
java
package com.example.rocketmqdemo.api;

public record PublishOrderMessageResponse(
        String status,
        String eventId,
        String orderId,
        String messageId
) {
}
java
package com.example.rocketmqdemo.api;

import com.example.rocketmqdemo.mq.OrderEventPublisher;
import com.example.rocketmqdemo.mq.PublishedOrderEvent;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/order-events")
public class OrderMessageController {

    private final OrderEventPublisher orderEventPublisher;

    public OrderMessageController(OrderEventPublisher orderEventPublisher) {
        this.orderEventPublisher = orderEventPublisher;
    }

    @PostMapping
    public PublishOrderMessageResponse publishOrderCreated(@RequestBody PublishOrderMessageRequest request) {
        PublishedOrderEvent event = orderEventPublisher.publishOrderCreated(request.orderId(), request.userId());
        return new PublishOrderMessageResponse("SENT", event.eventId(), event.orderId(), event.messageId());
    }
}

这个接口不是最终业务接口。它只是本章的最小验证入口。

真正的业务改造会在下一章发生:用户请求下单接口,下单成功以后,订单服务发消息。

最小消费者

本章消费者先只确认“消息能被消费到”,不做复杂业务处理。

java
package com.example.rocketmqdemo.mq;

import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(
        endpoints = "${tutorial.rocketmq.endpoints}",
        topic = "${tutorial.order-event.topic}",
        tag = "*",
        consumerGroup = "${tutorial.order-event.consumer-group}"
)
public class OrderCreatedListener implements RocketMQListener {

    private static final Logger log = LoggerFactory.getLogger(OrderCreatedListener.class);

    @Override
    public ConsumeResult consume(MessageView messageView) {
        log.info("consume order event messageId={}", messageView.getMessageId());
        return ConsumeResult.SUCCESS;
    }
}

第一次跑通消费时,不要急着做复杂反序列化。

你先把 messageId 打出来,确认消费者真的启动了、真的收到消息了、真的返回 ConsumeResult.SUCCESS。链路通了,再慢慢加业务字段解析。

这是排障时很省命的小技巧:先证明链路,再证明业务。

先跑单元测试

启动 RocketMQ 之前,先验证我们的业务发送逻辑。

执行:

powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-02-rocketmq-first-message test

真实运行结果:

text
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
BUILD SUCCESS

这个测试证明了三件事:

  1. OrderEventPublisher 会生成 OrderCreatedEvent
  2. 发送目标是 order-created-topic:OrderCreated
  3. 发送结果里的 messageId 会返回给调用方。

测试代码里用了一个记录型 gateway:

java
class RecordingMessageSendGateway implements MessageSendGateway {

    private final String messageId;
    private String destination;
    private Object payload;

    RecordingMessageSendGateway(String messageId) {
        this.messageId = messageId;
    }

    @Override
    public MessageSendResult sendNormal(String destination, Object payload) {
        this.destination = destination;
        this.payload = payload;
        return new MessageSendResult(messageId);
    }

    String destination() {
        return destination;
    }

    Object payload() {
        return payload;
    }
}

这个测试不需要 RocketMQ 服务端。它先保证“我们准备发什么”是对的。

等 RocketMQ 容器启动以后,再验证“消息能不能真的发出去、被消费到”。

启动应用

RocketMQ 容器和 topic 准备好以后,启动第二章应用:

powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-02-rocketmq-first-message spring-boot:run

如果 RocketMQ Proxy 没有启动,应用会失败,而且失败得很明确。

我这里在 Proxy 不存在时真实看到的日志是:

text
Init Producer Args: Producer{endpoints='localhost:8081', topic='order-created-topic', requestTimeout=3, sslEnabled=false}
a producer init on proxy localhost:8081
Register the listener to container, listenerBeanName:orderCreatedListener
java.lang.IllegalStateException: Failed to start RocketMQ push consumer
Connection refused: getsockopt: localhost/[0:0:0:0:0:0:0:1]:8081

这段日志很有价值。

它说明 starter 已经生效了,producer 配置也被读取了,listener 也注册了。失败点不是 Spring Boot 没扫到 Bean,而是消费者启动时连不上 localhost:8081 的 RocketMQ Proxy。

所以看到这个错误时,不要先改 Java。先检查:

powershell
docker ps --filter "name=rocketmq-proxy"

如果 proxy 容器没起来,先把 RocketMQ 环境修好。

发送一条订单创建消息

当 RocketMQ 容器都启动,并且应用启动成功后,执行:

powershell
Invoke-RestMethod `
  -Uri 'http://127.0.0.1:18082/api/order-events' `
  -Method Post `
  -ContentType 'application/json; charset=utf-8' `
  -Body '{"orderId":"O10001","userId":"U10001"}'

你应该看到类似响应:

text
status eventId                              orderId messageId
------ -------                              ------- ---------
SENT   evt_8d5f0f5c-4b1c-4c58-b2c3-...     O10001  01F...

与此同时,应用控制台应该看到消费者日志:

text
consume order event messageId=01F...

看到这两个结果,就说明本章最小闭环成立:

  1. HTTP 请求进入 Spring Boot。
  2. Controller 调用 OrderEventPublisher
  3. OrderEventPublisher 生成 OrderCreatedEvent
  4. RocketMQClientTemplate 把消息发到 order-created-topic:OrderCreated
  5. PushConsumer 收到消息并返回消费成功。

如果你只看到 HTTP 响应,没有看到消费日志,先检查消费者组、topic、tag 和 Proxy。不要一上来怀疑消息体。

这一章真正学到什么

这一章不是为了显摆“我会发一条 MQ 消息”。

它真正训练的是工程接入中间件的顺序:

先准备服务端环境,再确认客户端配置;先发普通消息,再讲高级特性;先用最小事件跑通链路,再把它接回真实业务。

你现在已经有了三样东西:

  1. RocketMQ 5.x 本地单机环境的启动方式。
  2. Spring Boot starter 2.3.4 的最小 producer、consumer 配置。
  3. 一个能被测试、能被 HTTP 调用、能继续演进的伴生工程模块。

下一章,我们就回到第一章那个慢吞吞的下单接口。

产品又会来一句很熟悉的话:“下单成功以后,短信、积分、仓库都要继续做,但你别再让用户等了。”

这时候我们就把第一章的同步旁路动作,正式改成普通消息异步解耦。

练习题

先自己想。下一章开头我们会复盘这一组题,再进入普通消息改造。

  1. 为什么本章先把消息设计成 OrderCreatedEvent,而不是直接把订单表所有字段都发出去?
  2. rocketmq.producer.topicOrderEventProperties.destination() 里的 topic 是什么关系?它们不一致时可能会发生什么?
  3. 如果应用启动时报 Connection refused: localhost:8081,你会按什么顺序排查?
  4. 为什么本地环境要写 ssl-enabled: false
  5. 如果 HTTP 接口返回 SENT,但是消费者没有日志,你准备先看 topic/tag,还是先改消息体?为什么?

Built with VitePress. Deployed on Cloudflare Pages.