RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6


摘要

讲述RabbitMQ的入门知识,包括生产者、消费者、队列、交换器、路由键、绑定、连接及信道等基本术语。本章还阐述了RabbitMQ与AMQP 协议的对应关系。并详细的阐述其原源码。

RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型。RabbitMQ 的整体模型架构如图所示。

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_TCP

RabbitMQ的生产者

Producer: 生产者,就是投递消息的一方。

生产者创建消息,然后发布到RabbitMQ 中。消息一般可以包含消息是包括消息体、属性和headers 。消息体也可以称之为payload ,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON 字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息, 比如一个交换器的名称和一个路由键。生产者把消息交由RabbitMQ ,RabbitMQ 之后会根据标签把消息发送给感兴趣的消费者C(Consumer ) 。

RabbitMQ的消费者

Consumer: 消费者, 就是接收消息的一方。

消费者连接到RabbitMQ 服务器,并订阅到队列上。当消费者消费一条消息时, 只是消费消息的消息体C payload ) 。在消息路由的过程中, 消息的标签会丢弃, 存入到队列中的消息只有消息体,消费者也只会消费到消息体, 也就不知道消息的生产者是谁,当然消费者也不需要知道。

RabbitMQ的服务节点

Broker: 消息中间件的服务节点。

对于RabbitMQ 来说,一个RabbitMQ Broker 可以简单地看作一个RabbitMQ 服务节点,或者RabbitMQ 服务实例。大多数情况下也可以将一个RabbitMQ Broker 看作一台RabbitMQ服务器。图展示了生产者将消息存入RabbitMQ Broker ,以及消费者从Broker 中消费数据的整个流程。

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_TCP_02

首先生产者将业务方数据进行可能的包装, 之后封装成消息, 发送AMQP 协议里这个动作对应的命令为Basic.Publish) 到Broker 中。消费者订阅并接收消息AMQP 协议里这个动作对应的命令为Basic.Cosurne 或者Basic.Get) ,经过可能的解包处理得到原始的数据,之后再进行业务处理逻辑。这个业务处理逻辑并不一定需要和接收消息的逻辑使用同一个线程。消费者进程可以使用一个线程去接收消息,存入到内存中,比如使用Java 中的Bl oc ki呵Queue 。业务处理逻辑使用另一个线程从内存中读取数据,这样可以将应用进一步解稿,提高整个应用的处理效率。

RabbitMQ的队列

Queue: 队列,是RabbitMQ 的内部对象,用于存储消息。RabbitMQ 中消息都只能存储在队列中,这一点和Kafka 这种消息中间件相反。Katka 将消息存储在topic (主题)这个逻辑层面,而相对应的队列逻辑只是topic 实际存储文件中的位移标识。RabbitMQ 的生产者生产消息井最终技递到队列中,消费者可以从队列中获取消息并消费。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin ,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息井处理。RabbitMQ 不支持队列层面的广播消费,如果需要广播消费,需要在其上进行二次开发,处理逻辑会变得异常复杂,同时也不建议这么做。

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_TCP_03

RabbitMQ的交换器

