RocketMQ默认提供18个固定延迟级别,对应不同时间间隔(如1s、5s、10s、1h、2h等)。
每个级别对应一个内部队列,消息根据延迟级别暂存至对应的队列中,到期后由Broker重新投递到目标Topic
延迟消息的实现原理:
生产者生产消息时,设置延迟级别;broker根据延迟级别,将消息暂存至对应的内部队列中(broker会为每个队列单独起一个线程,默认每秒扫描一遍看是否到期),待对应的消息到期后,broker重新投递到目标Topic;消费者收到消息后,消费即可。
rocketMq支持延迟消息吗?
必须支持啊,rocketMq默认支持18中延迟级别,可以在投递消息时加上延迟时间
1 | Message msg = new Message("orderTopic", "tagA", "order123".getBytes()); |
延迟消息的原理
延迟消息的原理和half消息类似,当发送延时消息时,brocker会写入一个特殊的topic–SCHEDULE_TOPIC_XXXX,broker会有个定时任务,会定时检查SCHEDULE_TOPIC_XXXX这个topic,如果发现这个topic有消息,会检查这个消息的延迟时间,如果延迟时间到了,会重新写入真实的topic;
如何支持自定义的延时时间?
业务端自己实现延时
rocketMq5.x版本支持自定义延时,底层是时间轮算法
1 | msg.setDeliverTimeMs(System.currentTimeMillis() + 17000); //延迟17s |
时间轮算法
时间轮是一种用于管理大量定时任务的数据结构,本质是一个环形数组,每个slot代表一个时间片。任务根据延迟时间被放入对应的 slot,当指针随时间推进到该 slot 时执行任务。时间轮插入和删除任务的复杂度是 O(1),非常适合海量定时任务。很多中间件如 Netty 的 HashedWheelTimer、Kafka 的 Hierarchical Timing Wheel、RocketMQ 5.x 的 TimerWheel 都使用了时间轮算法