RabbitMQ

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

1.消息队列MQ

MQ全称为Message Queue消息队列是应用程序和应用程序之间的通信方法。

为什么使用MQ
在项目中可将一些无需即时返回且耗时的操作提取出来进行异步处理而这种异步处理的方式大大的节省了服务器的请求响应时间从而提高了系统的吞吐量。
开发中消息队列通常有如下应用场景
1、任务异步处理
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合
MQ相当于一个中介生产方通过MQ与消费方交互它将应用程序进行解耦合。

2.消息队列产品

市场上常见的消息队列有如下
ActiveMQ基于JMS
ZeroMQ基于C语言开发
RabbitMQ基于AMQP协议erlang语言开发稳定性好
RocketMQ基于JMS阿里巴巴产品
Kafka类似MQ的产品分布式消息系统高吞吐量

3.RabbitMQ简介

RabbitMQ是由erlang语言开发基于AMQPAdvanced Message Queue 高级消息队列协议协议实现的消息队列它是一种应用程序之间的通信方法消息队列在分布式系统开发中应用非常广泛。

RabbitMQ提供了6种模式简单模式work模式Publish/Subscribe发布与订阅模式Routing路由模式Topics主题模式RPC远程调用模式

3.1简单模式

过程生产者P发送消息到RabbitMQ的队列simple_queue消费者C可以从队列中获取消息。可以使用RabbitMQ的简单模式simple。

图示
在这里插入图片描述

P生产者也就是要发送消息的程序
C消费者消息的接受者会一直等待消息到来。
queue消息队列图中红色部分。类似一个邮箱可以缓存消息生产者向其中投递消息消费者从其中取出消息。

代码演示
生产者

/**
 * 简单模式发送消息
 */
public class Producer {
    static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws Exception {
        //1. 创建连接工厂设置RabbitMQ的连接参数
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主机默认localhost
        connectionFactory.setHost("localhost");
        //连接端口默认5672
        connectionFactory.setPort(5672);
        //虚拟主机默认/
        connectionFactory.setVirtualHost("/itcast");
        //用户名默认guest
        connectionFactory.setUsername("ceshi");
        //密码默认guest
        connectionFactory.setPassword("ceshi");

        //2. 创建连接
        Connection connection = connectionFactory.newConnection();
        //3. 创建频道
        Channel channel = connection.createChannel();
        //4. 声明队列
        /**
         * 参数1队列名称
         * 参数2是否定义持久化队列消息会持久化保存在服务器上
         * 参数3是否独占本连接
         * 参数4是否在不使用的时候队列自动删除
         * 参数5其它参数
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //5. 发送消息
        String message = "你好小兔纸。";

        /**
         * 参数1交换机名称如果没有则指定空字符串表示使用默认的交换机
         * 参数2路由key简单模式中可以使用队列名称
         * 参数3消息其它属性
         * 参数4消息内容
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("已发送消息" + message);
        //6. 关闭资源
        channel.close();
        connection.close();
    }
}

消费者

/**
 * 简单模式消费者接收消息
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        //1. 创建连接工厂
        //2. 创建连接抽取一个获取连接的工具类
        Connection connection = ConnectionUtil.getConnection();
        //3. 创建频道
        Channel channel = connection.createChannel();
        //4. 声明队列
        /**
         * 参数1队列名称
         * 参数2是否定义持久化队列消息会持久化保存在服务器上
         * 参数3是否独占本连接
         * 参数4是否在不使用的时候队列自动删除
         * 参数5其它参数
         */
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
        //5. 创建消费者接收消息并处理消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为" + envelope.getExchange());
                //消息id
                System.out.println("消息id为" + envelope.getDeliveryTag());
                //接收到的消息
                System.out.println("接收到的消息为" + new String(body, "utf-8"));
            }
        };
        //6. 监听队列
        /**
         * 参数1队列名
         * 参数2是否要自动确认设置为true表示消息接收到自动向MQ回复接收到了MQ则会将消息从队列中删除
         * 如果设置为false则需要手动确认
         * 参数3消费者
         */
        channel.basicConsume(Producer.QUEUE_NAME, true, defaultConsumer);
    }
}

3.2Work queues工作队列模式

