这个问题其实可以分为三点
第一点生产者要保证消息的顺序生产
第二点broker要保证消息的顺序保存
第三点消费者要保证消息的顺序消费
一、生产者端:确保同一业务消息进入同一队列
定义业务路由键(Message Group)
选择具有业务唯一性的标识(如订单ID、用户ID),作为消息路由的依据,确保同一组消息被路由到同一队列。
自定义队列选择策略,通过MessageQueueSelector接口实现哈希路由,将相同路由键的消息分配至固定队列:
1 | SendResult sendResult = producer.send(msg, (mqs, msg, arg) -> { |
同步发送消息,必须使用同步发送(send()方法),异步发送无法保证顺序性
二、Broker端:队列顺序存储
单队列FIFO存储:同一队列内的消息按发送顺序存储,严格先进先出
三、消费者端:单线程顺序处理
注册顺序监听器,使用MessageListenerOrderly接口,RocketMQ自动为每个队列分配单线程处理:
1 | consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { |
自动队列锁机制
锁申请:消费者启动时向Broker申请队列锁,同一队列仅允许一个消费者处理
锁续期:消费者通过定时任务(默认20秒一次)向Broker续期锁,防止锁过期导致并发消费