Skip to content

05. PushConsumer:服务启动后,消息自己送上门

上一章我们把 topictagkeyeventIdconsumerGroup 讲清楚了。

这一步非常重要。因为从这一章开始,我们不再只看“生产者把消息发出去没有”,而是要真的写消费者服务了。

消费者一多,问题马上就来了。

通知服务要发短信。

积分服务要加积分。

仓库服务要锁库存。

它们都关心订单创建事件,但它们不应该互相抢同一份消息。它们也不应该每个服务都写一个死循环去拉消息。更要命的是,如果通知服务发短信失败了,你能不能直接告诉 RocketMQ:我消费成功了?

不能。

短信没有发出去,你却返回成功,那这条消息后面就没有重试机会了。线上排查时你会非常难受:订单有了,消息也消费过了,可通知就是没发出去。

这一章就解决这个问题:用 PushConsumer 让服务启动后自动接收消息,并且把 SUCCESSFAILURE 的边界讲清楚。

上一章练习参考答案

先把第 4 章的题复盘一下,再进入 PushConsumer。

第一题:OrderPaid 要不要新建 topic?

先不要一看到新事件就新建 topic。你要先问几个问题:它是不是仍然属于订单事件域?权限、保留时间、吞吐量、消费者范围是不是和 OrderCreated 差不多?如果答案基本一致,可以先继续放在 order-event-topic,用 tag=OrderPaid 区分。

如果 OrderPaid 的吞吐量明显更高,保留时间不同,权限隔离不同,或者消费者完全是另一批系统,那就考虑拆成独立 topic。topic 是资源边界,不是枚举值。

第二题:通知服务部署 3 个实例时,用几个 consumerGroup?

用 1 个。

比如三个实例都叫:

text
dev-notification-service-order-events

这三个实例一起分摊通知服务自己的消费任务。你不要给三个实例配三个不同 group。那样每个实例都会收到一份消息,本来只该发一条短信,结果可能发三条。

第三题:通知服务和积分服务能不能共用同一个 consumerGroup?

不能。

通知服务和积分服务都要处理 OrderCreated。如果它们共用一个 group,RocketMQ 会把它们当成同一个消费集群里的不同实例。结果就是一条订单创建消息可能被通知服务拿走,也可能被积分服务拿走,但不会稳定地两边都处理。

这就变成了“有时候发短信,有时候加积分”,老板看完会很安静,安静得让人害怕。

第四题:线上只给你订单号 O10001,消息里至少要有哪些字段方便排查?

至少要有:

text
topic: order-event-topic
tag: OrderCreated
key: O10001
eventId: evt_xxx
orderId: O10001
userId: U10001
messageId: RocketMQ 生成的技术消息编号

key 帮你按业务编号检索消息,eventId 帮你做幂等和追踪一次业务事件,messageId 帮你定位 RocketMQ 内部的那条消息。它们不是一个东西,不要混用。

第五题:测试环境和生产环境共用 consumerGroup 会怎样?

会出现环境串消费。

测试服务可能消费生产消息,生产服务也可能被测试消息干扰。轻一点是排查困难,重一点就是生产订单没有触达真正的生产消费者。

所以 consumerGroup 里必须带环境前缀:

text
dev-notification-service-order-events
prod-notification-service-order-events

业务继续长:消费者不能靠人肉拉消息

现在我们回到订单系统。

第三章里,下单接口已经不再同步调用短信、积分、仓库了。它只负责创建订单,然后发布 OrderCreated

第四章里,我们把消息模型和命名整理清楚了。

接下来产品来了一个很朴素的要求:

“订单创建后,通知、积分、仓库这些服务都要自动处理,不要还要点一下、拉一下、扫一下。”

这个要求看起来平平无奇,但它背后有几个真实问题。

第一个问题:消费者服务启动后,怎么知道自己该收哪些消息?

你总不能在服务里写一个 while 循环,每隔一秒去问 RocketMQ:“有我的消息吗?”这种写法又累又容易错。线程怎么停?拉多少?异常怎么处理?消费成功以后什么时候确认?这些都要你自己操心。

