Skip to content

01. 业务从同步开始:先跑出问题,再谈 MQ

本章先不讲 RocketMQ

老板,这一章我们先不上 MQ。

很多教程一上来就说“异步解耦、削峰填谷、最终一致”,这些词都对,但你还没亲眼看到旧方案怎么难受时,它们就是口号。

所以第一章我们先写一个非常真实的同步下单系统:

用户下单成功后,要发短信、加积分、通知仓库。

这个需求一点都不过分。刚上线的时候,很多人都会直接写在下单接口里。它能跑,也好理解。

但我们要把它真的跑起来,然后问几个现场问题:

  1. 短信服务慢了,下单接口是不是也要跟着慢?
  2. 短信服务挂了,用户就不能下单了吗?
  3. 用户看到失败后重试,会不会重复加积分、重复通知仓库?
  4. 以后产品又加发优惠券、App 推送、同步 CRM,难道还继续往下单方法里塞?

你先把这些问题跑出来,再看 RocketMQ,就会有感觉。

伴生工程位置

本教程从这一章开始配一个真实伴生工程:

text
D:\idea_space\rocketmq-order-tutorial

第一章模块:

text
D:\idea_space\rocketmq-order-tutorial\chapter-01-sync-order

本章所有代码都在这个工程里,不是只贴几个片段让你猜。

这一章你会跑出什么效果

你会跑出三组结果。

第一组,正常下单能成功,但是接口要等短信、积分、仓库都执行完。默认配置里短信 300ms,积分 200ms,仓库 500ms,所以服务端返回大约要 1s

第二组,同一个请求再调一次,会创建新订单,也会再执行一遍短信、积分、仓库。也就是说,当前同步版没有解决重试幂等。

第三组,把短信服务配置成失败。你会看到订单已经保存了,但接口返回 500。这就是同步方案最尴尬的地方:用户看到失败,系统里又可能已经有订单。

完整工程结构

第一章工程结构如下:

text
rocketmq-order-tutorial/
  pom.xml
  README.md
  chapter-01-sync-order/
    pom.xml
    README.md
    src/
      main/
        java/com/example/rocketmqdemo/
          TutorialOrderApplication.java
          api/
            CreateOrderRequest.java
            ErrorResponse.java
            OrderApiExceptionHandler.java
            OrderController.java
            OrderCreateHttpResponse.java
          config/
            OrderDemoConfiguration.java
            SideEffectProperties.java
          order/
            CreateOrderCommand.java
            InMemoryOrderRepository.java
            LoggingSideEffectAudit.java
            Order.java
            OrderApplicationService.java
            OrderCreateResult.java
            OrderRepository.java
            OrderSideEffects.java
            SideEffectAudit.java
            SideEffectBehavior.java
            SideEffectClient.java
            SideEffectException.java
            SimulatedSideEffectClient.java
            Sleeper.java
            ThreadSleeper.java
        resources/
          application.yml
      test/
        java/com/example/rocketmqdemo/
          api/
            OrderControllerSmsFailureIntegrationTest.java
            OrderControllerSuccessIntegrationTest.java
          order/
            OrderApplicationServiceTest.java
            RecordingSideEffectAudit.java
            RecordingSleeper.java

这就是我说的工程思维:不是一个 Service 片段,而是可以启动、可以测试、可以模拟故障、可以留下运行证据的完整工程。

先看依赖

第一章还没有引入 RocketMQ,只需要 Spring Boot Web 和测试依赖。

父工程 pom.xml 统一锁 Java 和 Spring Boot 版本:

xml
<properties>
    <java.version>17</java.version>
    <spring-boot.version>2.7.18</spring-boot.version>
    <maven.compiler.release>${java.version}</maven.compiler.release>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>

为什么这里选 Spring Boot 2.7.18

因为后面要接入的 rocketmq-v5-client-spring-boot-starter:2.3.4,它的官方 parent POM 使用的也是 Spring Boot 2.7.18。第一章先对齐这个底座,后面加 RocketMQ 时少踩版本兼容坑。

