切换主题
05. PushConsumer:服务启动后,消息自己送上门
上一章我们把 topic、tag、key、eventId、consumerGroup 讲清楚了。
这一步非常重要。因为从这一章开始,我们不再只看“生产者把消息发出去没有”,而是要真的写消费者服务了。
消费者一多,问题马上就来了。
通知服务要发短信。
积分服务要加积分。
仓库服务要锁库存。
它们都关心订单创建事件,但它们不应该互相抢同一份消息。它们也不应该每个服务都写一个死循环去拉消息。更要命的是,如果通知服务发短信失败了,你能不能直接告诉 RocketMQ:我消费成功了?
不能。
短信没有发出去,你却返回成功,那这条消息后面就没有重试机会了。线上排查时你会非常难受:订单有了,消息也消费过了,可通知就是没发出去。
这一章就解决这个问题:用 PushConsumer 让服务启动后自动接收消息,并且把 SUCCESS 和 FAILURE 的边界讲清楚。
上一章练习参考答案
先把第 4 章的题复盘一下,再进入 PushConsumer。
第一题:OrderPaid 要不要新建 topic?
先不要一看到新事件就新建 topic。你要先问几个问题:它是不是仍然属于订单事件域?权限、保留时间、吞吐量、消费者范围是不是和 OrderCreated 差不多?如果答案基本一致,可以先继续放在 order-event-topic,用 tag=OrderPaid 区分。
如果 OrderPaid 的吞吐量明显更高,保留时间不同,权限隔离不同,或者消费者完全是另一批系统,那就考虑拆成独立 topic。topic 是资源边界,不是枚举值。
第二题:通知服务部署 3 个实例时,用几个 consumerGroup?
用 1 个。
比如三个实例都叫:
text
dev-notification-service-order-events这三个实例一起分摊通知服务自己的消费任务。你不要给三个实例配三个不同 group。那样每个实例都会收到一份消息,本来只该发一条短信,结果可能发三条。
第三题:通知服务和积分服务能不能共用同一个 consumerGroup?
不能。
通知服务和积分服务都要处理 OrderCreated。如果它们共用一个 group,RocketMQ 会把它们当成同一个消费集群里的不同实例。结果就是一条订单创建消息可能被通知服务拿走,也可能被积分服务拿走,但不会稳定地两边都处理。
这就变成了“有时候发短信,有时候加积分”,老板看完会很安静,安静得让人害怕。
第四题:线上只给你订单号 O10001,消息里至少要有哪些字段方便排查?
至少要有:
text
topic: order-event-topic
tag: OrderCreated
key: O10001
eventId: evt_xxx
orderId: O10001
userId: U10001
messageId: RocketMQ 生成的技术消息编号key 帮你按业务编号检索消息,eventId 帮你做幂等和追踪一次业务事件,messageId 帮你定位 RocketMQ 内部的那条消息。它们不是一个东西,不要混用。
第五题:测试环境和生产环境共用 consumerGroup 会怎样?
会出现环境串消费。
测试服务可能消费生产消息,生产服务也可能被测试消息干扰。轻一点是排查困难,重一点就是生产订单没有触达真正的生产消费者。
所以 consumerGroup 里必须带环境前缀:
text
dev-notification-service-order-events
prod-notification-service-order-events业务继续长:消费者不能靠人肉拉消息
现在我们回到订单系统。
第三章里,下单接口已经不再同步调用短信、积分、仓库了。它只负责创建订单,然后发布 OrderCreated。
第四章里,我们把消息模型和命名整理清楚了。
接下来产品来了一个很朴素的要求:
“订单创建后,通知、积分、仓库这些服务都要自动处理,不要还要点一下、拉一下、扫一下。”
这个要求看起来平平无奇,但它背后有几个真实问题。
第一个问题:消费者服务启动后,怎么知道自己该收哪些消息?
你总不能在服务里写一个 while 循环,每隔一秒去问 RocketMQ:“有我的消息吗?”这种写法又累又容易错。线程怎么停?拉多少?异常怎么处理?消费成功以后什么时候确认?这些都要你自己操心。
第二个问题:通知服务挂了怎么办?
比如短信服务超时了,通知消费者这次没处理完。这个时候它能不能返回 SUCCESS?
不能。
返回 SUCCESS 等于告诉 RocketMQ:这条消息我处理完了,后面不用再管。
可是短信根本没发出去。你把失败伪装成成功,等于亲手把重试机会掐掉。
第三个问题:通知服务有 3 个实例,消息会不会被重复消费 3 次?
如果 3 个实例属于同一个 consumerGroup,它们应该分摊消息,不是每个实例都拿一份。这个语义上一章已经讲过,这一章我们用代码跑出来。
这就是 PushConsumer 出场的地方。
PushConsumer 的思路很简单:服务启动时把订阅关系告诉 RocketMQ。消息到达后,客户端把消息推给你的监听器。你的监听器处理业务,然后返回 ConsumeResult.SUCCESS 或 ConsumeResult.FAILURE。
一句话记住:
text
业务真的处理完 -> SUCCESS
业务没有处理完 -> FAILURE本章伴生工程
本章新增模块:
text
chapter-05-push-consumer-auto/
pom.xml
src/main/resources/application.yml
src/main/java/com/example/rocketmqdemo/
TutorialPushConsumerApplication.java
api/
PushConsumerDemoController.java
PublishOrderCreatedPushRequest.java
PushConsumerDemoResponse.java
DeliveryView.java
config/
PushConsumerDemoConfiguration.java
model/
OrderCreatedEvent.java
MessageEnvelope.java
MessageNamingPolicy.java
consumer/
PushConsumerHandler.java
LocalPushConsumerEngine.java
LocalPushConsumerRegistration.java
DeliveryRecord.java
NotificationPushConsumer.java
PointsPushConsumer.java
WarehousePushConsumer.java
RecordingPushConsumer.java
rocketmq/
OrderEventCodec.java
RocketMqNotificationOrderCreatedListener.java
RocketMqPointsOrderCreatedListener.java
RocketMqWarehouseOrderCreatedListener.java
src/test/java/com/example/rocketmqdemo/
consumer/
LocalPushConsumerEngineTest.java
api/
PushConsumerDemoControllerTest.java这章有两套运行方式。
第一套是本地演示模式,不依赖 RocketMQ。它用 LocalPushConsumerEngine 模拟 PushConsumer 的投递效果,让你马上看见“不同 group 都能收到,同一个 group 的多个实例只分摊一份”。
第二套是真实 RocketMQ 模式。rocketmq 包下的三个监听器使用 @RocketMQMessageListener,真正接入 rocketmq-v5-client-spring-boot-starter:2.3.4。
教程先用本地演示把业务语义跑明白,再给真实 RocketMQ 写法。这样你不会被环境卡住,也不会只会背注解。
依赖还是这一套
pom.xml 里继续使用本教程指定版本:
xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
<version>2.3.4</version>
</dependency>完整依赖结构是:
xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
<version>2.3.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>配置文件:
yaml
server:
port: 18085
spring:
application:
name: chapter-05-push-consumer-auto
main:
banner-mode: "off"
tutorial:
env: dev
rocketmq:
endpoints: 127.0.0.1:8081
order-event:
topic: order-event-topic
notification:
consumer-group: dev-notification-service-order-events
points:
consumer-group: dev-points-service-order-events
warehouse:
consumer-group: dev-warehouse-service-order-events这里先把三个业务服务的 group 都配出来。不要让通知、积分、仓库共用一个 group。
先把消息命名固定下来
消费者收到消息后,不应该还在猜这是什么业务。
我们继续使用统一的命名策略:
java
public class MessageNamingPolicy {
private static final String ORDER_CREATED_TAG = "OrderCreated";
private final String env;
private final String orderEventTopic;
public MessageNamingPolicy(String env, String orderEventTopic) {
this.env = Objects.requireNonNull(env, "env must not be null");
this.orderEventTopic = Objects.requireNonNull(orderEventTopic, "orderEventTopic must not be null");
}
public MessageEnvelope envelope(OrderCreatedEvent event) {
return new MessageEnvelope(
orderEventTopic,
ORDER_CREATED_TAG,
event.orderId(),
event.eventId(),
event);
}
public String consumerGroup(String serviceName) {
return env + "-" + serviceName + "-order-events";
}
}订单创建事件本身长这样:
java
public record OrderCreatedEvent(
String eventId,
String orderId,
String userId,
int payAmount,
boolean failNotification
) {
public OrderCreatedEvent(String eventId, String orderId, String userId, int payAmount) {
this(eventId, orderId, userId, payAmount, false);
}
}failNotification 是本章演示用的开关。它模拟短信服务失败,方便你看到消费者返回 FAILURE 后,结果是什么样。
本地 PushConsumer 引擎
本地引擎不是 RocketMQ,它只是教学用的模拟器。
它模拟两件事。
第一,不同 consumerGroup 都会收到一份消息。
第二,同一个 consumerGroup 里如果有多个实例,只选一个实例处理这条消息。
核心代码:
java
public class LocalPushConsumerEngine {
private final Map<String, List<LocalPushConsumerRegistration>> registrationsByGroup = new LinkedHashMap<>();
private final Map<String, Integer> roundRobinIndexes = new LinkedHashMap<>();
public LocalPushConsumerEngine(List<LocalPushConsumerRegistration> registrations) {
for (LocalPushConsumerRegistration registration : registrations) {
registrationsByGroup
.computeIfAbsent(registration.consumerGroup(), ignored -> new ArrayList<>())
.add(registration);
}
}
public List<DeliveryRecord> push(MessageEnvelope envelope) {
List<DeliveryRecord> deliveries = new ArrayList<>();
for (Map.Entry<String, List<LocalPushConsumerRegistration>> entry : registrationsByGroup.entrySet()) {
List<LocalPushConsumerRegistration> matched = entry.getValue().stream()
.filter(registration -> registration.matches(envelope.tag()))
.toList();
if (matched.isEmpty()) {
continue;
}
LocalPushConsumerRegistration selected = select(entry.getKey(), matched);
ConsumeResult result = selected.handler().consume(envelope);
deliveries.add(new DeliveryRecord(
selected.consumerGroup(),
selected.instanceName(),
envelope.tag(),
envelope.key(),
envelope.eventId(),
envelope.event().orderId(),
result,
result == ConsumeResult.SUCCESS ? "business finished, commit consume progress" : "business failed, do not commit consume progress"));
}
return deliveries;
}
private LocalPushConsumerRegistration select(String consumerGroup, List<LocalPushConsumerRegistration> matched) {
int current = roundRobinIndexes.getOrDefault(consumerGroup, 0);
LocalPushConsumerRegistration selected = matched.get(current % matched.size());
roundRobinIndexes.put(consumerGroup, current + 1);
return selected;
}
}这里最值得注意的是 selected.handler().consume(envelope)。
真实 RocketMQ 里,消息也是进入你的监听器方法。你的业务逻辑处理完后,返回消费结果。
本地引擎把这个动作显式暴露出来,是为了让你先理解消费语义。
三个业务消费者
通知消费者:
java
@Component
public class NotificationPushConsumer implements PushConsumerHandler {
private static final Logger log = LoggerFactory.getLogger(NotificationPushConsumer.class);
@Override
public ConsumeResult consume(MessageEnvelope envelope) {
if (envelope.event().failNotification()) {
log.warn("notification failed orderId={}, eventId={}", envelope.event().orderId(), envelope.eventId());
return ConsumeResult.FAILURE;
}
log.info("notification sent orderId={}, eventId={}", envelope.event().orderId(), envelope.eventId());
return ConsumeResult.SUCCESS;
}
}积分消费者:
java
@Component
public class PointsPushConsumer implements PushConsumerHandler {
private static final Logger log = LoggerFactory.getLogger(PointsPushConsumer.class);
@Override
public ConsumeResult consume(MessageEnvelope envelope) {
log.info("points granted orderId={}, eventId={}", envelope.event().orderId(), envelope.eventId());
return ConsumeResult.SUCCESS;
}
}仓库消费者:
java
@Component
public class WarehousePushConsumer implements PushConsumerHandler {
private static final Logger log = LoggerFactory.getLogger(WarehousePushConsumer.class);
@Override
public ConsumeResult consume(MessageEnvelope envelope) {
log.info("warehouse reserved orderId={}, eventId={}", envelope.event().orderId(), envelope.eventId());
return ConsumeResult.SUCCESS;
}
}你会发现这三个类都很薄。
这是有意的。
监听器和消费者入口不要塞太多复杂逻辑。它们应该负责“解析消息、做幂等入口判断、调用业务服务、返回消费结果”。真正复杂的业务逻辑应该进 Service。
本章为了聚焦 PushConsumer,没有把短信、积分、仓库拆成更深的业务服务。后面讲重试、死信、幂等补偿时,我们会继续把它拆细。
把三个消费者注册到本地引擎
配置类:
java
@Configuration
public class PushConsumerDemoConfiguration {
@Bean
MessageNamingPolicy messageNamingPolicy(
@Value("${tutorial.env}") String env,
@Value("${tutorial.order-event.topic}") String topic) {
return new MessageNamingPolicy(env, topic);
}
@Bean
LocalPushConsumerEngine localPushConsumerEngine(
MessageNamingPolicy namingPolicy,
NotificationPushConsumer notificationConsumer,
PointsPushConsumer pointsConsumer,
WarehousePushConsumer warehouseConsumer) {
return new LocalPushConsumerEngine(List.of(
new LocalPushConsumerRegistration(namingPolicy.consumerGroup("notification-service"), "notification-1", "OrderCreated", notificationConsumer),
new LocalPushConsumerRegistration(namingPolicy.consumerGroup("notification-service"), "notification-2", "OrderCreated", notificationConsumer),
new LocalPushConsumerRegistration(namingPolicy.consumerGroup("points-service"), "points-1", "OrderCreated", pointsConsumer),
new LocalPushConsumerRegistration(namingPolicy.consumerGroup("warehouse-service"), "warehouse-1", "OrderCreated", warehouseConsumer)
));
}
}注意通知服务这里有两个实例:
text
notification-1
notification-2但它们属于同一个 group:
text
dev-notification-service-order-events所以一条订单创建消息不会同时交给两个通知实例。它们会分摊通知服务的任务。
积分和仓库是不同业务服务,所以各自使用不同 group。
提供一个 HTTP 入口看效果
Controller:
java
@RestController
@RequestMapping("/api/push-consumer")
public class PushConsumerDemoController {
private final MessageNamingPolicy namingPolicy;
private final LocalPushConsumerEngine localPushConsumerEngine;
public PushConsumerDemoController(MessageNamingPolicy namingPolicy, LocalPushConsumerEngine localPushConsumerEngine) {
this.namingPolicy = namingPolicy;
this.localPushConsumerEngine = localPushConsumerEngine;
}
@PostMapping("/order-created")
public PushConsumerDemoResponse publishOrderCreated(@RequestBody PublishOrderCreatedPushRequest request) {
OrderCreatedEvent event = request.toEvent();
MessageEnvelope envelope = namingPolicy.envelope(event);
return new PushConsumerDemoResponse(
envelope.topic(),
envelope.tag(),
envelope.key(),
envelope.eventId(),
localPushConsumerEngine.push(envelope).stream()
.map(DeliveryView::from)
.toList());
}
}这个接口不是说真实生产者能直接拿到所有消费者的执行结果。
真实 RocketMQ 里,生产者把消息发给 RocketMQ 后,消费者是异步消费的,生产者通常不会直接知道每个消费者成功还是失败。
这里把投递结果返回给 HTTP,是为了教学时一眼看懂 PushConsumer 的消费语义。
运行测试
执行:
powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-05-push-consumer-auto test真实结果:
text
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
BUILD SUCCESS这 3 个测试证明了三件事。
第一,不同 consumerGroup 都能收到同一条 OrderCreated。
第二,同一个 consumerGroup 里多个实例只会有一个实例处理这条消息。
第三,通知服务失败时返回 FAILURE,积分和仓库仍然可以成功处理自己的业务。
启动本地演示
启动:
powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-05-push-consumer-auto spring-boot:run服务启动成功后,你会看到端口:
text
Tomcat started on port(s): 18085 (http)先发一个正常请求:
powershell
Invoke-RestMethod `
-Uri 'http://127.0.0.1:18085/api/push-consumer/order-created' `
-Method Post `
-ContentType 'application/json; charset=utf-8' `
-Body '{"orderId":"O10004","userId":"U10004","payAmount":19900,"failNotification":false}'返回结果:
json
{
"topic": "order-event-topic",
"tag": "OrderCreated",
"key": "O10004",
"eventId": "evt_dbb9cd5a-bffc-4c8b-8c6c-4a8b7c679ca9",
"deliveries": [
{
"consumerGroup": "dev-notification-service-order-events",
"instanceName": "notification-1",
"result": "SUCCESS",
"orderId": "O10004",
"note": "business finished, commit consume progress"
},
{
"consumerGroup": "dev-points-service-order-events",
"instanceName": "points-1",
"result": "SUCCESS",
"orderId": "O10004",
"note": "business finished, commit consume progress"
},
{
"consumerGroup": "dev-warehouse-service-order-events",
"instanceName": "warehouse-1",
"result": "SUCCESS",
"orderId": "O10004",
"note": "business finished, commit consume progress"
}
]
}这个结果说明:通知、积分、仓库三个不同 group 都收到了订单创建事件,并且都处理成功。
再发一个通知失败的请求:
powershell
Invoke-RestMethod `
-Uri 'http://127.0.0.1:18085/api/push-consumer/order-created' `
-Method Post `
-ContentType 'application/json; charset=utf-8' `
-Body '{"orderId":"O10005","userId":"U10005","payAmount":29900,"failNotification":true}'返回结果:
json
{
"topic": "order-event-topic",
"tag": "OrderCreated",
"key": "O10005",
"eventId": "evt_8c224fc0-e11e-4502-a57a-973357918db5",
"deliveries": [
{
"consumerGroup": "dev-notification-service-order-events",
"instanceName": "notification-1",
"result": "FAILURE",
"orderId": "O10005",
"note": "business failed, do not commit consume progress"
},
{
"consumerGroup": "dev-points-service-order-events",
"instanceName": "points-1",
"result": "SUCCESS",
"orderId": "O10005",
"note": "business finished, commit consume progress"
},
{
"consumerGroup": "dev-warehouse-service-order-events",
"instanceName": "warehouse-1",
"result": "SUCCESS",
"orderId": "O10005",
"note": "business finished, commit consume progress"
}
]
}这次最关键的是通知服务:
text
consumerGroup: dev-notification-service-order-events
result: FAILURE
note: business failed, do not commit consume progress它没有假装成功。
积分和仓库是另外两个 group,它们处理成功,不受通知服务失败影响。
控制台也能看到类似日志:
text
notification sent orderId=O10004
points granted orderId=O10004
warehouse reserved orderId=O10004
notification failed orderId=O10005
points granted orderId=O10005
warehouse reserved orderId=O10005这就是你要建立的直觉:消息队列不是把失败藏起来,而是让每个消费者独立承担自己的成功和失败。
真实 RocketMQ 监听器怎么写
本地演示看懂以后,再看真实写法。
通知服务监听器:
java
@Component
@Profile("rocketmq")
@RocketMQMessageListener(
endpoints = "${tutorial.rocketmq.endpoints}",
topic = "${tutorial.order-event.topic}",
tag = "OrderCreated",
consumerGroup = "${tutorial.notification.consumer-group}",
consumptionThreadCount = 4
)
public class RocketMqNotificationOrderCreatedListener implements RocketMQListener {
private static final Logger log = LoggerFactory.getLogger(RocketMqNotificationOrderCreatedListener.class);
private final OrderEventCodec codec;
private final MessageNamingPolicy namingPolicy;
private final NotificationPushConsumer consumer;
public RocketMqNotificationOrderCreatedListener(
OrderEventCodec codec,
MessageNamingPolicy namingPolicy,
NotificationPushConsumer consumer) {
this.codec = codec;
this.namingPolicy = namingPolicy;
this.consumer = consumer;
}
@Override
public ConsumeResult consume(MessageView messageView) {
try {
OrderCreatedEvent event = codec.decode(messageView.getBody());
return consumer.consume(namingPolicy.envelope(event));
} catch (RuntimeException exception) {
log.warn("notification consume failed messageId={}, reason={}", messageView.getMessageId(), exception.getMessage());
return ConsumeResult.FAILURE;
}
}
}这里有几个点要慢慢看。
endpoints 是 RocketMQ 5.x Proxy 地址。本教程单机 Docker 环境里是 127.0.0.1:8081。
topic 是订阅的主题。
tag 是消息过滤条件。这里通知服务只关心 OrderCreated。
consumerGroup 是通知服务自己的消费组。积分服务、仓库服务要写自己的 group。
consumptionThreadCount 是消费线程数。不要一上来就调很大。线程数变大,只说明你本地能并发处理更多消息,不代表数据库、短信服务、仓库服务扛得住。
最重要的是返回值:
java
return consumer.consume(namingPolicy.envelope(event));通知业务成功,它返回 SUCCESS。
通知业务失败,它返回 FAILURE。
异常兜底也返回 FAILURE。
不要这样写:
java
try {
notificationService.send(event);
} catch (Exception exception) {
log.warn("send notification failed", exception);
}
return ConsumeResult.SUCCESS;这段代码看起来很稳,其实很危险。
它把失败吞掉了,然后告诉 RocketMQ 消费成功。以后你看到“订单创建了,但是通知没发”,就只能靠日志考古。
三个服务要写三个 listener
积分服务也要自己的监听器:
java
@RocketMQMessageListener(
endpoints = "${tutorial.rocketmq.endpoints}",
topic = "${tutorial.order-event.topic}",
tag = "OrderCreated",
consumerGroup = "${tutorial.points.consumer-group}",
consumptionThreadCount = 4
)仓库服务也要自己的监听器:
java
@RocketMQMessageListener(
endpoints = "${tutorial.rocketmq.endpoints}",
topic = "${tutorial.order-event.topic}",
tag = "OrderCreated",
consumerGroup = "${tutorial.warehouse.consumer-group}",
consumptionThreadCount = 4
)你会发现它们的 topic 和 tag 可以相同,但 consumerGroup 必须不同。
原因上一章已经讲过,这一章代码也跑出来了:不同业务服务都要收到这份订单事件,不能互相抢。
接入真实 RocketMQ
如果你已经按第 2 章启动了单机 RocketMQ,并创建了 order-event-topic,可以用真实 profile 启动第 5 章:
powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-05-push-consumer-auto spring-boot:run `
-Dspring-boot.run.profiles=rocketmq真实 RocketMQ 模式下,三个 @RocketMQMessageListener 会生效。
如果你没有启动 RocketMQ,就先不要开 rocketmq profile。直接跑默认模式看本地效果即可。
这个安排不是偷懒。它是工程教程里很重要的分层:先用本地可控场景把消费语义学明白,再接真实中间件。
PushConsumer 适合什么场景
PushConsumer 适合大多数后台服务自动消费消息的场景。
比如:
text
订单创建后发通知
订单支付后加积分
订单创建后锁库存
交易完成后生成账单
审核完成后触发后续流程这些场景有一个共同点:服务启动后一直等消息,消息来了就处理,处理完返回结果。
你不需要自己写拉取循环,也不需要每条消息都手动控制什么时候 ack。
但是 PushConsumer 不是万能的。
如果你希望业务自己决定“什么时候拉、一次拉多少、处理到哪一批再提交”,那就更像 SimpleConsumer 的场景。下一章我们会讲它。
先别急着比较谁高级。
PushConsumer 和 SimpleConsumer 不是谁替代谁,而是控制权不同。
PushConsumer 更像“消息到了叫我,我处理完告诉你结果”。
SimpleConsumer 更像“我自己来拿消息,拿多少、什么时候确认,我自己控制”。
消费代码的小技巧
第一个技巧:业务失败不要返回 SUCCESS。
这是底线。
只有当你确定这条消息对应的业务已经处理完成,或者你已经通过幂等记录确认它之前处理完成过,才返回 SUCCESS。
第二个技巧:先幂等,再调外部接口。
消费者最常见的结构应该像这样:
text
收到消息
-> 解析 eventId / orderId
-> 查询消费记录
-> 已处理则直接 SUCCESS
-> 未处理则执行业务
-> 写入消费记录
-> SUCCESS不要先调用短信接口,再去写幂等记录。否则重试时可能重复发短信。
第三个技巧:监听器不要写成上帝类。
监听器里做四件事就够了:
text
解码消息
记录关键日志
调用业务服务
返回消费结果复杂业务放 Service。监听器越薄,后面排查越清楚。
第四个技巧:消费线程数要跟下游能力一起调。
consumptionThreadCount 调大以后,消息消费可能更快,但数据库连接、Redis、短信接口、库存接口都会承压。
所以调线程数之前,先看:
text
单条消息平均耗时
下游接口限流
数据库连接池
CPU 和内存
消费堆积量
失败率第 13 章讲消费堆积和调优时,我们会把这些指标串起来。
这一章真正解决了什么
这一章不是只教了一个注解。
它解决的是:消费者服务上线以后,消息怎么自动进入业务代码,业务成功和失败怎么反馈给消息系统。
你现在应该能回答:
- 为什么通知、积分、仓库要用不同 consumerGroup。
- 为什么通知服务多个实例要用同一个 consumerGroup。
- 为什么业务失败不能返回
ConsumeResult.SUCCESS。 - 为什么监听器里要捕获异常并返回
FAILURE。 - 为什么消费线程数不能拍脑袋调大。
下一章我们讲 SimpleConsumer。那时候产品会继续提需求:有些任务不想“消息一来就处理”,而是要自己控制拉取节奏、批量处理和确认时机。
这就不是 PushConsumer 最舒服的地盘了。
练习题
先自己想。下一章开头我们会复盘这组题,再进入 SimpleConsumer。
- 通知服务发送短信失败时,监听器为什么不能直接返回
SUCCESS? - 通知服务部署 5 个实例时,应该用几个 consumerGroup?为什么?
- 通知服务和积分服务都订阅
OrderCreated,它们的topic、tag、consumerGroup分别应该是什么关系? - 你会把幂等检查放在调用短信接口之前还是之后?为什么?
- 如果消费线程数从 4 调到 64,可能先打爆哪些下游资源?