Exchange: 交换器。在图中我们暂时可以理解成生产者将消息投递到队列中,实际上这个在RabbitMQ 中不会发生。真实情况是,生产者将消息发送到Exchange (交换器,通常也可以用大写的"X" 来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。这里可以将RabbitMQ 中的交换器看作一个简单的实体。RabbitMQ 中的交换器有四种类型,不同的类型有着不同的路由策略:

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_消息路由_04

直连交换机:Direct exchange

direct 类型的交换器路由规则也很简单,它会把消息路由到那些BindingKey 和RoutingKey
完全匹配的队列中。以图为例,交换器的类型为direct,如果我们发送一条消息,并在发送消息的时候设置路由键为" warning" ,则消息会路由到Queuel 和Queue2 ,对应的示例代码如下:

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_发送消息_05

扇形交换机:Fanout exchange

它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。

主题交换机:Topic exchange

前面讲到direct 类型的交换器路由规则是完全匹配BindingKey 和RoutingKey ,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic 类型的交换器在匹配规则上进行了扩展,它与direct 类型的交换器相似,也是将消息路由到BindingKey 和RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它

  • RoutingKey 为一个点号" "分隔的字符串(被点号" "分隔开的每一段独立的字符串称为一个单词)
  • BindingKey 和RoutingKey 一样也是点号" "分隔的字符串。
  • BindingKey 中可以存在两种特殊字符串"*"和"#",用于做模糊匹配,其中"#"用于匹配一个单词,吁"用于匹配多规格单词(可以是零个)。

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_消息路由_06

首部交换机:Headers exchange

headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中
的headers 属性进行匹配。在绑定队列和交换器时制定一组键值对, 当发送消息到交换器时,
RabbitMQ 会获取到该消息的headers (也是一个键值对的形式) ,对比其中的键值对是否完全
匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由
到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

RabbitMQ的绑定与连接

RoutingKey: 路由键。生产者将消息发给交换器的时候, 一般会指定一个RoutingKey ,用来指定这个消息的路由规则,而这个RoutingKey 需要与交换器类型和绑定键(BindingKey) 联合使用才能最终生效。在交换器类型和绑定键(BindingKey) 固定的情况下,生产者可以在发送消息给交换器时,通过指定RoutingKey 来决定消息流向哪里。

Binding: 绑定。RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键( BindingKey ) ,这样RabbitMQ 就知道如何正确地将消息路由到队列了。

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_发送消息_07

生产者将消息发送给交换器时, 需要一个RoutingKey , 当BindingKey 和RoutingKey 相匹配时, 消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候, 这些绑定允许使用相同的BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型, 比如fanout 类型的交换器就会无视BindingKey , 而是将消息路由到所有绑定到该交换器的队列中。

RabbitMQ的路由策略

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_消息路由_08

简单工作模式

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_发送消息_09

在该模型中只有一个生产者和一个消费者,同时生产者把消息放入到相应的队列中,消费者去对应的队列中拉取消息进行消费。

WorkQueue工作队列模式

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_rabbitmq_10

  • Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
  • 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
  • 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
  • Work Queues对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可。

代码实战在:RabbitMQ——RabbitMQ实战_庄小焱

Publish/Subscribe工作模式

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_TCP_11

在订阅模型中,多了一个Exchange 角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接收者,会一直等待消息到来。
  • Queue:消息队列,接收消息、缓存消息

Exchange:交换机 (X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定routing key的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式)的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

 代码实战在:RabbitMQ——RabbitMQ实战_庄小焱

Routing工作模式

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_消息路由_12

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey 与消息的 Routing key完全—致,才会接收到消息

 代码实战在:RabbitMQ——RabbitMQ实战_庄小焱

Topics工作模式

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_TCP_13

 代码实战在:RabbitMQ——RabbitMQ实战_庄小焱

Rabbit的connection和channel

我们又引入了两个新的概念: Connection 和Channel 。我们知道无论是生产者还是消费者,都需要和RabbitMQ Broker 建立连接,这个连接就是一条TCP 连接,也就是Connection 。一旦TCP 连接建立起来,客户端紧接着可以创建一个AMQP 信道(Channel) ,每个信道都会被指派一个唯一的D 。信道是建立在Connection 之上的虚拟连接, RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_TCP_14

我们完全可以直接使用Connection 就能完成信道的工作,为什么还要引入信道呢?试想这样一个场景, 一个应用程序中有很多个线程需要从RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个Connection ,也就是许多个TCP 连接。然而对于操作系统而言,建立和销毁TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。RabbitMQ 采用类似NIO (Non-blocking 1/0) 的做法,选择TCP 连接复用,不仅可以减少性能开销,同时也便于管理。

每个线程把持一个信道,所以信道复用了Connection 的TCP 连接。同时RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省TCP 连接资源。但是当信道本身的流量很大时,这时候多个信道复用一个Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个Connection ,将这些信道均摊到这些Connection 中,

 RabbitMQ的工作流程

生产者

(1)生产者连接到RabbitMQ Broker , 建立一个连接( Connection) ,开启一个信道(Channel)

(2) 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等

(3)生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等

(4) 生产者通过路由键将交换器和队列绑定起来

(5) 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息

(6) 相应的交换器根据接收到的路由键查找相匹配的队列。

(7) 如果找到,则将从生产者发送过来的消息存入相应的队列中。

(8) 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者

(9) 关闭信道。

(10) 关闭连接。

消费者

(1)消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 

(2) 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,
以及做一些准备工作

(3)等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。

(4) 消费者确认( ack) 接收到的消息。

(5) RabbitMQ 从队列中删除相应己经被确认的消息。

(6) 关闭信道。

(7) 关闭连接。

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_TCP_15

Rabbit与AMQP的关系

RabbitMQ 就是AMQP协议的Erlang 的实现(当然RabbitMQ 还支持STOMP2 、MQTT3 等协议) 0 AMQP 的模型架构和RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。当生产者发送消息时所携带的RoutingKey 与绑定时的BindingKey 相匹配时,消息即被存入相应的队列之中。消费者可以订阅相应的队列来获取消息。AMQP 说到底还是一个通信协议,通信协议都会涉及报文交互,从low-level 举例来说,AMQP 本身是应用层的协议,其填充于TCP 协议层的数据部分。而从high-level 来说, AMQP是通过协议命令进行交互的。AMQP 协议可以看作一系列结构化命令的集合,这里的命令代表一种操作,类似于HTTP 中的方法(GET 、POST 、PUT 、DELETE 等) 。

RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的AMQP 协议中相应的概念。目前RabbitMQ 最新版本默认支持的是AMQP 0-9-1 。本书中如无特殊说明,都以AQMP 0-9-1 为基准进行介绍。

AMQP 协议本身包括三层。

  • Module Layer: 位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如,客户端可以使用Queue . Declare 命令声明一个队列或者使用Basic.Consume 订阅消费一个队列中的消息。
  • Session Layer: 位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。
  • Transport Layer: 位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。

AMQP 命令概览

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_rabbitmq_16

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_消息路由_17

RabbitMQ——RabbitMQ基础组件(生产者、消费者、队列、交换器、路由键、绑定、连接及信道)_发送消息_18

博文参考

RabbitMQ Tutorials — RabbitMQ  

链接:https://pan.baidu.com/s/11hydXsgJVwTkxYCKYcVgGA   提取码:gewe
 

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