尝试Redis发布-订阅模型

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

场景

我有程序功能大概类似于一个程序进行生产数据一个程序进行消费起初我考虑到了各种MQ去解决这件事情我们现有资源有Redis引入MQ可能会导致资源系统复杂性实时性的一个问题所以依然考虑使用Redis的发布-订阅模型来解决这问题。

发布-订阅模型

1. 订阅通道

订阅者使用SUBSCRIBE命令来订阅一个或多个通道。例如

SUBSCRIBE channel1 channel2

2. 取消订阅通道

订阅者可以使用UNSUBSCRIBE命令来取消订阅一个或多个通道。例如

UNSUBSCRIBE channel1 channel2

3. 接收消息

一旦订阅了通道订阅者将开始接收发布者发送到这些通道的消息。消息将以异步方式传递给订阅者。

4.发布消息

发布消费到通道发布到了之后订阅者就可用监听到这个消息了。

PUBLISH my_channel "Hello, subscribers!"

5.通配符的使用

Redis还支持通配符订阅让订阅者可以使用通配符来匹配多个通道。通配符有两种

匹配一个通道名例如 SUBSCRIBE news. 将订阅所有以 “news.” 开头的通道。

?匹配一个通道名中的一个字符例如 SUBSCRIBE news.?? 将订阅以 “news.” 开头且后面有两个字符的通道。

存在的问题

消息持久化和顺序问题

如果Redis挂壁了那么消息也会丢失的这个其实可用采用Redis Stream来解决这个问题。

发布者

我们采用c#语言来简单写一个demo

using StackExchange.Redis;
using System;

class Publisher
{
    static void Main()
    {
        // 建立到Redis服务器的连接
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("your_redis_connection_string");
        IDatabase db = redis.GetDatabase();

        string streamName = "mystream";

        for (int i = 1; i <= 10; i++)
        {
            string messageId = db.StreamAdd(streamName, new[]
            {
                new NameValueEntry("message", $"Message {i}")
            });

            Console.WriteLine($"Published: {messageId}");
        }

        // 关闭连接
        redis.Close();
    }
}

在上述示例中我们使用Redis的StreamAdd方法将消息发布到名为 “mystream” 的流中。每条消息都有一个唯一的消息ID消息将按照它们添加到流的顺序进行排序。

订阅者

using StackExchange.Redis;
using System;

class Subscriber
{
    static void Main()
    {
        // 建立到Redis服务器的连接
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("your_redis_connection_string");
        IDatabase db = redis.GetDatabase();

        string streamName = "mystream";
        string consumerGroup = "mygroup";
        string consumerName = "mysubscriber";

        // 创建消费者组
        db.StreamCreateConsumerGroup(streamName, consumerGroup, "0");

        while (true)
        {
            var messages = db.StreamReadGroup(streamName, consumerGroup, consumerName, "0", 10);

            foreach (var message in messages)
            {
                Console.WriteLine($"Received: {message.Values[1]}");
                // 在这里处理消息
                // 可以实现消息确认等逻辑
            }
        }

        // 关闭连接
        redis.Close();
    }
}

在上述示例中我们首先创建了一个消费者组然后循环读取来自流 “mystream” 的消息。通过使用消费者组我们可以确保每条消息只会被一个订阅者处理并且即使订阅者离线一段时间它也可以获取未处理的消息。

使用Redis Streams消息将持久化存储在Redis中即使Redis服务器重启消息也不会丢失。这使得Redis Streams成为处理消息的可靠工具适用于日志记录、事件处理和消息队列等应用。

安全性问题

Redis的消息其实是裸奔的。解决这个问题的核心在于可以在Redis上设置访问控制只允许授权的发布者和订阅者连接到Redis。此外可以考虑使用TLS/SSL来加密连接。

启用密码验证

首先可以配置Redis以限制访问只允许授权的客户端连接。在Redis的配置文件中通常是redis.conf可以使用以下配置项来启用访问控制

# 启用密码认证
requirepass your_password

也可以使用SSL加密协议来限制这个要取得加密证书。

无法保证消息可靠性

这个问题事实上是无解的接受不了就不要用有人可能说发布消息之后订阅者回电机制这个事实上是伪的不能再伪的逻辑我都收不到了我甜蜜的怎么回电还有人说存数据库我就笑笑。

结束

我没有转语言只是掌握的不只是一门语言一个nice的IT工作者应该是不被语言限制的

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: redis

“尝试Redis发布-订阅模型” 的相关文章