redis 消息队列

list

List 底层的实现就是一个「链表」,在头部和尾部操作元素,时间复杂度都是 O(1),这意味着它非常符合消息队列的模型。

生产者使用 LPUSH 发布消息

1
2
3
4
5
127.0.0.1:6379> LPUSH queue msg1
(integer) 1
127.0.0.1:6379> LPUSH queue msg2
(integer) 2

RPOP

RPOP消费者使用循环会频繁拉取消息,这会造成CPU 空转,不仅浪费 CPU 资源,还会对Redis造成压力

1
2
3
4
5
6
7
8
while true:
    msg = redis.rpop("queue")
    // 没有消息休眠2s
    if msg == null:
        sleep(2)
        continue
    // 处理消息        
    handle(msg)

又带来另外一个问题:当消费者在休眠等待时,有新消息来了,那消费者处理新消息就会存在「延迟」。

BPOP

及时处理新消息,还能避免 CPU 空转:提供了阻塞式拉取消息的命令:BRPOP / BLPOP

1
2
3
4
5
6
7
while true:
    // 没消息阻塞等待0表示不设置超时时间 直到有新消息才返回否则会在指定的超时时间后返回 NULL
    msg = redis.brpop("queue", 0)
    if msg == null:
        continue
    // 处理消息
    handle(msg)

如果设置的超时时间太长,这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,之后 Redis Server 会强制把这个客户端踢下线。所以,采用这种方案,客户端要有重连机制

缺点

  • 消费过后消息删除
    • 不支持多个消费者消费同一批数据。
    • 消费者拉取到消息后,发生异常宕机,遇到数据丢失。

发布/订阅模型:Pub/Sub

可以支持

  • 多个消费者消费同一批数据
  • 支持阻塞式拉取消息
  • 允许消费者根据一定规则,订阅多个自己感兴趣的队列

图片

图片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 2个消费者 都订阅一个队列
127.0.0.1:6379> SUBSCRIBE queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1


启动一个生产者,发布一条消息
127.0.0.1:6379> PUBLISH queue msg1
(integer) 1

订阅符合规则的队列
127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "queue.*"
3) (integer) 1

这时再看消费者,它就可以接收到这 2 个生产者的消息了。

127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
...
// 来自queue.p1的消息
1) "pmessage"
2) "queue.*"
3) "queue.p1"
4) "msg1"

// 来自queue.p2的消息
1) "pmessage"
2) "queue.*"
3) "queue.p2"
4) "msg2"

还是会丢数据

Pub/Sub 在实现时非常简单,它没有基于任何数据类型,也没有做任何的数据存储,它只是单纯地为生产者、消费者建立数据转发通道,把符合规则的数据,从一端转发到另一端。

场景

  • 消费者下线
    • 消费者订阅指定队列,Redis 就会记录一个映射关系:队列->消费者

    • 生产者向这个队列发布消息,那 Redis 就从映射关系中找出对应的消费者,把消息转发给它

    消费者下线了,那生产者发布的消息,因为找不到消费者,也会全部丢弃。所以消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失。

  • Redis 宕机重启

    因为 Pub/Sub 没有基于任何数据类型实现,所以它也不具备数据持久化的能力,不会写入到 RDB 和 AOF 中

  • 消息积压(消费者的速度,跟不上生产者)

    采用 List 当作队列,消息积压时,会导致这个链表很长,最直接的影响就是,Redis 内存会持续增长,直到消费者把所有数据都从链表中取出

    Pub/Sub 的实现细节上来说。

    每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个内存缓冲区。当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中。

    这个缓冲区其实是有上限的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。超过了缓冲区配置的上限,此时,Redis 就会强制把这个消费者踢下线。这时消费者就会消费失败,也会丢失数据。

    配置

    1
    client-output-buffer-limit pubsub 32mb 8mb 60。
    
    • 32mb:缓冲区一旦超过 32MB,Redis 直接强制把消费者踢下线
    • 8mb + 60:缓冲区超过 8MB,并且持续 60 秒,Redis 也会把消费者踢下线

stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// *表示让Redis自动生成消息ID ,消息 ID 的格式是「时间戳-自增序号」
127.0.0.1:6379> XADD queue * name zhangsan
"1618469123380-0"
127.0.0.1:6379> XADD queue * name lisi
"1618469127777-0"


