11 rocketMq的延迟消息

vvEcho 2025-03-05 17:56:55
Categories: Tags:

RocketMQ默认提供18个固定延迟级别,对应不同时间间隔(如1s、5s、10s、1h、2h等)。

每个级别对应一个内部队列,消息根据延迟级别暂存至对应的队列中,到期后由Broker重新投递到目标Topic

延迟消息的实现原理:
生产者生产消息时,设置延迟级别;broker根据延迟级别,将消息暂存至对应的内部队列中(broker会为每个队列单独起一个线程,默认每秒扫描一遍看是否到期),待对应的消息到期后,broker重新投递到目标Topic;消费者收到消息后,消费即可。

rocketMq支持延迟消息吗?

必须支持啊,rocketMq默认支持18中延迟级别,可以在投递消息时加上延迟时间

1
2
3
4
5
Message msg = new Message("orderTopic", "tagA", "order123".getBytes());

msg.setDelayTimeLevel(3); //最大延时2小时
//1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
producer.send(msg);

延迟消息的原理
延迟消息的原理和half消息类似,当发送延时消息时,brocker会写入一个特殊的topic–SCHEDULE_TOPIC_XXXX,broker会有个定时任务,会定时检查SCHEDULE_TOPIC_XXXX这个topic,如果发现这个topic有消息,会检查这个消息的延迟时间,如果延迟时间到了,会重新写入真实的topic;

如何支持自定义的延时时间?
业务端自己实现延时

rocketMq5.x版本支持自定义延时,底层是时间轮算法

1
msg.setDeliverTimeMs(System.currentTimeMillis() + 17000); //延迟17s

时间轮算法

时间轮是一种用于管理大量定时任务的数据结构,本质是一个环形数组,每个slot代表一个时间片。任务根据延迟时间被放入对应的 slot,当指针随时间推进到该 slot 时执行任务。时间轮插入和删除任务的复杂度是 O(1),非常适合海量定时任务。很多中间件如 Netty 的 HashedWheelTimer、Kafka 的 Hierarchical Timing Wheel、RocketMQ 5.x 的 TimerWheel 都使用了时间轮算法