第二个问题:通知服务挂了怎么办?

比如短信服务超时了,通知消费者这次没处理完。这个时候它能不能返回 SUCCESS

不能。

返回 SUCCESS 等于告诉 RocketMQ:这条消息我处理完了,后面不用再管。

可是短信根本没发出去。你把失败伪装成成功,等于亲手把重试机会掐掉。

第三个问题:通知服务有 3 个实例,消息会不会被重复消费 3 次?

如果 3 个实例属于同一个 consumerGroup,它们应该分摊消息,不是每个实例都拿一份。这个语义上一章已经讲过,这一章我们用代码跑出来。

这就是 PushConsumer 出场的地方。

PushConsumer 的思路很简单:服务启动时把订阅关系告诉 RocketMQ。消息到达后,客户端把消息推给你的监听器。你的监听器处理业务,然后返回 ConsumeResult.SUCCESSConsumeResult.FAILURE

一句话记住:

text
业务真的处理完 -> SUCCESS
业务没有处理完 -> FAILURE

本章伴生工程

本章新增模块:

text
chapter-05-push-consumer-auto/
  pom.xml
  src/main/resources/application.yml
  src/main/java/com/example/rocketmqdemo/
    TutorialPushConsumerApplication.java
    api/
      PushConsumerDemoController.java
      PublishOrderCreatedPushRequest.java
      PushConsumerDemoResponse.java
      DeliveryView.java
    config/
      PushConsumerDemoConfiguration.java
    model/
      OrderCreatedEvent.java
      MessageEnvelope.java
      MessageNamingPolicy.java
    consumer/
      PushConsumerHandler.java
      LocalPushConsumerEngine.java
      LocalPushConsumerRegistration.java
      DeliveryRecord.java
      NotificationPushConsumer.java
      PointsPushConsumer.java
      WarehousePushConsumer.java
      RecordingPushConsumer.java
    rocketmq/
      OrderEventCodec.java
      RocketMqNotificationOrderCreatedListener.java
      RocketMqPointsOrderCreatedListener.java
      RocketMqWarehouseOrderCreatedListener.java
  src/test/java/com/example/rocketmqdemo/
    consumer/
      LocalPushConsumerEngineTest.java
    api/
      PushConsumerDemoControllerTest.java

这章有两套运行方式。

第一套是本地演示模式,不依赖 RocketMQ。它用 LocalPushConsumerEngine 模拟 PushConsumer 的投递效果,让你马上看见“不同 group 都能收到,同一个 group 的多个实例只分摊一份”。

第二套是真实 RocketMQ 模式。rocketmq 包下的三个监听器使用 @RocketMQMessageListener,真正接入 rocketmq-v5-client-spring-boot-starter:2.3.4

教程先用本地演示把业务语义跑明白,再给真实 RocketMQ 写法。这样你不会被环境卡住,也不会只会背注解。

依赖还是这一套

pom.xml 里继续使用本教程指定版本:

xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
    <version>2.3.4</version>
</dependency>

完整依赖结构是:

xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
        <version>2.3.4</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

配置文件:

yaml
server:
  port: 18085

spring:
  application:
    name: chapter-05-push-consumer-auto
  main:
    banner-mode: "off"

tutorial:
  env: dev
  rocketmq:
    endpoints: 127.0.0.1:8081
  order-event:
    topic: order-event-topic
  notification:
    consumer-group: dev-notification-service-order-events
  points:
    consumer-group: dev-points-service-order-events
  warehouse:
    consumer-group: dev-warehouse-service-order-events

这里先把三个业务服务的 group 都配出来。不要让通知、积分、仓库共用一个 group。

先把消息命名固定下来

消费者收到消息后,不应该还在猜这是什么业务。

我们继续使用统一的命名策略:

java
public class MessageNamingPolicy {

    private static final String ORDER_CREATED_TAG = "OrderCreated";

    private final String env;
    private final String orderEventTopic;

