RabbitMQ实现延迟队列的方式

1.背景

    最近在做类似拍卖系统的上架功能,卖家上架物品以后,例如到期时间24小时或者48小时,如果无竞拍者或者购买者,则物品自动下架到用户的邮件中。诸如电商用户下单,30分钟未支付,则自动取消订单,归还库存.
实现这些类似的业务场景,大家想到最简单的方式和我想的差不多如下.看看是否和你想的基本一致.

1.1 MySQL+轮询

    把数据写入到MySQL的一张表,然后程序轮询,例如1分钟或者几分钟轮询一次,看上架单是否已经到期,到期则下架.

优点:

        简单粗暴,可以复用业务中的Redis中间件,不需要额外维护其他中间件。 数据量小的时候没啥问题 ,数据量大了以后,第一个查询慢,第二对数据库产生不必要的压力.

缺点:

        无脑轮询,浪费资源,并且即时性很差.

1.2 Redis+轮询实现

    基于Redis+轮询实现,这种方式显然比MySQL要显得聪明。因为操作Redis的速度要比操作MySQL的速度快很多.上架单到期的message写到zset,利用可以排序的特点,把剩余超时时间最短的排在最前面,程序轮询如果数据达到超时条件则执行业务逻辑.

优点:

       简单粗暴,可以复用业务中的Redis中间件,不需要额外维护其他中间件。速度相对MySQL快.

缺点:

     1.存储成本高,因为Redis的数据全部是在内存中,内存相对MySQL占用磁盘的成本是昂贵的.数据量太大会对Redis的内存要求高.   

     2.数据消费zset如果中断,则消息丢失,有些数据则不能完整处理。因为zset没有像MQ的ack机制.

1.3 Redis+订阅Key过期事件

    因为Redis可以key设置超时时间,过期Redis自动删除key.同时Redis支持订阅key的过期事件,拿到事件则再操作业务。

优缺点与上述Redis+轮询实现,基本差不多。这个事件通知机制也没有ack,如果通知到了,但是业务服务挂了,下次继续监听不会再收到消息。

那么这几个日常能想到的办法都想了,还有哪种比较靠谱的方式来实现这样的业务场景呢?即时性能得到保证的同时,我们也希望容错性得到保证,不希望因为异常情况导致数据丢失等情况的发送或者说尽量避免这种情况。

最后我采用的是基于RabbitMQ来实现这个业务场景. 因为MQ第一个有Ack机制,保证异常的时候消息能够被重新消费处理, 第二就是MQ削峰的能力强.

2.RabbitMQ队列设置TTL+死信队列

    在RabbitMQ中,我们可以给某个queue绑定一个exchange来作为这个queue的死信队列。触发消息投递到死信队列有几种情况:

 1.消息被reject或者nack且没有设置requeue重入队列 

 2.消息TTL或者队列设置了TTL,时间过期

 3.消息超过了队列的长度,被丢弃

我们可以看到第二项,给队列设置TTL或者给消息设置TTL.一旦过期,则投递到死信队列,我们只要消费死信队列即可完成延迟队列业务的需求.

3.RabbitMQ消息设置TTL+死信队列

    与上面设置队列TTL类似,我们可以单独给消息设置TTL.有些时候,我们不想对同一个队列设置相同的TTL,这个时候我们可以采取这种方式.

如果队列和消息都设置了TTL,那么以两者最小的TTL过期时间为准. 这两种情况,都遵循队列的【先进先出,后进后出】的原则.   

这种方式都会带来一个问题,那就是过期消息【队头阻塞】.举个栗子,下面的场景大家认为消费顺序是怎样的?

图片


想象的输出结果: 1s、 50s、100s

 实际的输出结果:     100s、50s、1s

这不不对劲啊!!!怎么和我想象中的延迟队列有出入,明明1s的比100s和50s的快到期,但是最后一个被消费的。那么1s的数据就会伴随延迟很久. 明显不符合我们的业务场景,谁先过期就消费谁,这个才是我们的初衷。因为这过期时间没必要有强制的先后顺序。 

 原理是:  消息队列不会对所有消息做定时器,或者起一个线程/进程查看,哪个消息过期了没.这样对MQ的资源消耗太猛了,撑不住。那怎么实现过期原理了?   数据出队的时候,在进行判断。过期了则丢弃到死新队列,否则就队头阻塞等着。即使你后面有1s的消息其实过期了,但是队头100s的消息还没过期,那后面1s的消息你就还得等着. 

 那么怎么实现我们理解那种延迟队列,谁先过期消费谁呢? RabbitMQ到底能不能实现这种功能。答案是肯定的。  RabbitMQ官方维护了一个插件: rabbitmq-delayed-message-exchange

Github地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

4.RabbitMQ的Delay插件

    RabbitMQ开启此插件可以定义一个Delay的Exchange交换机,用户发送的消息message,发送到这个交换机上,该插件会延迟时间将数据message投递到Queue中。只要有message超时则进行投递到Queue。用户最后从队列中将消息进行消费即可,不需要使用TTl+死信队列的方式进行延迟队列的实现了。

1.定义一个Exchange, type类型为=>x-delayed-message .且Header头添加一个标签=>x-delayed-type(direct)

2.投递消息的时候,在消息的header中传递TTL过期时间: x-delay => 60000 (ms,单位 毫秒)
图片

此时消费的顺序是: 1s、50s、100s
注意事项:

1.该插件虽然能够实现延迟队列的效果,但是官方说明了该插件的局限性。 就是队列的数据量不能太大, 例如堆积了几十W或者上百万的数据。这个插件存在局限性,用在生产环境需要谨慎判断自己消息的堆积数据量.

2.可以使用Promethues监控当前堆积的消息数量(消息还未超时,没投递到Queue的消息的数量)
指标名称:

erlang_mnesia_tablewise_size{table=”rabbit_delayed_messagerabbit@rabbitmq-server”}

图片