// 从开头读取5条消息,0-0表示从开头读取
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0
1) 1) "queue"
   2) 1) 1) "1618469123380-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618469127777-0"
         2) 1) "name"
            2) "lisi"
            
            
// 想继续拉取消息,需要传入上一条消息的 ID
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0
(nil)

图片

  • 支持阻塞等待拉取消息

    1
    2
    // BLOCK 0 表示阻塞等待,不设置超时时间
    127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0
    
  • 支持发布 / 订阅模式

    • XGROUP:创建消费者组
    • XREADGROUP:在指定消费组下,开启消费者拉取消息

      首先,生产者依旧发布 2 条消息:

    1
    2
    3
    4
    127.0.0.1:6379> XADD queue * name zhangsan
    "1618470740565-0"
    127.0.0.1:6379> XADD queue * name lisi
    "1618470743793-0"
    

    之后,我们想要开启 2 组消费者处理同一批数据,就需要创建 2 个消费者组:

    1
    2
    3
    4
    5
    6
    // 创建消费者组1,0-0表示从头拉取消息
    127.0.0.1:6379> XGROUP CREATE queue group1 0-0
    OK
    // 创建消费者组2,0-0表示从头拉取消息
    127.0.0.1:6379> XGROUP CREATE queue group2 0-0
    OK
    

    消费者组创建好之后,我们可以给每个消费者组下面挂一个消费者,让它们分别处理同一批数据。

    第一个消费组开始消费:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // group1的consumer开始消费,>表示拉取最新数据
    127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >
    1) 1) "queue"
       2) 1) 1) "1618470740565-0"
             2) 1) "name"
                2) "zhangsan"
          2) 1) "1618470743793-0"
             2) 1) "name"
                2) "lisi"
    

    同样地,第二个消费组开始消费:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // group2的consumer开始消费,>表示拉取最新数据
    127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >
    1) 1) "queue"
       2) 1) 1) "1618470740565-0"
             2) 1) "name"
                2) "zhangsan"
          2) 1) "1618470743793-0"
             2) 1) "name"
                2) "lisi"
    

    我们可以看到,这 2 组消费者,都可以获取同一批数据进行处理了。

  • 消费失败,可重新消费,消息不丢失

    1
    2
    // group1下的 1618472043089-0 消息已处理完成
    127.0.0.1:6379> XACK queue group1 1618472043089-0
    

    图片

    消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息

  • 实例宕机,消息不丢失,数据可持久化

    Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

    我们只需要配置好持久化策略,这样的话,就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。

  • 消息可堆积

    其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:

    1. 生产者限流:避免消费者处理不及时,导致持续积压
    2. 丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息

    而 Redis 在实现 Stream 时,采用了第 2 个方案。

    在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。

    1
    2
    3
    // 队列长度最大10000
    127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan
    "1618473015018-0"
    

    当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。

    这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。

专业消息队列

一个专业的消息队列,必须要做到两大块:

  1. 消息不丢
  2. 消息可堆积

把 Redis 当作队列来使用时,始终面临的 2 个问题:

  • Redis 本身可能会丢数据

  • 面对消息积压,Redis 内存资源紧张

数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的

分析对比

一个消息队列分为三大块生产者、队列中间件、消费者

图片

生产者

  • 生产者问题,发送失败,重发一次
  • 数据已发送成功,响应结果超时。重新发一次,设定一个最大重试次数,超过上限依旧失败,需要记录日志报警处理。

对于敏感业务,当消费者收到重复数据数据时,要设计幂等逻辑,保证业务的正确性。

从这个角度来看,生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。

消费者

消费者拿到消息后,还没处理完成,就异常宕机了,消费者在处理完消息后,必须告知队列中间件,队列中间件才会把标记已处理

队列中间件

在这个方面,Redis 其实没有达到要求。

Redis 在以下 2 个场景下,都会导致数据丢失。

  1. AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能
  2. 主从复制也是异步的,主从切换时,也存在丢失数据的可能(从库还未同步完成主库发来的数据,就被提成主库)

Redis 本身的无法保证严格的数据完整性

消息积压

一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。所以Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。

但 Kafka、RabbitMQ 这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多,当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加坦然。