    public MessageNamingPolicy(String env, String orderEventTopic) {
        this.env = Objects.requireNonNull(env, "env must not be null");
        this.orderEventTopic = Objects.requireNonNull(orderEventTopic, "orderEventTopic must not be null");
    }

    public MessageEnvelope envelope(OrderCreatedEvent event) {
        return new MessageEnvelope(
                orderEventTopic,
                ORDER_CREATED_TAG,
                event.orderId(),
                event.eventId(),
                event);
    }

    public String consumerGroup(String serviceName) {
        return env + "-" + serviceName + "-order-events";
    }
}

订单创建事件本身长这样:

java
public record OrderCreatedEvent(
        String eventId,
        String orderId,
        String userId,
        int payAmount,
        boolean failNotification
) {

    public OrderCreatedEvent(String eventId, String orderId, String userId, int payAmount) {
        this(eventId, orderId, userId, payAmount, false);
    }
}

failNotification 是本章演示用的开关。它模拟短信服务失败,方便你看到消费者返回 FAILURE 后,结果是什么样。

本地 PushConsumer 引擎

本地引擎不是 RocketMQ,它只是教学用的模拟器。

它模拟两件事。

第一,不同 consumerGroup 都会收到一份消息。

第二,同一个 consumerGroup 里如果有多个实例,只选一个实例处理这条消息。

核心代码:

java
public class LocalPushConsumerEngine {

    private final Map<String, List<LocalPushConsumerRegistration>> registrationsByGroup = new LinkedHashMap<>();
    private final Map<String, Integer> roundRobinIndexes = new LinkedHashMap<>();

    public LocalPushConsumerEngine(List<LocalPushConsumerRegistration> registrations) {
        for (LocalPushConsumerRegistration registration : registrations) {
            registrationsByGroup
                    .computeIfAbsent(registration.consumerGroup(), ignored -> new ArrayList<>())
                    .add(registration);
        }
    }

    public List<DeliveryRecord> push(MessageEnvelope envelope) {
        List<DeliveryRecord> deliveries = new ArrayList<>();
        for (Map.Entry<String, List<LocalPushConsumerRegistration>> entry : registrationsByGroup.entrySet()) {
            List<LocalPushConsumerRegistration> matched = entry.getValue().stream()
                    .filter(registration -> registration.matches(envelope.tag()))
                    .toList();
            if (matched.isEmpty()) {
                continue;
            }

            LocalPushConsumerRegistration selected = select(entry.getKey(), matched);
            ConsumeResult result = selected.handler().consume(envelope);
            deliveries.add(new DeliveryRecord(
                    selected.consumerGroup(),
                    selected.instanceName(),
                    envelope.tag(),
                    envelope.key(),
                    envelope.eventId(),
                    envelope.event().orderId(),
                    result,
                    result == ConsumeResult.SUCCESS ? "business finished, commit consume progress" : "business failed, do not commit consume progress"));
        }
        return deliveries;
    }

    private LocalPushConsumerRegistration select(String consumerGroup, List<LocalPushConsumerRegistration> matched) {
        int current = roundRobinIndexes.getOrDefault(consumerGroup, 0);
        LocalPushConsumerRegistration selected = matched.get(current % matched.size());
        roundRobinIndexes.put(consumerGroup, current + 1);
        return selected;
    }
}

这里最值得注意的是 selected.handler().consume(envelope)

真实 RocketMQ 里,消息也是进入你的监听器方法。你的业务逻辑处理完后,返回消费结果。

本地引擎把这个动作显式暴露出来,是为了让你先理解消费语义。

三个业务消费者

通知消费者:

java
@Component
public class NotificationPushConsumer implements PushConsumerHandler {

    private static final Logger log = LoggerFactory.getLogger(NotificationPushConsumer.class);

    @Override
    public ConsumeResult consume(MessageEnvelope envelope) {
        if (envelope.event().failNotification()) {
            log.warn("notification failed orderId={}, eventId={}", envelope.event().orderId(), envelope.eventId());
            return ConsumeResult.FAILURE;
        }
        log.info("notification sent orderId={}, eventId={}", envelope.event().orderId(), envelope.eventId());
        return ConsumeResult.SUCCESS;
    }
}

