【快速掌握RabbitMQ到实战】

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

1.什么是消息队列

● 消息队列Message Queue是一种用于在应用程序之间传递消息的通信方式消息队列允许应用程序异步地发送和接收消息并且不需要直接连接到对方。
● 消息Message是指在应用间传送的数据。消息可以非常简单比如只包含文本字符串也可以更复杂可能包含嵌入对象
● 队列Queue可以说是一个数据结构可以存储数据。先进先出

2.消息队列有哪些作用和应用场景

2.1应用解耦
在这里插入图片描述
1. 传统模式下如果库存系统异常无法访问会导致下单失败或者随着公司业务拓展物流系统也需要接入下单信息此时订单系统还需要增加调用物流系统的接口
2. 引入消息队列用户下单后将消息写入消息队列返回下单成功库存和物流系统通过订阅下单消息获取下单信息库存系统根据下单信息进行库存扣减操作物流系统根据下单信息生成物流单即使此时库存系统无法访问但是不会影响下单流程。当库存系统恢复后可以正常消费消息

2.2异步提速


1. 传统模式下用户从注册到响应成功需要先保存注册信息再发送邮件通知邮件发送成功后再发送短息通知短息发送成功后才响应给用户用户体验不好
2. 引入MQ后保存用户信息后短信通知和邮件通知消息写入MQ(此过程耗时比较短)。极大的缩短了响应时间。增强用户体验

2.3流量削峰

在这里插入图片描述
3.认识一下RabbitMQ 加粗文本
一款基于AMQP(高级消息队列协议)用于软件之间通信的中间件由Rabbit科技有限公司开发服务器端用Erlang语言编写支持多种客户端如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等。用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ四大核心
● 生产者
● 消费者
● 队列
● 交换机
AMQP协议是一种二进制协议它定义了一组规则和标准以确保消息可以在不同的应用程序和平台之间传递和解释AMQP协议包含四个核心组件
● 消息
● 交换机
● 队列
● 绑定

4.RabbitMQ的安

六级标题4.1文件下载

RabbitMQ:https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.11-1.el7.x86_64.rpm/
erlanghttps://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.11-1.el7.x86_64.rpm/download.rpm?distro_version_id=140

4.2安装文件

4.2.1 安装命令

// An highlighted block
var foo = 'bar';

生成一个适合你的列表

rpm -ivh erlang-23.3.4.11-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.16-1.el7.noarch.rpm
开机启动chkconfig rabbitmq-server on
启动服务/sbin/service rabbitmq-server start
查看服务状态/sbin/service rabbitmq-server status
停止服务(选择执行)/sbin/service rabbitmq-server stop
开启web管理插件先将服务关闭掉rabbitmq-plugins enable rabbitmq_management
访问地址http://192.168.16.128:15672/默认端口号15672如果遇到访问不了看看是否防火墙开着

关闭防火墙systemctl stop firewalld
开机关闭防火墙systemctl disable firewalld
查看防火墙状态systemctl status firewalld

5.RabbitMQ的工作原理

在这里插入图片描述
**Broker**接收和分发消息的应用RabbitMQ Server 就是 Message Broker
Virtual hostVirtual host是一个虚拟主机的概念一个Broker中可以有多个Virtual host每个Virtual host都有一套自己的Exchange和Queue同一个Virtual host中的Exchange和Queue不能重名不同的Virtual host中的Exchange和Queue名字可以一样。这样不同的用户在访问同一个RabbitMQ Broker时可以创建自己单独的Virtual host然后在自己的Virtual host中创建Exchange和Queue很好地做到了不同用户之间相互隔离的效果。

Connectionpublisher/consumer和borker之间的TCP连接

**Channel**发送消息的通道如果每一次访问 RabbitMQ 都建立一个 Connection在消息量大的时候建立 TCP Connection 的开销将是巨大的效率也较低。Channel 是在 connection 内部建立的逻辑连接如果应用程 序支持多线程通常每个 thread 创建单独的 channel 进行通讯AMQP method 包含了 channel id 帮助客 户端和 message broker 识别 channel所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

Exchangemessage 到达 broker 的第一站根据分发规则匹配查询表中的 routing key分发 消息到 queue 中去。常用的类型有direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

QueueQueue是一个用来存放消息的队列生产者发送的消息会被放到Queue中消费者消费消息时也是从Queue中取走消息。

Bindingexchange 和 queue 之间的虚拟连接binding 中可以包含 routing keyBinding 信息被保 存到 exchange 中的查询表中用于 message 的分发依据

6.如何实现生产者和消费者

6.1引入包依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version>
</dependency>

6.2生产者代码

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Author 小川
 */
public class Producer {

    //队列名
    private static final String QUEUE_NAME = "hello-queue";
    //交换机
    private static final String EXCHANGE_NAME = "hello-exchange";

