1.1.1. 03.延时队列


一般常用的消息队列有 RabbitMQ, KAFKA 这样强大的中间件,可以再应用之间进行异步地消息传递功能。它们功能强大并专业,特性诸多,但是使用起来也比较繁琐。

有了 redis,可以从中解脱出来,不过如果对消息的可靠性有较高的要求,那么 redis 就不适合了,还是得用 RabbitMQ, KAFKA 专业的中间件来。

异步消息队列

队列

使用 redis 的 List 数据结构来作为消息队列,通过 lpush,rpop 或者 rpush,lpop 进行插入和消费。

这里会存在一个问题,如果队列空了会怎么办。

  • 我们可以 sleep 一会,降低 CPU 占用率和 QPS。但是这也会导致消息队列产生延迟问题。
  • 所以,可以通过 blpop/brpop 替代前面的 lpop/rpop。通过阻塞读的方式来降低 CPU 和 QPS,并且延迟几乎为零。(b 代表 blocking 阻塞读:在队列没有数据的时候,会立即进入休眠状态,一旦数据来了便立即醒来。)

但是,上面的方案还是有问题,因为如果线程一旦阻塞在那里,Redis 的客户端连接就成了闲置连接,闲置过久,服务器便会主动断开,减少闲置资源占用。这个时候 blpop/brpop 就会抛出异常。

所以,消费的时候一定要注意,捕获异常并重试

锁冲突问题

如果客户端加锁没加成功该怎么办呢?

  1. 直接抛出异常,通知用户重试。 这种本质上是放弃当前请求,让用户决定是否重新尝试。
  2. sleep 一会再重试。 sleep 会阻塞当前线程,并且导致队列后续消息产生延迟。如果因为个别死锁 key 导致加锁不成功,线程便会彻底堵死,后续任务永远无法处理。
  3. 将请求转入延时队列,过会再重试。 这种方式比较适合异步消息处理,将当前冲突的请求移到另一个队列延后处理以避开冲突。

延迟队列实现

一般通过 Redis 的 zset(有序集合) 数据结构来实现。将消息序列化为一个字符串作为 zset 的 value,到期时间作为 score,然后采用多线程轮询的方式来进行处理。多线程为了保障可用性,万一挂了一个线程,其他线程也能正常处理。但是有多线程,还要解决并发争抢任务,防止一个任务被执行多次。

Copyright © Kagami丶 2019 all right reserved,powered by Gitbook该文件修订时间: 2019-12-10 20:25:25

results matching ""

    No results matching ""