05 rocketMq如何保证消息顺序消费?

vvEcho 2024-01-20 14:08:37
Categories: Tags:

这个问题其实可以分为三点

第一点生产者要保证消息的顺序生产

第二点broker要保证消息的顺序保存

第三点消费者要保证消息的顺序消费

一、生产者端:确保同一业务消息进入同一队列

定义业务路由键(Message Group)

选择具有业务唯一性的标识(如订单ID、用户ID),作为消息路由的依据,确保同一组消息被路由到同一队列。

自定义队列选择策略,通过MessageQueueSelector接口实现哈希路由,将相同路由键的消息分配至固定队列:

1
2
3
4
5
SendResult sendResult = producer.send(msg, (mqs, msg, arg) -> {
String orderId = (String) arg;
int index = orderId.hashCode() % mqs.size(); // 哈希取模
return mqs.get(index);
}, orderId); // 传入路由键

同步发送消息,必须使用同步发送(send()方法),异步发送无法保证顺序性

二、Broker端:队列顺序存储

单队列FIFO存储:同一队列内的消息按发送顺序存储,严格先进先出

三、消费者端:单线程顺序处理

注册顺序监听器,使用MessageListenerOrderly接口,RocketMQ自动为每个队列分配单线程处理:

1
2
3
4
5
6
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
for (MessageExt msg : msgs) {
// 业务处理(单线程执行)
}
return ConsumeOrderlyStatus.SUCCESS;
});

自动队列锁机制
锁申请:消费者启动时向Broker申请队列锁,同一队列仅允许一个消费者处理
锁续期:消费者通过定时任务(默认20秒一次)向Broker续期锁,防止锁过期导致并发消费