积分消费者:

java
@Component
public class PointsPushConsumer implements PushConsumerHandler {

    private static final Logger log = LoggerFactory.getLogger(PointsPushConsumer.class);

    @Override
    public ConsumeResult consume(MessageEnvelope envelope) {
        log.info("points granted orderId={}, eventId={}", envelope.event().orderId(), envelope.eventId());
        return ConsumeResult.SUCCESS;
    }
}

仓库消费者:

java
@Component
public class WarehousePushConsumer implements PushConsumerHandler {

    private static final Logger log = LoggerFactory.getLogger(WarehousePushConsumer.class);

    @Override
    public ConsumeResult consume(MessageEnvelope envelope) {
        log.info("warehouse reserved orderId={}, eventId={}", envelope.event().orderId(), envelope.eventId());
        return ConsumeResult.SUCCESS;
    }
}

你会发现这三个类都很薄。

这是有意的。

监听器和消费者入口不要塞太多复杂逻辑。它们应该负责“解析消息、做幂等入口判断、调用业务服务、返回消费结果”。真正复杂的业务逻辑应该进 Service。

本章为了聚焦 PushConsumer,没有把短信、积分、仓库拆成更深的业务服务。后面讲重试、死信、幂等补偿时,我们会继续把它拆细。

把三个消费者注册到本地引擎

配置类:

java
@Configuration
public class PushConsumerDemoConfiguration {

    @Bean
    MessageNamingPolicy messageNamingPolicy(
            @Value("${tutorial.env}") String env,
            @Value("${tutorial.order-event.topic}") String topic) {
        return new MessageNamingPolicy(env, topic);
    }

    @Bean
    LocalPushConsumerEngine localPushConsumerEngine(
            MessageNamingPolicy namingPolicy,
            NotificationPushConsumer notificationConsumer,
            PointsPushConsumer pointsConsumer,
            WarehousePushConsumer warehouseConsumer) {
        return new LocalPushConsumerEngine(List.of(
                new LocalPushConsumerRegistration(namingPolicy.consumerGroup("notification-service"), "notification-1", "OrderCreated", notificationConsumer),
                new LocalPushConsumerRegistration(namingPolicy.consumerGroup("notification-service"), "notification-2", "OrderCreated", notificationConsumer),
                new LocalPushConsumerRegistration(namingPolicy.consumerGroup("points-service"), "points-1", "OrderCreated", pointsConsumer),
                new LocalPushConsumerRegistration(namingPolicy.consumerGroup("warehouse-service"), "warehouse-1", "OrderCreated", warehouseConsumer)
        ));
    }
}

注意通知服务这里有两个实例:

text
notification-1
notification-2

但它们属于同一个 group:

text
dev-notification-service-order-events

所以一条订单创建消息不会同时交给两个通知实例。它们会分摊通知服务的任务。

积分和仓库是不同业务服务,所以各自使用不同 group。

提供一个 HTTP 入口看效果

Controller:

java
@RestController
@RequestMapping("/api/push-consumer")
public class PushConsumerDemoController {

    private final MessageNamingPolicy namingPolicy;
    private final LocalPushConsumerEngine localPushConsumerEngine;

    public PushConsumerDemoController(MessageNamingPolicy namingPolicy, LocalPushConsumerEngine localPushConsumerEngine) {
        this.namingPolicy = namingPolicy;
        this.localPushConsumerEngine = localPushConsumerEngine;
    }

    @PostMapping("/order-created")
    public PushConsumerDemoResponse publishOrderCreated(@RequestBody PublishOrderCreatedPushRequest request) {
        OrderCreatedEvent event = request.toEvent();
        MessageEnvelope envelope = namingPolicy.envelope(event);
        return new PushConsumerDemoResponse(
                envelope.topic(),
                envelope.tag(),
                envelope.key(),
                envelope.eventId(),
                localPushConsumerEngine.push(envelope).stream()
                        .map(DeliveryView::from)
                        .toList());
    }
}

