切换主题
01. 业务从同步开始:先跑出问题,再谈 MQ
本章先不讲 RocketMQ
老板,这一章我们先不上 MQ。
很多教程一上来就说“异步解耦、削峰填谷、最终一致”,这些词都对,但你还没亲眼看到旧方案怎么难受时,它们就是口号。
所以第一章我们先写一个非常真实的同步下单系统:
用户下单成功后,要发短信、加积分、通知仓库。
这个需求一点都不过分。刚上线的时候,很多人都会直接写在下单接口里。它能跑,也好理解。
但我们要把它真的跑起来,然后问几个现场问题:
- 短信服务慢了,下单接口是不是也要跟着慢?
- 短信服务挂了,用户就不能下单了吗?
- 用户看到失败后重试,会不会重复加积分、重复通知仓库?
- 以后产品又加发优惠券、App 推送、同步 CRM,难道还继续往下单方法里塞?
你先把这些问题跑出来,再看 RocketMQ,就会有感觉。
伴生工程位置
本教程从这一章开始配一个真实伴生工程:
text
D:\idea_space\rocketmq-order-tutorial第一章模块:
text
D:\idea_space\rocketmq-order-tutorial\chapter-01-sync-order本章所有代码都在这个工程里,不是只贴几个片段让你猜。
这一章你会跑出什么效果
你会跑出三组结果。
第一组,正常下单能成功,但是接口要等短信、积分、仓库都执行完。默认配置里短信 300ms,积分 200ms,仓库 500ms,所以服务端返回大约要 1s。
第二组,同一个请求再调一次,会创建新订单,也会再执行一遍短信、积分、仓库。也就是说,当前同步版没有解决重试幂等。
第三组,把短信服务配置成失败。你会看到订单已经保存了,但接口返回 500。这就是同步方案最尴尬的地方:用户看到失败,系统里又可能已经有订单。
完整工程结构
第一章工程结构如下:
text
rocketmq-order-tutorial/
pom.xml
README.md
chapter-01-sync-order/
pom.xml
README.md
src/
main/
java/com/example/rocketmqdemo/
TutorialOrderApplication.java
api/
CreateOrderRequest.java
ErrorResponse.java
OrderApiExceptionHandler.java
OrderController.java
OrderCreateHttpResponse.java
config/
OrderDemoConfiguration.java
SideEffectProperties.java
order/
CreateOrderCommand.java
InMemoryOrderRepository.java
LoggingSideEffectAudit.java
Order.java
OrderApplicationService.java
OrderCreateResult.java
OrderRepository.java
OrderSideEffects.java
SideEffectAudit.java
SideEffectBehavior.java
SideEffectClient.java
SideEffectException.java
SimulatedSideEffectClient.java
Sleeper.java
ThreadSleeper.java
resources/
application.yml
test/
java/com/example/rocketmqdemo/
api/
OrderControllerSmsFailureIntegrationTest.java
OrderControllerSuccessIntegrationTest.java
order/
OrderApplicationServiceTest.java
RecordingSideEffectAudit.java
RecordingSleeper.java这就是我说的工程思维:不是一个 Service 片段,而是可以启动、可以测试、可以模拟故障、可以留下运行证据的完整工程。
先看依赖
第一章还没有引入 RocketMQ,只需要 Spring Boot Web 和测试依赖。
父工程 pom.xml 统一锁 Java 和 Spring Boot 版本:
xml
<properties>
<java.version>17</java.version>
<spring-boot.version>2.7.18</spring-boot.version>
<maven.compiler.release>${java.version}</maven.compiler.release>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>为什么这里选 Spring Boot 2.7.18?
因为后面要接入的 rocketmq-v5-client-spring-boot-starter:2.3.4,它的官方 parent POM 使用的也是 Spring Boot 2.7.18。第一章先对齐这个底座,后面加 RocketMQ 时少踩版本兼容坑。
第一章模块依赖:
xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>再看可配置的“假旁路服务”
我们不真的接短信平台、积分系统、仓库系统。第一章先用模拟客户端把问题跑出来。
配置在 application.yml:
yaml
server:
port: 18081
tutorial:
side-effects:
sms:
delay-ms: 300
fail: false
points:
delay-ms: 200
fail: false
warehouse:
delay-ms: 500
fail: false这三个配置很重要。
它们不是为了偷懒,而是为了教学时能稳定复现问题:短信慢一点、仓库慢一点、短信直接失败,都可以通过配置模拟。
核心模拟客户端:
java
public class SimulatedSideEffectClient implements SideEffectClient {
private final SideEffectBehavior behavior;
private final Sleeper sleeper;
private final SideEffectAudit audit;
public SimulatedSideEffectClient(
SideEffectBehavior behavior,
Sleeper sleeper,
SideEffectAudit audit
) {
this.behavior = behavior;
this.sleeper = sleeper;
this.audit = audit;
}
@Override
public String name() {
return behavior.name();
}
@Override
public void execute(Order order) {
log.info(
"side-effect start name={}, orderId={}, delayMs={}, fail={}",
behavior.name(),
order.orderId(),
behavior.delayMillis(),
behavior.fail()
);
sleeper.sleep(behavior.delayMillis());
if (behavior.fail()) {
audit.failed(behavior.name());
throw new SideEffectException("side effect failed: " + behavior.name());
}
audit.success(behavior.name(), order.orderId());
}
}这段代码的意思很直白:
- 先打印当前旁路动作。
- 按配置 sleep 一段时间。
- 如果配置了失败,就抛异常。
- 否则记录成功。
同步下单代码
现在看主角:下单服务。
java
@Service
public class OrderApplicationService {
private final OrderRepository orderRepository;
private final OrderSideEffects sideEffects;
private final Clock clock;
private final AtomicLong sequence = new AtomicLong(10000);
public OrderCreateResult createOrder(CreateOrderCommand command) {
String orderId = "O" + sequence.incrementAndGet();
Order order = new Order(orderId, command.userId(), command.payAmount(), clock.instant());
orderRepository.save(order);
log.info("order saved orderId={}, userId={}, payAmount={}", orderId, command.userId(), command.payAmount());
List<String> sideEffectNames = sideEffects.executeAll(order);
log.info("order api finished orderId={}, sideEffects={}", orderId, sideEffectNames);
return new OrderCreateResult(orderId, order.userId(), order.payAmount(), sideEffectNames);
}
}刚开始看,这段代码没问题。
保存订单,然后发短信、加积分、通知仓库,最后返回成功。业务顺序清清楚楚。
但你注意这一行:
java
List<String> sideEffectNames = sideEffects.executeAll(order);它把所有旁路动作都塞进了下单主链路。
只要短信慢,下单就慢。只要仓库慢,下单就慢。只要短信抛异常,整个接口就抛异常。
HTTP 入口
为了让读者能真实调用,本章提供了接口:
java
@RestController
@RequestMapping("/api/orders")
public class OrderController {
private final OrderApplicationService orderApplicationService;
@PostMapping
public OrderCreateHttpResponse createOrder(@RequestBody CreateOrderRequest request) {
long startNanos = System.nanoTime();
OrderCreateResult result = orderApplicationService.createOrder(
new CreateOrderCommand(request.userId(), request.payAmount())
);
long costMs = Duration.ofNanos(System.nanoTime() - startNanos).toMillis();
return new OrderCreateHttpResponse(
"SUCCESS",
result.orderId(),
result.userId(),
result.payAmount(),
result.sideEffectNames(),
costMs
);
}
}异常处理:
java
@RestControllerAdvice
public class OrderApiExceptionHandler {
@ExceptionHandler(SideEffectException.class)
public ResponseEntity<ErrorResponse> handleSideEffectException(SideEffectException ex) {
return ResponseEntity
.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(new ErrorResponse("FAILED", ex.getMessage(), Duration.ZERO.toMillis()));
}
}所以短信失败时,接口会直接返回 HTTP 500。
这不是我们最终想要的效果。它只是第一章故意保留的旧方案问题。
先跑测试
在伴生工程根目录运行:
powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-01-sync-order test本章测试覆盖三件事:
- 同步下单会在主线程执行短信、积分、仓库。
- 旁路服务失败,会让下单接口失败,即使订单已经保存。
- 同一个业务请求重试,会重复执行旁路动作。
真实测试结果:
text
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0
BUILD SUCCESS第一次真实调用:成功,但慢
启动服务:
powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-01-sync-order spring-boot:run请求:
powershell
Invoke-WebRequest `
-Uri 'http://127.0.0.1:18081/api/orders' `
-Method Post `
-ContentType 'application/json; charset=utf-8' `
-Body '{"userId":"U10001","payAmount":19900}'真实响应:
json
{
"status": "SUCCESS",
"orderId": "O10001",
"userId": "U10001",
"payAmount": 19900,
"sideEffects": ["sms", "points", "warehouse"],
"costMs": 1030
}客户端观测耗时:1183ms。
日志里可以看到它不是一下就返回,而是一个个等:
text
order saved orderId=O10001, userId=U10001, payAmount=19900
side-effect start name=sms, orderId=O10001, delayMs=300, fail=false
side-effect success name=sms, orderId=O10001
side-effect start name=points, orderId=O10001, delayMs=200, fail=false
side-effect success name=points, orderId=O10001
side-effect start name=warehouse, orderId=O10001, delayMs=500, fail=false
side-effect success name=warehouse, orderId=O10001
order api finished orderId=O10001, sideEffects=[sms, points, warehouse]这时候你就要问自己:
短信、积分、仓库都成功了当然没事。
但是它们为什么要让用户在下单接口等着?
用户只是要知道“订单创建成功没有”。短信和积分晚几秒处理,很多业务是能接受的。可现在它们全部压在主链路里,接口自然越来越慢。
第二次真实调用:用户重试会怎样
我们用同样请求再调一次:
json
{
"status": "SUCCESS",
"orderId": "O10002",
"userId": "U10001",
"payAmount": 19900,
"sideEffects": ["sms", "points", "warehouse"],
"costMs": 1028
}关键日志:
text
order saved orderId=O10002, userId=U10001, payAmount=19900
side-effect start name=sms, orderId=O10002, delayMs=300, fail=false
side-effect success name=sms, orderId=O10002
side-effect start name=points, orderId=O10002, delayMs=200, fail=false
side-effect success name=points, orderId=O10002
side-effect start name=warehouse, orderId=O10002, delayMs=500, fail=false
side-effect success name=warehouse, orderId=O10002
order api finished orderId=O10002, sideEffects=[sms, points, warehouse]注意 orderId 变成了 O10002。
这说明现在的同步版没有业务幂等。用户重试一次,就可能创建一次新订单,也会再加一次积分、再通知一次仓库。
你当然可以说:“那我加幂等表不就行了吗?”
对,幂等一定要做。后面也会做。
但即使你加了幂等,短信、积分、仓库仍然在主链路里。一个慢,接口就慢;一个挂,接口就挂。幂等解决不了链路耦合。
第三次真实调用:短信失败怎么办
现在模拟短信服务失败。
启动命令改一下:
powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-01-sync-order spring-boot:run `
-Dspring-boot.run.arguments="--tutorial.side-effects.sms.delay-ms=0 --tutorial.side-effects.sms.fail=true --tutorial.side-effects.points.delay-ms=0 --tutorial.side-effects.warehouse.delay-ms=0"请求还是同一个:
powershell
Invoke-WebRequest `
-Uri 'http://127.0.0.1:18081/api/orders' `
-Method Post `
-ContentType 'application/json; charset=utf-8' `
-Body '{"userId":"U10001","payAmount":19900}'真实响应:
json
{
"status": "FAILED",
"message": "side effect failed: sms",
"costMs": 0
}HTTP 状态码是 500。
关键日志:
text
order saved orderId=O10001, userId=U10001, payAmount=19900
side-effect start name=sms, orderId=O10001, delayMs=0, fail=true
side-effect failed name=sms现在问题就很直观了:
订单已经保存了。
但是短信失败了。
接口返回失败了。
用户看到失败以后,很可能再点一次。然后系统里可能又来一次订单创建、积分发放、仓库通知。
你要是因为短信发不出去就让订单都下不了,老板大概率不会接受。短信不是下单主流程,它最多应该失败后重试或补偿,不应该反过来把订单创建打断。
到这里,我们真正想要什么
我们不是想把代码“换成 MQ”这么简单。
我们真正想要的是把业务动作分级:
订单创建是主链路,必须尽快给用户确定结果。
短信、积分、仓库通知是订单创建后的后续动作。它们重要,但大多数时候可以晚一点,可以重试,可以补偿,不应该拖垮下单接口。
所以更合理的方向是:
text
下单接口
-> 保存订单
-> 发出 OrderCreated 事件
-> 尽快返回成功
短信服务 / 积分服务 / 仓库服务
-> 各自消费 OrderCreated
-> 各自处理自己的业务
-> 失败时各自重试、告警、补偿这就是 MQ 要进场的原因。
不是因为“项目里必须有 MQ 才高级”,而是因为业务增长以后,主链路和旁路动作绑在一起已经开始出问题了。
但先别急着上生产
MQ 不是万能胶。
引入 MQ 后,你会得到异步解耦,也会得到新的问题:
- 消息发出去了,但消费者失败怎么办?
- 消费者重复消费怎么办?
- 订单保存成功了,但消息发送失败怎么办?
- 消息堆积了怎么发现?
- 哪些 Topic、Tag、ConsumerGroup 应该怎么命名?
- 哪些场景用普通消息、顺序消息、延时消息、事务消息?
所以本教程后面不会只教你“怎么发一条消息”。
我们会沿着这个订单业务继续长:
先部署 RocketMQ 单机版,跑通第一条消息。
再把短信、积分、仓库拆成消费者。
然后业务继续增长,逐步引出普通消息、PushConsumer、SimpleConsumer、顺序消息、延时消息、事务消息、异步发送、过滤、重试、死信、幂等、消费堆积、ACL 和生产配置。
本章小技巧
第一,做中间件教程时,先别急着引中间件。先把没有中间件时的问题跑出来。读者看到痛点以后,后面的特性才有位置。
第二,模拟外部服务时不要只写 System.out.println("发送短信")。要能配置延迟和失败,否则你看不到真实链路问题。
第三,教程里的代码要能测试。第一章测试不是摆设,它明确证明了三个行为:同步旁路在主链路、旁路失败拖垮接口、重复请求会重复触发旁路。
第四,真实运行时注意停服务。spring-boot:run 在 Windows 下可能留下实际占用端口的 Java 子进程。如果端口被占用,可以查:
powershell
Get-NetTCPConnection -LocalPort 18081然后确认是本章 demo 进程后再停止。别乱杀不认识的进程。
练习题
先自己想。下一章开头我们会复盘这一组题,再进入 RocketMQ 单机环境。
- 把
warehouse.delay-ms从500改成3000,下单接口会发生什么变化? - 如果短信失败,订单到底应不应该回滚?请先写出你的业务假设。
- 如果同一个用户重复提交同一笔业务请求,你准备怎么识别“重复”?
- 下单后发优惠券,应该放在主链路还是旁路?为什么?
- 现在还没有 MQ,你能先用本地线程池异步处理吗?这样会有什么风险?