    public static void main(String[] args) throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //服务地址
        factory.setHost("192.168.16.128");
        //账号
        factory.setUsername("admin");
        //密码
        factory.setPassword("123456");
        //端口号
        factory.setPort(5672);
        //创建连接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        /**
         * 声明和创建交换机
         * 1.交换机的名称
         * 2.交换机的类型direct、topic或者fanout和headers headers类型的交换器的性能很差不建议使用。
         * 3.指定交换机是否要持久化如果设置为true那么交换机的元数据要持久化到内存中
         * 4.指定交换机在没有队列与其绑定时是否删除设置为false表示不删除
         * 5.Map<String, Object>类型用来指定交换机其它一些结构化的参数我在这里直接设置为null。
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, true, null);

        /**
         * 生成一个队列
         * 1.队列的名称
         * 2.队列是否要持久化但是需要注意这里的持久化只是队列名称等这些队列元数据的持久化不是队列中消息的持久化
         * 3.表示队列是不是私有的如果是私有的只有创建它的应用程序才能从队列消费消息
         * 4.队列在没有消费者订阅时是否自动删除
         * 5.队列的一些结构化信息比如声明死信队列、磁盘队列会用到。
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        /**
         * 将交换机和队列进行绑定
         * 1.队列名称
         * 2.交换机名称
         * 3.路由键在直连模式下为队列名称。
         */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, QUEUE_NAME);

        /**
         * 发送消息
         * 1.发送到哪个交换机
         * 2.队列名称
         * 3.其他参数信息
         * 4.发送消息的消息体
         */
        String message = "hello world";
        channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes());
        System.out.println("消息发送成功");
    }
}

6.2生产者代码

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Author 小川
 */
public class Producer {

    //队列名
    private static final String QUEUE_NAME = "hello-queue";
    //交换机
    private static final String EXCHANGE_NAME = "hello-exchange";

    public static void main(String[] args) throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //服务地址
        factory.setHost("192.168.16.128");
        //账号
        factory.setUsername("admin");
        //密码
        factory.setPassword("123456");
        //端口号
        factory.setPort(5672);
        //创建连接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        /**
         * 声明和创建交换机
         * 1.交换机的名称
         * 2.交换机的类型direct、topic或者fanout和headers headers类型的交换器的性能很差不建议使用。
         * 3.指定交换机是否要持久化如果设置为true那么交换机的元数据要持久化到内存中
         * 4.指定交换机在没有队列与其绑定时是否删除设置为false表示不删除
         * 5.Map<String, Object>类型用来指定交换机其它一些结构化的参数我在这里直接设置为null。
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, true, null);

        /**
         * 生成一个队列
         * 1.队列的名称
         * 2.队列是否要持久化但是需要注意这里的持久化只是队列名称等这些队列元数据的持久化不是队列中消息的持久化
         * 3.表示队列是不是私有的如果是私有的只有创建它的应用程序才能从队列消费消息
         * 4.队列在没有消费者订阅时是否自动删除
         * 5.队列的一些结构化信息比如声明死信队列、磁盘队列会用到。
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        /**
         * 将交换机和队列进行绑定
         * 1.队列名称
         * 2.交换机名称
         * 3.路由键在直连模式下为队列名称。
         */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, QUEUE_NAME);

        /**
         * 发送消息
         * 1.发送到哪个交换机
         * 2.队列名称
         * 3.其他参数信息
         * 4.发送消息的消息体
         */
        String message = "hello world";
        channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes());
        System.out.println("消息发送成功");
    }
}

6.3消费者代码

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * @Author 小川
 */
public class Consumer {

    //队列名
    private static final String QUEUE_NAME = "hello-queue";

    public static void main(String[] args) throws Exception {

        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //服务地址
        factory.setHost("192.168.16.128");
        //账号
        factory.setUsername("admin");
        //密码
        factory.setPassword("123456");
        //端口号
        factory.setPort(5672);
        //创建连接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        //接受消息回调
        DeliverCallback deliverCallback = (consumerTag, message)-> {
            System.out.println(new String(message.getBody()));
        };

        //取消消息回调
        CancelCallback cancelCallback = consumerTag ->{
            System.out.println("消费消息被中断");
        };

        /**
         * 消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答ture:自动应答
         * 3.消费者未成功消费的回调
         * 4.消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

    }
}

由于篇幅太长就不一一写出来的同时这个配有对应完整视频教程从开始安装到文件下载RabbitMQ底层原理交换机类型剖析RabbitMQ集群搭建配置镜像队列RabbitMQ负载均衡等等非常全面需要完整笔记和全套视频教程看文章末尾领取方式。

码字不易如果大家觉得文章不错有帮助的记得点赞支持哦谢谢大家

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