Redis 发布订阅模式的深度解析与实现消息队列

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

1 发布订阅模式(Pub/Sub)的概述

我们可以利用Redis的List数据结构实现一

个简单的消息队列通过lpush命令写入消息通过rpop 命令拉取消息也可以使用BRPOP实现阻塞式的拉取消息。

上面的消息队列有一个缺点那就是不支持消息多播机制消息多播机制就是生产者生产的一个消息可以被多个消费者消费到这个功能在分布式系统中非常重要。

Redis单独使用Pub/Sub模块来支持消息多播即发布/订阅模式(publish/subscribe)它是一种消息通信模式发布者(pub)发送消息订阅者(sub)接收消息。

发布者会将的消息发布到一个chanel通道中而不是发送给指定的订阅者发布者也不知道可能有哪些订阅者。

订阅者可以订阅一个或多个channel只接收来自订阅的channel的消息并且不知道有哪些如果有发布者这种模式实现了消息发布者和订阅者的解耦。

Pub/Sub 与键空间无关消息不会被持久化与数据库也无关在db10上发布将可以被 db1 上的订阅者听到。如果我们需要某种范围的范围那么只能在设置的channel名字上做区分。

2 订阅

客户端使用SUBSCRIBE channel [channel ...]命令订阅通道可以多次执行该命令也可以一次订阅多个通道多个客户端可以订阅相同的通道。

该命令返回一个数组包括三部分依次是:命令名称(字符串“subscribe”)订阅的通道名称目前总共订阅的通道数包含glob通道。这三个部分对每一个订阅的通道是连续的。

客户端执行订阅以后除了可以继续订阅(SUBSCRIBE或者PSUBSCRIBE),取消订阅(UNSUBSCRIBE或者PUNSUBSCRIBE), PING命令和结束连接(QUIT)外, 不能执行其他操作,客户端将阻塞直到订阅通道上发布消息的到来。

如下表示客户端一次性订阅四个通道aaa、bba、ccc、ddd

127.0.0.1:6379> SUBSCRIBE aaa bba ccc ddd
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "aaa"
3) (integer) 1
1) "subscribe"
2) "bba"
3) (integer) 2
1) "subscribe"
2) "ccc"
3) (integer) 3
1) "subscribe"
2) "ddd"
3) (integer) 4

请注意如果使用redis-cli 一旦进入订阅模式就不会接受任何命令只能使用 Ctrl-C 退出该模式。

3 取消订阅

客户端使用UNSUBSCRIBE [channel [channel ...]]命令取消订阅指定的通道可以指定一个或者多个取消的订阅通道名称也可以不带任何参数此时将取消所有的订阅的通道(不包括glob通道)。

该命令返回一个数组包括三部分依次是:命令名称(字符串“unsubscribe”)订阅的通道名称目前总共订阅的通道数包含glob通道。这三个部分对每一个取消订阅的通道是连续的。当最后一个参数为零时我们不再订阅任何频道客户端可以发出任何类型的 Redis 命令因为我们处于 Pub/Sub 状态之外。

如下表示客户端退出ccccc通道的订阅

127.0.0.1:6379> UNSUBSCRIBE ccccc
1) "unsubscribe"
2) "cccc"
3) (integer) 0

4 模式匹配

Redis Pub/Sub 实现支持模式匹配。客户端可以订阅 glob 通道这样就能接收发送到通道名称与给定模式匹配的通道的所有消息。

客户端使用PSUBSCRIBE pattern [pattern ...] 订阅一个或多个glob 通道。

该命令返回一个数组包括三部分依次是:命令名称(字符串“psubscribe”)订阅的glob通道名称目前总共订阅的通道数(包含非glob通道)。这三个部分对每一个订阅的通道是连续的。

例如订阅a*和*c模式

127.0.0.1:6379> PSUBSCRIBE a* *c
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "a*"
3) (integer) 1
1) "psubscribe"
2) "*c"
3) (integer) 2

客户端使用 PUNSUBSCRIBE [pattern [pattern ...]]退订一个或多个glob 通道。也可以不带任何参数此时将取消所有的订阅的通道(不包括非glob通道)。

该命令返回一个数组包括三部分依次是:命令名称(字符串“punsubscribe”)取消订阅的glob通道名称目前总共订阅的通道数包含非glob通道。这三个部分对每一个取消订阅的通道是连续的。当最后一个参数为零时我们不再订阅任何频道客户端可以发出任何类型的 Redis 命令因为我们处于 Pub/Sub 状态之外。

如下取消对a*的glob通道的订阅

127.0.0.1:6379> PUNSUBSCRIBE a*
1) "punsubscribe"
2) "a*"
3) (integer) 0

