切换主题
06. SimpleConsumer:运营要可控批处理怎么办
产品又提需求
运营说:
有些订单履约失败,我想在后台手动拉一批消息出来,检查后再确认处理。
PushConsumer 是自动回调,适合服务持续消费。但运营批处理需要更强控制:一次拉多少、处理多久、什么时候确认成功。
这时用 SimpleConsumer。
SimpleConsumer 和 PushConsumer 的区别
| 维度 | PushConsumer | SimpleConsumer |
|---|---|---|
| 消费方式 | 消息到达后回调监听器 | 应用主动调用 receive |
| 确认方式 | 返回消费结果 | 手动 ack |
| 适合场景 | 常驻服务自动处理 | 批处理、后台任务、可控拉取 |
| 使用难度 | 更简单 | 更灵活,也更容易写错 |
配置
properties
rocketmq.simple-consumer.endpoints=localhost:8081
rocketmq.simple-consumer.consumer-group=ops-replay-group
rocketmq.simple-consumer.topic=order-event-topic
rocketmq.simple-consumer.tag=*
rocketmq.simple-consumer.filter-expression-type=tagRocketMQAutoConfiguration 会在存在 rocketmq.simple-consumer.endpoints 时创建 SimpleConsumerBuilder,再注入到 RocketMQClientTemplate。
主动拉取
java
@Service
public class OpsOrderMessageReplayService {
private final RocketMQClientTemplate rocketMQClientTemplate;
public OpsOrderMessageReplayService(RocketMQClientTemplate rocketMQClientTemplate) {
this.rocketMQClientTemplate = rocketMQClientTemplate;
}
public void replayOnce() throws ClientException {
List<MessageView> messages = rocketMQClientTemplate.receive(
10,
Duration.ofSeconds(60)
);
for (MessageView message : messages) {
handle(message);
rocketMQClientTemplate.ack(message);
}
}
}receive(10, Duration.ofSeconds(60)) 表示最多拉 10 条,并设置不可见时间为 60 秒。
不可见时间是什么
SimpleConsumer 拉到消息后,这条消息不会立刻从服务端删除。它会进入一段“对其他消费者不可见”的时间窗口。
在窗口内:
- 你处理成功并
ack,消息才算完成。 - 你没
ack,窗口过后消息可能再次被投递。
这就是 SimpleConsumer 必须讲清楚的核心。
小技巧
不可见时间要比单条消息最大处理时间略长。比如一次处理订单补偿最多 20 秒,可以先设 60 秒。不要为了保险设得特别长,否则失败消息重新投递会很慢。
常见坑
处理成功忘记 ack。
消息会再次投递,造成重复处理。处理失败也 ack。
失败消息不会再被消息系统重投,只能靠你自己的补偿。不可见时间太短。
业务还没处理完,消息就被重新投递,导致并发重复处理。
练习题
- 为什么运营后台批处理更适合 SimpleConsumer?
- 如果处理一条消息需要 2 分钟,不可见时间设 30 秒会发生什么?
- SimpleConsumer 中幂等还重要吗?
参考答案
- 因为运营批处理需要主动控制拉取数量、处理节奏和确认时机。
- 处理未完成时消息可能重新可见,被其他消费者重复拉取。
- 仍然重要。只要消息可能重复投递,就必须做幂等。
来源
- RocketMQ 消费类型:https://rocketmq.apache.org/docs/featureBehavior/06consumertype/
- 官方 SimpleConsumer sample:https://github.com/apache/rocketmq-spring/blob/rocketmq-spring-all-2.3.4/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/src/main/java/org/apache/rocketmq/samples/springboot/V5SimpleConsumerConsumerApplication.java
RocketMQClientTemplate#receive/ack:https://github.com/apache/rocketmq-spring/blob/rocketmq-spring-all-2.3.4/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQClientTemplate.java