说明Work Queues 与入门程序的 简单模式 相比多了一个或一些消费端多个消费端共同消费同一个队列中的消息。消费者之间对于同一个消息的关系是竞争的关系

应用场景对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

在这里插入图片描述

代码 Work Queues 与入门程序的 简单模式 的代码是几乎一样的可以完全复制并复制多一个消费者进行多个消费者同时消费消息的测试。

3.3订阅模式类型

在这里插入图片描述

在订阅模型中多了一个exchange角色而且过程略有变化

P生产者也就是要发送消息的程序但是不再发送到队列中而是发给X交换机
C消费者消息的接受者会一直等待消息到来。
Queue消息队列接收消息、缓存消息。
Exchange交换机图中的X。一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。

Exchange有常见以下3种类型
Fanout广播将消息交给所有绑定到交换机的队列
Direct定向把消息交给符合指定routing key 的队列
Topic通配符把消息交给符合routing pattern路由模式 的队列Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失

3.3.1 Publish/Subscribe发布与订阅模式

在这里插入图片描述

发布订阅模式 1、每个消费者监听自己的队列。 2、生产者将消息发给broker由交换机将消息转发到绑定此交换机的每个队列每个绑定交换机的队列都将接收到消息

发布与订阅使用的交换机类型为fanout

生产者
创建连接
创建频道
声明交换机fanout
声明队列
队列绑定到交换机
发送消息
关闭资源

消费者至少两个消费者
创建连接
创建频道
声明交换机
声明队列
队列绑定到交换机
创建消费者
监听队列

发布订阅模式与工作队列模式的区别
1、工作队列模式不用定义交换机而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式需要设置队列和交换机的绑定工作队列模式不需要设置实际上工作队列模式会将队列绑定到默认的交换机 。

3.3.2Routing路由模式

路由模式特点
队列与交换机的绑定不能是任意绑定了而是要指定一个 RoutingKey 路由key
消息的发送方在 向 Exchange发送消息时也必须指定消息的 RoutingKey 。
Exchange不再把消息交给每一个绑定的队列而是根据消息的 Routing Key 进行判断只有队列的Routingkey 与消息的 Routing key 完全一致才会接收到消息

在这里插入图片描述
图解
P生产者向Exchange发送消息发送消息时会指定一个routing key。
XExchange交换机接收生产者的消息然后把消息递交给 与routing key完全匹配的队列
C1消费者其所在队列指定了需要routing key 为 error 的消息
C2消费者其所在队列指定了需要routing key 为 info、error、warning 的消息

Routing模式要求队列在绑定交换机时要指定routing key消息会转发到符合routing key的队列。

3.3.3Topics通配符模式

Topics通配符模式可以根据路由key将消息传递到对应路由key的队列队列绑定到交换机的路由key可以有多个通配符模式中路由key可以使用 * 和 # 使用了通配符模式之后对于路由Key的配置更加灵活。

Routingkey 一般都是有一个或多个单词组成多个单词之间以”.”分割例如 item.insert
通配符规则

#匹配一个或多个词
*匹配不多不少恰好1个词

举例
item.# 能够匹配 item.insert.abc 或者 item.insert
item.* 只能匹配 item.insert

在这里插入图片描述
在这里插入图片描述
图解
红色Queue绑定的是 usa.# 因此凡是以 usa. 开头的 routing key 都会被匹配到
黄色Queue绑定的是 #.news 因此凡是以 .news 结尾的 routing key 都会被匹配

小结

不直接Exchange交换机默认交换机

simple简单模式一个生产者生产一个消息到一个队列被一个消费者接收
work工作队列模式生产者发送消息到一个队列中然后可以被多个消费者监听该队列一个消息只能被一个消费者接收消费者之间是竞争关系

使用Exchange交换机订阅模式交换机广播fanout、定向direct、通配符topic

发布与订阅模式使用了fanout广播类型的交换机可以将一个消息发送到所有绑定了该交换机的队列
路由模式使用了direct定向类型的交换机消费会携带路由key交换机根据消息的路由key与队列的路由key进行对比一致的话那么该队列可以接收到消息
通配符模式使用了topic通配符类型的交换机消费会携带路由key* #交换机根据消息的路由key与队列的路由key进行对比匹配的话那么该队列可以接收到消息

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