subscribe, unsubscribe, psubscribe 和punsubscribe命令的最后都返回当前客户端订阅的glob通道和通道的总数如果为0则客户端自动退出Pub/Sub模式。

5 发布

PUBLISH channel message命令在指定的通道上发布消息。只能在一个通道上发布消息不能在多个通道上同时发布消息。

将返回通知的接收者数量。这里的接收者数目大于等于订阅该通道的客户端数目因为一个客户端的glob通道和非glob通道同时匹配发布通道的话则视为两个接收者。换句话说如果客户端订阅了多个与已发布消息匹配的模式或者订阅了与该消息匹配的模式和通道则该客户端可能会多次收到同一条消息。

在接收端收到的响应包括三部分依次是“message”字符串匹配的通道名称发布的消息内容。如果是因为glob模式匹配而接收那么返回四部分“pmessage”字符串匹配的glob通道名称发送的原始通道名称发布的消息内容。

如果某个客户端的订阅a*和*c两个模式通道

127.0.0.1:6379> PSUBSCRIBE a* *c
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "a*"
3) (integer) 1
1) "psubscribe"
2) "*c"
3) (integer) 2

如果发送消息的通道为ac那么将会返回2

127.0.0.1:6379> PUBLISH ac xxxxx
(integer) 2

在客户端将收到两次消息

1) "pmessage"
2) "a*"
3) "ac"
4) "xxxxx"
1) "pmessage"
2) "*c"
3) "ac"
4) "xxxxx"

6 Pub/Sub原理

每个Redis服务器进程维持着一个标识服务器状态的redis.h/redisServer结构其中就保存着有订阅的频道 以及 订阅模式 的信息

struct redisServer {
    // ...
    dict *pubsub_channels;  // 订阅频道
    list *pubsub_patterns;  // 订阅模式
    // ...
};

6.1 pubsub_channels

pubsub_channels是一个dict字典结构key数组元素为channelvalue就是某个client。当客户端订阅某一个频道之后Redis 就会往 pubsub_channels 这个字典中新添加一条channel和client数据不同的client可以订阅相同的channelclient以链表的方式串联起来这样就能保存多个client对同一个channel的关系非常的巧妙。

了解了这个结构SUBSCRIBE 、PUBLISH 、UNSUBSCRIBE命令的实现也变得十分简单了。

SUBSCRIBE就是将channel和client加入到dict中如果此前没有该channel那就新增一个channel元素然后在再增一个client链表节点如果此前存在则直接在链表末尾添加一个client节点。

PUBLISH只需要通过上述字典定位到具体的channel就能找到所有订阅该channel的客户端再把消息发送给它们就好了。

UNSUBSCRIBE也很简单将对应channel下面的链表中的client删除即可。

6.2 pubsub_patterns

pubsub_patterns用于存储所有的glob channel它是一个list结构节点类型为redis.h/pubsubPattern

typedef struct pubsubPattern {
    redisClient *client;  // 订阅模式的客户端
    robj *pattern;        // 订阅的模式
} pubsubPattern;

当使用PSUBSCRIBE命令订阅一个模式时程序就创建一个pubsubPattern添加到 pubsub_patterns 链表中。如果另一个客户端也订阅一个模式则向链表的后面新增一个pubsubPattern节点即可。

因此实际上PUBLISH除了会在pubsub_channels中定位具体的channel之外还会将指定的channel与pubsub_patterns 中的模式进行对比如果 指定的channel 和某个模式匹配的话那么也将 message 发送到订阅那个模式的全部客户端。

PUNSUBSCRIBE的实现也很简单就是删除pubsub_patterns中client和pattern信息对比一致的节点。

7 Pub/Sub缺点

发布的消息在Redis系统中不能持久化因此必须先执行订阅再等待消息发布。如果先发布了消息那么该消息由于没有订阅者消息将被直接丢弃。

消息只管发送不管接收也没有ACK机制无法保证消息的消费成功。如果某个消费者中途加入进来或者挂掉重启那么这之前丢失的消息也不能再次消费。

以上的缺点导致Redis的Pub/Sub模式就像个小玩具在生产环境中几乎无用武之地非常的尴尬为此Redis5.0版本新增了Stream数据结构不但支持多播还支持数据持久化相比Pub/Sub更加的强大

8 相关文章

一、Redis简介、数据类型和命令

二、Redis数据类型介绍、使用场景及其操作命令

三、Redis事务的概述、设计与实现

四、Redis 发布订阅模式的深度解析与实现消息队列

五、Redis持久化RDB的三种触发机制及其优缺点

六、Redis主从复制与读写分离

七、Redis持久化RDB的三种触发机制及其优缺点

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: redis

“Redis 发布订阅模式的深度解析与实现消息队列” 的相关文章