rabbitmq+netcore6 【6】RPC:远程过程调用

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

文章目录


官网参考链接 https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html
其他人的翻译版参考 https://www.cnblogs.com/grayguo/p/5606886.html
以下工作是本人在参考官网的教程下结合自己的理解做的代码流程更深刻的理解还需要参考官网进行学习哦

1前言

Work Queues文章中 我们学习了如何使用Work Queues在多个消费者之间分发耗时任务。但是如果我们需要在远程电脑上运行一个方法并等待其执行结果呢这种模式就是我们一般讲的RPC(远程过程调用)类似客户端与服务端的发送请求的过程。

在这篇文章当中我们将会使用RabbitMQ构建一个简单的RPC系统一个客户端和一个可扩展的 RPC 服务器由于我们没有耗时任务需要分发因此我们创建一个假的RPC服务返回斐波那契数字。

2Client interface 客户接口

为了说明RPC服务是怎样被使用的我们创建一个了简单的Client类该类有一个Call的方法用来发送RPC请求然后阻塞直到请求结果的返回。

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

注意:
虽然RPC在计算处理中是一种非常常见的模型但是经常有非常多的争议当编程人员不注意一个方法调用是本地的还是较慢的RPC调用就会出现问题。这样的困惑会导致系统不可预测在调试时也增加了不必要的复杂性。错用RPC并不会简化软件而且可能导致一堆不可维护的屎山代码

请将以下建议记在心里:

  • 确保清楚哪个函数是本地的哪个函数是远程的。
  • 文档化你的系统确保组件之间的依赖更加清晰。
  • 处理异常场景当远程RPC服务长时间中断时客户端应该怎么处理。

当存在疑问时避免使用RPC而应该使用异步管道来代替RPC的功能–如阻塞可以将结果异步地带入到下一个计算阶段。

3Callback queue回调队列

通常情况下在RabbitMQ上进行RPC调用非常简单。客户端发起一个请求服务端响应一个消息。为了能够接收一个响应消息我们需将一个回调队列地址带着请求一起发送给服务器。

var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;

var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "rpc_queue",
                     basicProperties: props,
                     body: messageBytes);

Message properties
AMQP 0-9-1约定 预定义了14个属性可以随消息一起发送其中大多数的属性很少使用除了下面几个

  • Persistent为true时使消息持久化为其他值时随程序结束而消失
  • DeliveryMode:熟悉该协议的用户可以选择使用此属性而不是Persistent。他们控制着同一件事
  • contentType用来描述编码的的mime-type例如对于常用的 JSON 编码最好将此属性设置为application/json
  • replyTo通常被用来命名一个回调队列
  • correlationId用于将 RPC 响应与请求相关联

4Correlation Id 关联Id

在之前呈现的方法我们建议为每一个RPC请求都创建了一个回调队列但这是非常低效的更好的方式是为每一个客户端创建一个回调队列。这样就带来了一个新的问题当接收到一个结果的时候我们不知道该结果对应于哪个RPC请求这就是使用correlationId 属性的原因。我们将给每个RPC请求设定一个唯一值然后当我们从回调队列当中接收到消息的时可以根据该属性值将响应与请求匹配上。如果我们发现一个未知的correlationId值可以将其忽略掉因为它不属于我们的请求。

你可能会问为什么我们要忽略回调队列中的未知消息而不是产生一个error这是由于可能的竞争机制虽然这种情况是非常少见的但是存在这种可能RPC服务刚刚把计算结果放入回调队列就挂了但这时还没有来的及进行对Request进行Ack确认这种情况下重启的RPC服务器会把该条消息再处理一次。这就是为什么客户端需要平滑的处理重复的correlationId结果并且 RPC服务在理想情况下是幂等的。

5Summary总结

在这里插入图片描述
我们的RPC系统将会这样工作:

  • 当客户端启动的时候它会创建一个匿名的排他回调队列
  • 对于RPC请求客户端在发送消息上添加两个属性replyTo–用作回调队列correlationId–每一个请求的唯一值。
  • 请求被发送到rpc_queue 队列
  • RPC服务端等待rpc_queue 上面的请求当请求出现时它处理请求然后把结果发送到replyTo标示的回调队列上去。
  • 客户端在回调队列上等待结果当消息出现时它先会检查correlationId是否正确如果与请求中的值匹配成功就会将响应消息返回给应用程序。

6综合以上代码

准备工作

新建一个netcore6的控制台项目添加RabbitMQ的包依赖

NuGet\Install-Package RabbitMQ.Client -Version 6.4.0

在这里插入图片描述
新建一个类MainClass注释掉program.cs的代码使MainClass中的tatic void Main(string[] args)作为程序的入口
按照此方法新建2个netcore6的控制台项目分别代表服务端客户端。

其中斐波那契数列 函数递归写法

private static int fib(int n)
{
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
}

注此代码是在VisualStudio上运行的所以与官网代码略有不同。

运行逻辑
先运行服务端服务端会开始监听请求再运行客户端客户端会发送请求即数字30以及请求参数props包括请求的唯一标识props.CorrelationId自己生成的replyQueueName存入props.ReplyTo
在这里插入图片描述
与此同时服务端接收到了请求根据接收消息的ea.BasicProperties获取请求的相关参数props包括唯一标识props.CorrelationId以及props.ReplyTo计算f(30)props.ReplyTo作为的routingKey新的replyPropsreplyProps.CorrelationId=请求的props.CorrelationId作为basicPropertiesf(30)作为body进行发布。

channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); //将响应的消息发回客户端

客户端这边一直等待f(30)的返回如果收到了消息会根据ea.BasicProperties.CorrelationId判断是不是自己刚刚发送的请求如果是则会放入respQueue队列respQueue队列有值就会返回结果打印到控制台上。

服务端

  1. 像往常一样我们首先建立连接、通道并声明队列
  2. 我们可能希望运行多个服务器进程。为了将负载平均分布到多个服务器上我们需要在设置channel.basicQos时设置预取计数prefetchCount
  3. 我们使用 BasicConsume 来访问队列。然后我们注册一个传递处理程序在其中执行工作并将响应返回
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace RPCService
{
    public class MainClass
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); //声明队列
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //负载均分

                var consumer = new EventingBasicConsumer(channel); //新建消费者接收客户端发过来的请求
                channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
                Console.WriteLine("[Service] Awaiting RPC requests");
                consumer.Received += (model, ea) =>
                {
                    string response = null;
                    int n = -1;
                    var body = ea.Body.ToArray();
                    var props = ea.BasicProperties; //获取请求的参数
                    var replyProps = channel.CreateBasicProperties(); //新建响应的参数
                    replyProps.CorrelationId = props.CorrelationId;
                    Console.WriteLine("[Service] Processing requests...");
                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        n = int.Parse(message); //取出请求消息中的数字
                        Console.WriteLine("[Service] getting fib ({0})", message);
                        response = fib(n).ToString(); //计算fib(n)作为响应消息
                        Console.WriteLine("[Service]  fib ({0}) = {1}", message, response);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("[Service] " + ex.Message);
                        response = "";
                    }
                    finally
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); //将响应的消息发回客户端
                        Console.WriteLine("[Service] Already sent fib({0})={1} to {2}", n, response, props.ReplyTo);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //回传确认消息表示服务端已收到
                    }
                };
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

        /// 

        /// Assumes only valid positive integer input.
        /// Don't expect this one to work for big numbers, and it's
        /// probably the slowest recursive implementation possible.
        /// 

        private static int fib(int n)
        {
            if (n == 0 || n == 1)
            {
                return n;
            }

            return fib(n - 1) + fib(n - 2);
        }

    }
}

客户端

  1. 创建连接、通道然后创建一个用来响应的排他的回调消息队列
  2. 我们订阅这个回调的队列以便收到RPC的响应信息
  3. call方法发起真实的 RPC请求
  4. 我们先生成一个唯一的correlationId 并将其保存以便与它对应的响应在到达时能够被识别
  5. 接下来我们发布一个请求其中包括replyTo 和correlationId属性
  6. 这时我们可以等待直到对应的响应消息到达
  7. 对于每条响应消息客户端都会检查相关 ID 是否是我们正在寻找的 CorrelationId如果是我们需要保存这个结果
  8. 最后我们把响应结果返回给用户

新建MainClass类写入如下代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace PRCClient
{
    public class MainClass
    {
        static void Main()
        {
            var rpcClient = new RpcClient();

            Console.WriteLine(" [Client] Requesting fib(30)");
            var response = rpcClient.Call("30"); //f(30)

            Console.WriteLine(" [Client] Got '{0}'", response);
            rpcClient.Close();
        }
    }

}

新建RpcClient类写入如下代码

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace PRCClient
{
    public class RpcClient
    {
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly string replyQueueName;
        private readonly EventingBasicConsumer consumer;
        private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
        private readonly IBasicProperties props;

        public RpcClient() //构造方法初始化属性
        {
            var factory = new ConnectionFactory() { HostName = "localhost",UserName="lyh",Password="1211" };

            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            //props随着请求一起发送给服务器
            replyQueueName = channel.QueueDeclare().QueueName; //回调队列名
            props = channel.CreateBasicProperties(); //生成基本属性
            var correlationId = Guid.NewGuid().ToString(); //生成唯一的correlationId
            props.CorrelationId = correlationId;  
            props.ReplyTo = replyQueueName; //将接收方设置为该回调队列名

            consumer = new EventingBasicConsumer(channel); //接收到服务端的响应值后进行消费
            consumer.Received += (model, ea) => //接收消息
            {
                var body = ea.Body.ToArray();
                var response = Encoding.UTF8.GetString(body);
                if (ea.BasicProperties.CorrelationId == correlationId) //判断响应消息的correlationId是否与自己发出的相同
                {
                    respQueue.Add(response);//相同则将返回值计入respQueue只要队列有值就会作为call的返回结果
                }
            };

            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);
        }

        public string Call(string message)
        {
            var messageBytes = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish( //发送 30 到服务端
                exchange: "",
                routingKey: "rpc_queue",
                basicProperties: props,
                body: messageBytes);
            Console.WriteLine(" [Client] already sent '{0}' to Service", message);
            return respQueue.Take();
        }

        public void Close()
        {
            connection.Close();
        }
    }
}

结果验证

在这里插入图片描述
运行结果与准备工作的过程描述一致。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