这个接口不是说真实生产者能直接拿到所有消费者的执行结果。

真实 RocketMQ 里,生产者把消息发给 RocketMQ 后,消费者是异步消费的,生产者通常不会直接知道每个消费者成功还是失败。

这里把投递结果返回给 HTTP,是为了教学时一眼看懂 PushConsumer 的消费语义。

运行测试

执行:

powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-05-push-consumer-auto test

真实结果:

text
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
BUILD SUCCESS

这 3 个测试证明了三件事。

第一,不同 consumerGroup 都能收到同一条 OrderCreated

第二,同一个 consumerGroup 里多个实例只会有一个实例处理这条消息。

第三,通知服务失败时返回 FAILURE,积分和仓库仍然可以成功处理自己的业务。

启动本地演示

启动:

powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-05-push-consumer-auto spring-boot:run

服务启动成功后,你会看到端口:

text
Tomcat started on port(s): 18085 (http)

先发一个正常请求:

powershell
Invoke-RestMethod `
  -Uri 'http://127.0.0.1:18085/api/push-consumer/order-created' `
  -Method Post `
  -ContentType 'application/json; charset=utf-8' `
  -Body '{"orderId":"O10004","userId":"U10004","payAmount":19900,"failNotification":false}'

返回结果:

json
{
  "topic": "order-event-topic",
  "tag": "OrderCreated",
  "key": "O10004",
  "eventId": "evt_dbb9cd5a-bffc-4c8b-8c6c-4a8b7c679ca9",
  "deliveries": [
    {
      "consumerGroup": "dev-notification-service-order-events",
      "instanceName": "notification-1",
      "result": "SUCCESS",
      "orderId": "O10004",
      "note": "business finished, commit consume progress"
    },
    {
      "consumerGroup": "dev-points-service-order-events",
      "instanceName": "points-1",
      "result": "SUCCESS",
      "orderId": "O10004",
      "note": "business finished, commit consume progress"
    },
    {
      "consumerGroup": "dev-warehouse-service-order-events",
      "instanceName": "warehouse-1",
      "result": "SUCCESS",
      "orderId": "O10004",
      "note": "business finished, commit consume progress"
    }
  ]
}

这个结果说明:通知、积分、仓库三个不同 group 都收到了订单创建事件,并且都处理成功。

再发一个通知失败的请求:

powershell
Invoke-RestMethod `
  -Uri 'http://127.0.0.1:18085/api/push-consumer/order-created' `
  -Method Post `
  -ContentType 'application/json; charset=utf-8' `
  -Body '{"orderId":"O10005","userId":"U10005","payAmount":29900,"failNotification":true}'

返回结果:

json
{
  "topic": "order-event-topic",
  "tag": "OrderCreated",
  "key": "O10005",
  "eventId": "evt_8c224fc0-e11e-4502-a57a-973357918db5",
  "deliveries": [
    {
      "consumerGroup": "dev-notification-service-order-events",
      "instanceName": "notification-1",
      "result": "FAILURE",
      "orderId": "O10005",
      "note": "business failed, do not commit consume progress"
    },
    {
      "consumerGroup": "dev-points-service-order-events",
      "instanceName": "points-1",
      "result": "SUCCESS",
      "orderId": "O10005",
      "note": "business finished, commit consume progress"
    },
    {
      "consumerGroup": "dev-warehouse-service-order-events",
      "instanceName": "warehouse-1",
      "result": "SUCCESS",
      "orderId": "O10005",
      "note": "business finished, commit consume progress"
    }
  ]
}

这次最关键的是通知服务:

text
consumerGroup: dev-notification-service-order-events
result: FAILURE
note: business failed, do not commit consume progress

它没有假装成功。

积分和仓库是另外两个 group,它们处理成功,不受通知服务失败影响。

控制台也能看到类似日志:

text
notification sent orderId=O10004
points granted orderId=O10004
warehouse reserved orderId=O10004
notification failed orderId=O10005
points granted orderId=O10005
warehouse reserved orderId=O10005

