RabbitMQ——RabbitMQ的Federation 和 Shovel原理

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6


摘要

主要讲述Federation和 Shovel这两个插件的使用、细节及相关原理。相对于集群的部署方式,Federation和Shovel可以部署在广域网中,为RabbitMQ提供更广泛的应用空间。RabbitMQ 可以通过3 种方式实现分布式部署:集群、Federation 和Shovel.这3 种方式不是互斥的,可以根据需要选择其中的一种或者以几种方式的组合来达到分布式部署的目的。Federation 和Shovel 可以为RabbitMQ 的分布式部署提供更高的灵活性,但同时也提高了部署的复杂性。

Federation exchange

Federation应用场景

(broker北京),(broker深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京的业务(Client北京) 需要连接(broker北京),向其中的交换器exchangeA发送消息,此时的网络延迟很小,(Client北京)可以迅速将消息发送至exchangeA中,就算在开启了publisherconfirm机制或者事务机制的情况下,也可以迅速收到确认信息。此时又有个在深圳的业务(Client深圳)需要向exchangeA发送消息,那么(Client深圳) (broker北京)之间有很大的网络延迟,(Client深圳) 将发送消息至exchangeA会经历一定的延迟,尤其是在开启了publisherconfirm机制或者事务机制的情况下,(Client深圳) 会等待很长的延迟时间来接收(broker北京)的确认信息,进而必然造成这条发送线程的性能降低,甚至造成一定程度上的阻塞。将业务(Client深圳)部署到北京的机房可以解决这个问题,但是如果(Client深圳)调用的另些服务都部署在深圳,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房,那么容灾又何以实现?这里使用Federation插件就可以很好地解决这个问题。

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_队列queue

Federation提供了一个能力,让北京的rabbitmq接受exchanegA的消息。然后再把exchangeA的消息转发到广州的exchangeA。Federation 插件可以让多个交换器或者多个队列进行联邦。一个联邦交换器(federated exchange) 或者一个联邦队列C federated queue) 接收上游Cups位eam) 的消息,这里的上游是指位于其他Broker 上的交换器或者队列。联邦交换器能够将原本发送给上游交换器Cupstreamexchange) 的消息路由到本地的某个队列中;联邦队列则允许一个本地消费者接收到来自上游队列( upstream queue) 的消息。

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_rabbitmq_02

Federation 插件的设计目标:

  • Federation 插件能够在不同管理域(可能设置了不同的用户和vhost ,也可能运行在不同版本的RabbitMQ 和Erlang上)中的Broker 或者集群之间传递消息。
  • Federation 插件基于AMQP 0-9-1 协议在不同的Broker 之间进行通信,并设计成能够容忍不稳定的网络连接情况。
  • 一个Broker 节点中可以同时存在联邦交换器(或队列)或者本地交换器(或队列),只需要对特定的交换器(或队列)创建Federation 连接CFederation link ) 。
  • Federation 不需要在N 个Broker 节点之间创建O(N2)个连接(尽管这是最简单的使用方式) ,这也就意味着Federation 在使用时更容易扩展。

Federation的原理

插件会在北京(broker1)上会建立一个同名的交换器exchangeA。同时建立一个内部的交换器exchangeA broker3,并通过路由键rkA将这两个交换器绑定起来。与此同时 Federation 插件还会在 brokerl 上建立一个队列federation: exchangeA.broker3 与交换器exchangeA.broker3进行绑定。Federation插件会在队列federation: exchangeA.broker3与 broker3中的交换器 exchangeA 之间建立一条 AMQP 连接来实时地消费队列federation: exchangeA.broker3中的数据。这些操作都是内部的,对外部业务客户端来说这条 Federation link 建立在brokerl exchangeA broker3 exchangeA之间。

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_rabbitmq_03

Federation exchange的使用

安装rabbitmq实例

在机器1执行命令
docker run -d --hostname rabbit1 --net=host --name myrabbit1 rabbitmq:3.6.15-management

在机器2执行命令
docker run -d --hostname rabbit2 --net=host --name myrabbit2 rabbitmq:3.6.15-management

开启federation插件

在机器1执行命令进入容器

docker exec -it myrabbit1 /bin/bash

执行命令开启插件

rabbitmq-plugins enable rabbitmq_federation_management

在机器3执行命令进入容器

docker exec -it myrabbit3 /bin/bash

执行命令开启插件

rabbitmq-plugins enable rabbitmq_federation_management

访问ip:15672可以见到右边多了2栏


如果是在java代码中设置federation的方式

请使用的Http API的方式的来调用并设置的federation的开启和模式

PUT /api/policies/%2f/federate-me
{"pattern":"^amq\.", "definition":{"federation-upstream-set":"all"}, \
 "apply-to":"exchanges"}

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_docker_04

 在机器3上面新建eujian.queue队列、eujian.exchange交换机、和他们之间的绑定。

这里使用命令行去新建(可以用管理后台自行新建)

rabbitmqctl eval 'rabbit_amqqueue:declare({resource, <<"/">>, queue, <<"eujian.queue">>}, true, false, [], none).'
rabbitmqctl eval 'rabbit_exchange:declare({resource, <<"/">>, exchange, <<"eujian.exchange">>}, fanout, true, false, false, []).'
rabbitmqctl eval 'rabbit_binding:add({binding, {resource, <<"/">>, exchange, <<"eujian.exchange">>}, <<"*">>, {resource, <<"/">>, queue, <<"eujian.queue">>}, []}).'

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_队列queue_05