第一章模块依赖:

xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

再看可配置的“假旁路服务”

我们不真的接短信平台、积分系统、仓库系统。第一章先用模拟客户端把问题跑出来。

配置在 application.yml

yaml
server:
  port: 18081

tutorial:
  side-effects:
    sms:
      delay-ms: 300
      fail: false
    points:
      delay-ms: 200
      fail: false
    warehouse:
      delay-ms: 500
      fail: false

这三个配置很重要。

它们不是为了偷懒,而是为了教学时能稳定复现问题:短信慢一点、仓库慢一点、短信直接失败,都可以通过配置模拟。

核心模拟客户端:

java
public class SimulatedSideEffectClient implements SideEffectClient {

    private final SideEffectBehavior behavior;
    private final Sleeper sleeper;
    private final SideEffectAudit audit;

    public SimulatedSideEffectClient(
            SideEffectBehavior behavior,
            Sleeper sleeper,
            SideEffectAudit audit
    ) {
        this.behavior = behavior;
        this.sleeper = sleeper;
        this.audit = audit;
    }

    @Override
    public String name() {
        return behavior.name();
    }

    @Override
    public void execute(Order order) {
        log.info(
                "side-effect start name={}, orderId={}, delayMs={}, fail={}",
                behavior.name(),
                order.orderId(),
                behavior.delayMillis(),
                behavior.fail()
        );
        sleeper.sleep(behavior.delayMillis());
        if (behavior.fail()) {
            audit.failed(behavior.name());
            throw new SideEffectException("side effect failed: " + behavior.name());
        }
        audit.success(behavior.name(), order.orderId());
    }
}

这段代码的意思很直白:

  1. 先打印当前旁路动作。
  2. 按配置 sleep 一段时间。
  3. 如果配置了失败,就抛异常。
  4. 否则记录成功。

同步下单代码

现在看主角:下单服务。

java
@Service
public class OrderApplicationService {

    private final OrderRepository orderRepository;
    private final OrderSideEffects sideEffects;
    private final Clock clock;
    private final AtomicLong sequence = new AtomicLong(10000);

    public OrderCreateResult createOrder(CreateOrderCommand command) {
        String orderId = "O" + sequence.incrementAndGet();
        Order order = new Order(orderId, command.userId(), command.payAmount(), clock.instant());
        orderRepository.save(order);
        log.info("order saved orderId={}, userId={}, payAmount={}", orderId, command.userId(), command.payAmount());

        List<String> sideEffectNames = sideEffects.executeAll(order);
        log.info("order api finished orderId={}, sideEffects={}", orderId, sideEffectNames);
        return new OrderCreateResult(orderId, order.userId(), order.payAmount(), sideEffectNames);
    }
}

刚开始看,这段代码没问题。

保存订单,然后发短信、加积分、通知仓库,最后返回成功。业务顺序清清楚楚。

但你注意这一行:

java
List<String> sideEffectNames = sideEffects.executeAll(order);

它把所有旁路动作都塞进了下单主链路。

只要短信慢,下单就慢。只要仓库慢,下单就慢。只要短信抛异常,整个接口就抛异常。

HTTP 入口

为了让读者能真实调用,本章提供了接口:

java
@RestController
@RequestMapping("/api/orders")
public class OrderController {

    private final OrderApplicationService orderApplicationService;

    @PostMapping
    public OrderCreateHttpResponse createOrder(@RequestBody CreateOrderRequest request) {
        long startNanos = System.nanoTime();
        OrderCreateResult result = orderApplicationService.createOrder(
                new CreateOrderCommand(request.userId(), request.payAmount())
        );
        long costMs = Duration.ofNanos(System.nanoTime() - startNanos).toMillis();
        return new OrderCreateHttpResponse(
                "SUCCESS",
                result.orderId(),
                result.userId(),
                result.payAmount(),
                result.sideEffectNames(),
                costMs
        );
    }
}

异常处理:

java
@RestControllerAdvice
public class OrderApiExceptionHandler {