这就是你要建立的直觉:消息队列不是把失败藏起来,而是让每个消费者独立承担自己的成功和失败。

真实 RocketMQ 监听器怎么写

本地演示看懂以后,再看真实写法。

通知服务监听器:

java
@Component
@Profile("rocketmq")
@RocketMQMessageListener(
        endpoints = "${tutorial.rocketmq.endpoints}",
        topic = "${tutorial.order-event.topic}",
        tag = "OrderCreated",
        consumerGroup = "${tutorial.notification.consumer-group}",
        consumptionThreadCount = 4
)
public class RocketMqNotificationOrderCreatedListener implements RocketMQListener {

    private static final Logger log = LoggerFactory.getLogger(RocketMqNotificationOrderCreatedListener.class);

    private final OrderEventCodec codec;
    private final MessageNamingPolicy namingPolicy;
    private final NotificationPushConsumer consumer;

    public RocketMqNotificationOrderCreatedListener(
            OrderEventCodec codec,
            MessageNamingPolicy namingPolicy,
            NotificationPushConsumer consumer) {
        this.codec = codec;
        this.namingPolicy = namingPolicy;
        this.consumer = consumer;
    }

    @Override
    public ConsumeResult consume(MessageView messageView) {
        try {
            OrderCreatedEvent event = codec.decode(messageView.getBody());
            return consumer.consume(namingPolicy.envelope(event));
        } catch (RuntimeException exception) {
            log.warn("notification consume failed messageId={}, reason={}", messageView.getMessageId(), exception.getMessage());
            return ConsumeResult.FAILURE;
        }
    }
}

这里有几个点要慢慢看。

endpoints 是 RocketMQ 5.x Proxy 地址。本教程单机 Docker 环境里是 127.0.0.1:8081

topic 是订阅的主题。

tag 是消息过滤条件。这里通知服务只关心 OrderCreated

consumerGroup 是通知服务自己的消费组。积分服务、仓库服务要写自己的 group。

consumptionThreadCount 是消费线程数。不要一上来就调很大。线程数变大,只说明你本地能并发处理更多消息,不代表数据库、短信服务、仓库服务扛得住。

最重要的是返回值:

java
return consumer.consume(namingPolicy.envelope(event));

通知业务成功,它返回 SUCCESS

通知业务失败,它返回 FAILURE

异常兜底也返回 FAILURE

不要这样写:

java
try {
    notificationService.send(event);
} catch (Exception exception) {
    log.warn("send notification failed", exception);
}
return ConsumeResult.SUCCESS;

这段代码看起来很稳,其实很危险。

它把失败吞掉了,然后告诉 RocketMQ 消费成功。以后你看到“订单创建了,但是通知没发”,就只能靠日志考古。

三个服务要写三个 listener

积分服务也要自己的监听器:

java
@RocketMQMessageListener(
        endpoints = "${tutorial.rocketmq.endpoints}",
        topic = "${tutorial.order-event.topic}",
        tag = "OrderCreated",
        consumerGroup = "${tutorial.points.consumer-group}",
        consumptionThreadCount = 4
)

仓库服务也要自己的监听器:

java
@RocketMQMessageListener(
        endpoints = "${tutorial.rocketmq.endpoints}",
        topic = "${tutorial.order-event.topic}",
        tag = "OrderCreated",
        consumerGroup = "${tutorial.warehouse.consumer-group}",
        consumptionThreadCount = 4
)

你会发现它们的 topictag 可以相同,但 consumerGroup 必须不同。

原因上一章已经讲过,这一章代码也跑出来了:不同业务服务都要收到这份订单事件,不能互相抢。

接入真实 RocketMQ

如果你已经按第 2 章启动了单机 RocketMQ,并创建了 order-event-topic,可以用真实 profile 启动第 5 章:

powershell
cd D:\idea_space\rocketmq-order-tutorial
mvn.cmd -pl chapter-05-push-consumer-auto spring-boot:run `
  -Dspring-boot.run.profiles=rocketmq

真实 RocketMQ 模式下,三个 @RocketMQMessageListener 会生效。

如果你没有启动 RocketMQ,就先不要开 rocketmq profile。直接跑默认模式看本地效果即可。

这个安排不是偷懒。它是工程教程里很重要的分层:先用本地可控场景把消费语义学明白,再接真实中间件。

PushConsumer 适合什么场景

PushConsumer 适合大多数后台服务自动消费消息的场景。

比如:

text
订单创建后发通知
订单支付后加积分
订单创建后锁库存
交易完成后生成账单
审核完成后触发后续流程

这些场景有一个共同点:服务启动后一直等消息,消息来了就处理,处理完返回结果。

你不需要自己写拉取循环,也不需要每条消息都手动控制什么时候 ack。

但是 PushConsumer 不是万能的。

如果你希望业务自己决定“什么时候拉、一次拉多少、处理到哪一批再提交”,那就更像 SimpleConsumer 的场景。下一章我们会讲它。

先别急着比较谁高级。

PushConsumer 和 SimpleConsumer 不是谁替代谁,而是控制权不同。

PushConsumer 更像“消息到了叫我,我处理完告诉你结果”。

SimpleConsumer 更像“我自己来拿消息,拿多少、什么时候确认,我自己控制”。

消费代码的小技巧

第一个技巧:业务失败不要返回 SUCCESS

这是底线。

只有当你确定这条消息对应的业务已经处理完成,或者你已经通过幂等记录确认它之前处理完成过,才返回 SUCCESS

第二个技巧:先幂等,再调外部接口。

消费者最常见的结构应该像这样:

text
收到消息
  -> 解析 eventId / orderId
  -> 查询消费记录
  -> 已处理则直接 SUCCESS
  -> 未处理则执行业务
  -> 写入消费记录
  -> SUCCESS

不要先调用短信接口,再去写幂等记录。否则重试时可能重复发短信。

第三个技巧:监听器不要写成上帝类。

监听器里做四件事就够了:

text
解码消息
记录关键日志
调用业务服务
返回消费结果

复杂业务放 Service。监听器越薄,后面排查越清楚。

第四个技巧:消费线程数要跟下游能力一起调。

consumptionThreadCount 调大以后,消息消费可能更快,但数据库连接、Redis、短信接口、库存接口都会承压。

所以调线程数之前,先看:

text
单条消息平均耗时
下游接口限流
数据库连接池
CPU 和内存
消费堆积量
失败率

第 13 章讲消费堆积和调优时,我们会把这些指标串起来。

这一章真正解决了什么

这一章不是只教了一个注解。

它解决的是:消费者服务上线以后,消息怎么自动进入业务代码,业务成功和失败怎么反馈给消息系统。

你现在应该能回答:

  1. 为什么通知、积分、仓库要用不同 consumerGroup。
  2. 为什么通知服务多个实例要用同一个 consumerGroup。
  3. 为什么业务失败不能返回 ConsumeResult.SUCCESS
  4. 为什么监听器里要捕获异常并返回 FAILURE
  5. 为什么消费线程数不能拍脑袋调大。

下一章我们讲 SimpleConsumer。那时候产品会继续提需求:有些任务不想“消息一来就处理”,而是要自己控制拉取节奏、批量处理和确认时机。

这就不是 PushConsumer 最舒服的地盘了。

练习题

先自己想。下一章开头我们会复盘这组题,再进入 SimpleConsumer。

  1. 通知服务发送短信失败时,监听器为什么不能直接返回 SUCCESS
  2. 通知服务部署 5 个实例时,应该用几个 consumerGroup?为什么?
  3. 通知服务和积分服务都订阅 OrderCreated,它们的 topictagconsumerGroup 分别应该是什么关系?
  4. 你会把幂等检查放在调用短信接口之前还是之后?为什么?
  5. 如果消费线程数从 4 调到 64,可能先打爆哪些下游资源?

Built with VitePress. Deployed on Cloudflare Pages.