这里的192.168.2.138改成机器1的ip或者在管理台用页面新建

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_rabbitmq_06

新增一个policy

这里是匹配以eujian. 开头的交换机在机器3执行命令

rabbitmqctl set_policy --apply-to exchanges p1 "eujian.*" '{"federation-upstream":"f1"}'

或者通过管理台新建

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_docker_07

效果验证

这里点击federation status

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_发送消息_08

进入机器1的rabbitmq管理台

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_队列queue_09

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_发送消息_10

 这里对机器1的exchange发送一条消息

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_docker_11

然后在机器2的队列里面收到消息。

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_发送消息_12

Federation Queue联邦队列

联邦队列可以在多个Broker节点(或者集群)之间为单个队列提供均衡负载的功能。一个联邦队列可以连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求。

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_队列queue_13

队列queue1和queue2原本在broker2中,由于某种需求将其配置为federated queue并将broker1作为upstream。Federation插件会在broker1上创建同名的队列queue1和queue2,与broker2中的队列queue1和queue2分别建立两条单向独立的Federation link。当有消费者ClientA连接broker2并通过Basic.Consume消费队列queue1(或queue2)中的消息时,如果队列queue1(或queue2)中本身有若干消息堆积,那么ClientA直接消费这些消息,此时broker2中的queue1(或queue2)并不会拉取broker1中的queue1(或queue2)的消息;如果队列queue1(或queue2)中没有消息堆积或者消息被消费完了,那么它会通过Federation link拉取在broker1中的上游队列queue1(或queue2)中的消息(如果有消息),然后存储到本地,之后再被消费者ClientA进行消费。和federated exchange不同,一条消息可以在联邦队列间转发无限次。两个队列可以互为联邦队列。如果两个队列互为联邦队列,队列中的消息除了被消费,还会转向有多余消费能力的一方,如果这种"多余的消费能力"在broker1和broker2中来回切换,那么消费也会在broker1和broker2中的队列queue中来回转发。federation queue只能使用Basic.Consume进行消费,并且不具备传递性。

Federation Queue的使用

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_队列queue_14

1. 原理图

2.添加upstream(同上)

3.添加policy

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_docker_15

RabbitMQ的Shovel原理

Federation具备的数据转发功能类似,Shovel够可靠、持续地从一个Broker中的队列(作为源端,即source)拉取数据并转发至另一个Broker中的交换器(作为目的端,即destination)。作为源端的队列和作为目的端的交换器可以同时位于同一个Broker,也可以位于不同的Broker上。Shovel行为就像优秀的客户端应用程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。shovelde优点:松耦合,解决不同Broker、集群、用户、vhost、MQ和Erlang版本移动消息,支持广域网,可以容忍糟糕的网络,能保证消息的可靠性,高度定制,当Shovel成功连接后,可以配置。

Shovel的工作原理

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_docker_16

若在配置Shovel link时设置了add_forward_headers参数为true,则最后消息会有特殊headers属性标记。

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_队列queue_17

通常源为队列,目的为交换器,但是,也可以源为队列,目的为队列。实际也是由交换器转发,只不过这个交换器是默认交换器。配置交换器做为源也是可行的。实际上会在源端自动新建一个队列,消息先存在这个队列,再被Shovel移走。源和目的端的交换器和队列都可以在建立Shovel之后再创建。Shovel可以为源端或目的端配置多个Broker地址,这样可以在一个连接失败后随机挑选其他地址建立连接。

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_队列queue_18

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_队列queue_19

Shovel的构建

1.开启插件(需要的机器都开启)

rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

2.原理图(在源头发送的消息直接回进入到目的地队列)

3.添加shovel源和目的地

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_队列queue_20

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_rabbitmq_21

案例:消息堆积的治理 

单个队列中堆积10万条消息不会有丝毫影响,但超过1千万时,将引起严重问题,如内存、磁盘告警、connection阻塞等。

可以使用Shovel,当某个队列消息堆积严重时,可以使用Shovel将队列中消息移交给另一个集群

1、当检测到集群cluster1中队列queue1中有严重消息堆积,如通过/api/queues/vhost/name获取消息个数messages超过2千万或者消息占用大小messages_bytes超过10GB时,启用shovel1将queue1中消息转发至备份集群cluster2中队列queue2
2、当检测到queue1中消息个数低于1百万或者消息大小低于1GB时,停止shovel1,让原本队列消化剩余堆积
3、当检测到queue1消息个数低于10万个或占用大小低于100MB时,开启shovel2将queue2队列中暂存的消息返还队列queue1
4、当检测到queue1消息个数超过1百万,或消息占用高于1GB时,将shovel2停掉。

RabbitMQ——RabbitMQ的Federation 和 Shovel原理_docker_22

博文参考

尚硅谷2021新版RabbitMQ教程丨快速掌握MQ消息中间件_哔哩哔哩_bilibili

rabbitmq federation的联邦交换机 - 简书

RabbitMQ的Federation和Shovel的使用_Coding Farmer的博客

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