    @ExceptionHandler(SideEffectException.class)
    public ResponseEntity<ErrorResponse> handleSideEffectException(SideEffectException ex) {
        return ResponseEntity
                .status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(new ErrorResponse("FAILED", ex.getMessage(), Duration.ZERO.toMillis()));
    }
}

所以短信失败时,接口会直接返回 HTTP 500

这不是我们最终想要的效果。它只是第一章故意保留的旧方案问题。

先跑测试

在伴生工程根目录运行:

powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-01-sync-order test

本章测试覆盖三件事:

  1. 同步下单会在主线程执行短信、积分、仓库。
  2. 旁路服务失败,会让下单接口失败,即使订单已经保存。
  3. 同一个业务请求重试,会重复执行旁路动作。

真实测试结果:

text
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0
BUILD SUCCESS

第一次真实调用:成功,但慢

启动服务:

powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-01-sync-order spring-boot:run

请求:

powershell
Invoke-WebRequest `
  -Uri 'http://127.0.0.1:18081/api/orders' `
  -Method Post `
  -ContentType 'application/json; charset=utf-8' `
  -Body '{"userId":"U10001","payAmount":19900}'

真实响应:

json
{
  "status": "SUCCESS",
  "orderId": "O10001",
  "userId": "U10001",
  "payAmount": 19900,
  "sideEffects": ["sms", "points", "warehouse"],
  "costMs": 1030
}

客户端观测耗时:1183ms

日志里可以看到它不是一下就返回,而是一个个等:

text
order saved orderId=O10001, userId=U10001, payAmount=19900
side-effect start name=sms, orderId=O10001, delayMs=300, fail=false
side-effect success name=sms, orderId=O10001
side-effect start name=points, orderId=O10001, delayMs=200, fail=false
side-effect success name=points, orderId=O10001
side-effect start name=warehouse, orderId=O10001, delayMs=500, fail=false
side-effect success name=warehouse, orderId=O10001
order api finished orderId=O10001, sideEffects=[sms, points, warehouse]

这时候你就要问自己:

短信、积分、仓库都成功了当然没事。

但是它们为什么要让用户在下单接口等着?

用户只是要知道“订单创建成功没有”。短信和积分晚几秒处理,很多业务是能接受的。可现在它们全部压在主链路里,接口自然越来越慢。

第二次真实调用:用户重试会怎样

我们用同样请求再调一次:

json
{
  "status": "SUCCESS",
  "orderId": "O10002",
  "userId": "U10001",
  "payAmount": 19900,
  "sideEffects": ["sms", "points", "warehouse"],
  "costMs": 1028
}

关键日志:

text
order saved orderId=O10002, userId=U10001, payAmount=19900
side-effect start name=sms, orderId=O10002, delayMs=300, fail=false
side-effect success name=sms, orderId=O10002
side-effect start name=points, orderId=O10002, delayMs=200, fail=false
side-effect success name=points, orderId=O10002
side-effect start name=warehouse, orderId=O10002, delayMs=500, fail=false
side-effect success name=warehouse, orderId=O10002
order api finished orderId=O10002, sideEffects=[sms, points, warehouse]

注意 orderId 变成了 O10002

这说明现在的同步版没有业务幂等。用户重试一次,就可能创建一次新订单,也会再加一次积分、再通知一次仓库。

你当然可以说:“那我加幂等表不就行了吗?”

对,幂等一定要做。后面也会做。

但即使你加了幂等,短信、积分、仓库仍然在主链路里。一个慢,接口就慢;一个挂,接口就挂。幂等解决不了链路耦合。

第三次真实调用:短信失败怎么办

现在模拟短信服务失败。

启动命令改一下:

powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-01-sync-order spring-boot:run `
  -Dspring-boot.run.arguments="--tutorial.side-effects.sms.delay-ms=0 --tutorial.side-effects.sms.fail=true --tutorial.side-effects.points.delay-ms=0 --tutorial.side-effects.warehouse.delay-ms=0"

请求还是同一个:

powershell
Invoke-WebRequest `
  -Uri 'http://127.0.0.1:18081/api/orders' `
  -Method Post `
  -ContentType 'application/json; charset=utf-8' `
  -Body '{"userId":"U10001","payAmount":19900}'

真实响应:

json
{
  "status": "FAILED",
  "message": "side effect failed: sms",
  "costMs": 0
}

HTTP 状态码是 500

关键日志:

text
order saved orderId=O10001, userId=U10001, payAmount=19900
side-effect start name=sms, orderId=O10001, delayMs=0, fail=true
side-effect failed name=sms

现在问题就很直观了:

订单已经保存了。

但是短信失败了。

接口返回失败了。

用户看到失败以后,很可能再点一次。然后系统里可能又来一次订单创建、积分发放、仓库通知。

你要是因为短信发不出去就让订单都下不了,老板大概率不会接受。短信不是下单主流程,它最多应该失败后重试或补偿,不应该反过来把订单创建打断。

到这里,我们真正想要什么

我们不是想把代码“换成 MQ”这么简单。

我们真正想要的是把业务动作分级:

订单创建是主链路,必须尽快给用户确定结果。

短信、积分、仓库通知是订单创建后的后续动作。它们重要,但大多数时候可以晚一点,可以重试,可以补偿,不应该拖垮下单接口。

所以更合理的方向是:

text
下单接口
  -> 保存订单
  -> 发出 OrderCreated 事件
  -> 尽快返回成功

短信服务 / 积分服务 / 仓库服务
  -> 各自消费 OrderCreated
  -> 各自处理自己的业务
  -> 失败时各自重试、告警、补偿

这就是 MQ 要进场的原因。

不是因为“项目里必须有 MQ 才高级”,而是因为业务增长以后,主链路和旁路动作绑在一起已经开始出问题了。

但先别急着上生产

MQ 不是万能胶。

引入 MQ 后,你会得到异步解耦,也会得到新的问题:

  1. 消息发出去了,但消费者失败怎么办?
  2. 消费者重复消费怎么办?
  3. 订单保存成功了,但消息发送失败怎么办?
  4. 消息堆积了怎么发现?
  5. 哪些 Topic、Tag、ConsumerGroup 应该怎么命名?
  6. 哪些场景用普通消息、顺序消息、延时消息、事务消息?

所以本教程后面不会只教你“怎么发一条消息”。

我们会沿着这个订单业务继续长:

先部署 RocketMQ 单机版,跑通第一条消息。

再把短信、积分、仓库拆成消费者。

然后业务继续增长,逐步引出普通消息、PushConsumer、SimpleConsumer、顺序消息、延时消息、事务消息、异步发送、过滤、重试、死信、幂等、消费堆积、ACL 和生产配置。

本章小技巧

第一,做中间件教程时,先别急着引中间件。先把没有中间件时的问题跑出来。读者看到痛点以后,后面的特性才有位置。

第二,模拟外部服务时不要只写 System.out.println("发送短信")。要能配置延迟和失败,否则你看不到真实链路问题。

第三,教程里的代码要能测试。第一章测试不是摆设,它明确证明了三个行为:同步旁路在主链路、旁路失败拖垮接口、重复请求会重复触发旁路。

第四,真实运行时注意停服务。spring-boot:run 在 Windows 下可能留下实际占用端口的 Java 子进程。如果端口被占用,可以查:

powershell
Get-NetTCPConnection -LocalPort 18081

然后确认是本章 demo 进程后再停止。别乱杀不认识的进程。

练习题

先自己想。下一章开头我们会复盘这一组题,再进入 RocketMQ 单机环境。

  1. warehouse.delay-ms500 改成 3000,下单接口会发生什么变化?
  2. 如果短信失败,订单到底应不应该回滚?请先写出你的业务假设。
  3. 如果同一个用户重复提交同一笔业务请求,你准备怎么识别“重复”?
  4. 下单后发优惠券,应该放在主链路还是旁路?为什么?
  5. 现在还没有 MQ,你能先用本地线程池异步处理吗?这样会有什么风险?

Built with VitePress. Deployed on Cloudflare Pages.