list
List 底层的实现就是一个「链表」,在头部和尾部操作元素,时间复杂度都是 O(1),这意味着它非常符合消息队列的模型。
生产者使用 LPUSH 发布消息
1 |
|
RPOP
RPOP
消费者使用循环会频繁拉取消息,这会造成CPU 空转,不仅浪费 CPU 资源,还会对Redis造成压力
1 |
|
又带来另外一个问题:当消费者在休眠等待时,有新消息来了,那消费者处理新消息就会存在「延迟」。
BPOP
及时处理新消息,还能避免 CPU 空转:提供了阻塞式拉取消息的命令:BRPOP / BLPOP
1 |
|
如果设置的超时时间太长,这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,之后 Redis Server 会强制把这个客户端踢下线。所以,采用这种方案,客户端要有重连机制。
缺点
- 消费过后消息删除
- 不支持多个消费者消费同一批数据。
- 消费者拉取到消息后,发生异常宕机,遇到数据丢失。
发布/订阅模型:Pub/Sub
可以支持
- 多个消费者消费同一批数据
- 支持阻塞式拉取消息
- 允许消费者根据一定规则,订阅多个自己感兴趣的队列
1 |
|
还是会丢数据
Pub/Sub 在实现时非常简单,它没有基于任何数据类型,也没有做任何的数据存储,它只是单纯地为生产者、消费者建立数据转发通道,把符合规则的数据,从一端转发到另一端。
场景
- 消费者下线
-
消费者订阅指定队列,Redis 就会记录一个映射关系:队列->消费者
-
生产者向这个队列发布消息,那 Redis 就从映射关系中找出对应的消费者,把消息转发给它
消费者下线了,那生产者发布的消息,因为找不到消费者,也会全部丢弃。所以消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失。
-
-
Redis 宕机重启
因为 Pub/Sub 没有基于任何数据类型实现,所以它也不具备数据持久化的能力,不会写入到 RDB 和 AOF 中
-
消息积压(消费者的速度,跟不上生产者)
采用 List 当作队列,消息积压时,会导致这个链表很长,最直接的影响就是,Redis 内存会持续增长,直到消费者把所有数据都从链表中取出
Pub/Sub 的实现细节上来说。
每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个内存缓冲区。当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中。
这个缓冲区其实是有上限的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。超过了缓冲区配置的上限,此时,Redis 就会强制把这个消费者踢下线。这时消费者就会消费失败,也会丢失数据。
配置
1
client-output-buffer-limit pubsub 32mb 8mb 60。
- 32mb:缓冲区一旦超过 32MB,Redis 直接强制把消费者踢下线
- 8mb + 60:缓冲区超过 8MB,并且持续 60 秒,Redis 也会把消费者踢下线
stream
1 |
|
-
支持阻塞等待拉取消息
1
2// BLOCK 0 表示阻塞等待,不设置超时时间 127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0
-
支持发布 / 订阅模式
- XGROUP:创建消费者组
-
XREADGROUP:在指定消费组下,开启消费者拉取消息
首先,生产者依旧发布 2 条消息:
1
2
3
4127.0.0.1:6379> XADD queue * name zhangsan "1618470740565-0" 127.0.0.1:6379> XADD queue * name lisi "1618470743793-0"
之后,我们想要开启 2 组消费者处理同一批数据,就需要创建 2 个消费者组:
1
2
3
4
5
6// 创建消费者组1,0-0表示从头拉取消息 127.0.0.1:6379> XGROUP CREATE queue group1 0-0 OK // 创建消费者组2,0-0表示从头拉取消息 127.0.0.1:6379> XGROUP CREATE queue group2 0-0 OK
消费者组创建好之后,我们可以给每个消费者组下面挂一个消费者,让它们分别处理同一批数据。
第一个消费组开始消费:
1
2
3
4
5
6
7
8
9// group1的consumer开始消费,>表示拉取最新数据 127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue > 1) 1) "queue" 2) 1) 1) "1618470740565-0" 2) 1) "name" 2) "zhangsan" 2) 1) "1618470743793-0" 2) 1) "name" 2) "lisi"
同样地,第二个消费组开始消费:
1
2
3
4
5
6
7
8
9// group2的consumer开始消费,>表示拉取最新数据 127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue > 1) 1) "queue" 2) 1) 1) "1618470740565-0" 2) 1) "name" 2) "zhangsan" 2) 1) "1618470743793-0" 2) 1) "name" 2) "lisi"
我们可以看到,这 2 组消费者,都可以获取同一批数据进行处理了。
-
消费失败,可重新消费,消息不丢失
1
2// group1下的 1618472043089-0 消息已处理完成 127.0.0.1:6379> XACK queue group1 1618472043089-0
消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息
-
实例宕机,消息不丢失,数据可持久化
Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。
我们只需要配置好持久化策略,这样的话,就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。
-
消息可堆积
其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:
- 生产者限流:避免消费者处理不及时,导致持续积压
- 丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息
而 Redis 在实现 Stream 时,采用了第 2 个方案。
在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。
1
2
3// 队列长度最大10000 127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan "1618473015018-0"
当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。
这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。
专业消息队列
一个专业的消息队列,必须要做到两大块:
- 消息不丢
- 消息可堆积
把 Redis 当作队列来使用时,始终面临的 2 个问题:
-
Redis 本身可能会丢数据
-
面对消息积压,Redis 内存资源紧张
数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的
分析对比
一个消息队列分为三大块生产者、队列中间件、消费者。
生产者
- 生产者问题,发送失败,重发一次
- 数据已发送成功,响应结果超时。重新发一次,设定一个最大重试次数,超过上限依旧失败,需要记录日志报警处理。
对于敏感业务,当消费者收到重复数据数据时,要设计幂等逻辑,保证业务的正确性。
从这个角度来看,生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。
消费者
消费者拿到消息后,还没处理完成,就异常宕机了,消费者在处理完消息后,必须告知队列中间件,队列中间件才会把标记已处理
队列中间件
在这个方面,Redis 其实没有达到要求。
Redis 在以下 2 个场景下,都会导致数据丢失。
- AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能
- 主从复制也是异步的,主从切换时,也存在丢失数据的可能(从库还未同步完成主库发来的数据,就被提成主库)
Redis 本身的无法保证严格的数据完整性
消息积压
一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。所以Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。
但 Kafka、RabbitMQ 这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多,当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加坦然。