【RabbitMQ】基础篇,学习纪录+笔记

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

目录

一.介绍

1.1MQ概述

1.2MQ优势和劣势

1.3常见的 MQ 产品

1.4RabbitMQ简介

1.5RabbitMQ中的相关概念

1.6RabbitMQ的安装

二.快速入门

2.1入门程序

2.2工作模式

2.2.1Work queues 工作队列模式

2.2.2Pub/Sub 订阅模式

2.2.3Routing 路由模式

2.2.4Topics 通配符模式

三.整合SpringBoot

3.1生产端

3.2消费端

3.3总结


一.介绍


1.1MQ概述


MQ全称Message Queue (消息队列)是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
MQ,消息队列,存储消息的中间件
分布式系统通信两种方式直接远程调用和借助第三方完成间接通信
发送方称为生产者接收方称为消费者

1.2MQ优势和劣势


优势
应用解耦
异步提速
削峰填谷

劣势
系统可用性降低
系统复杂度提高
一致性问题

1.3常见的 MQ 产品

1.4RabbitMQ简介

 AMQP即 Advanced Message Queuing Protocol高级消息队列协议是一个网络协议是应用层协议 的一个开放标准为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息并不受客户端/中间件不同产品不同的开发语言等条件的限制。2006年AMQP 规范发布。类比HTTP。

RabbitMQ基础架构图

 

1.5RabbitMQ中的相关概念

Broker接收和分发消息的应用RabbitMQ Server就是 Message Broker

Virtual host出于多租户和安全因素设计的把 AMQP 的基本组件划分到一个虚拟的分组中类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时可以划分出多 个vhost每个用户在自己的 vhost 创建 exchange/queue 等

Connectionpublisher/consumer 和 broker 之间的 TCP 连接

Channel如果每一次访问 RabbitMQ 都建立一个 Connection在消息量大的时候建立 TCP Connection 的开销将是巨大的效率也较低。Channel 是在 connection 内部建立的逻辑连接如果应用程序支持多线 程通常每个thread创建单独的 channel 进行通讯AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

Exchangemessage 到达 broker 的第一站根据分发规则匹配查询表中的 routing key分发消息到 queue 中去。常用的类型有direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue消息最终被送到这里等待 consumer 取走

Bindingexchange 和 queue 之间的虚拟连接binding 中可以包含 routing key。Binding 信息被保存 到 exchange 中的查询表中用于 message 的分发依据

1.6RabbitMQ的安装

楼主用的是Docker进行安装

拉取镜像

docker pull rabbitmq

运行镜像

docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

参数说明

-d表示在后台运行容器
-p将容器的端口 5672应用访问端口和 15672 控制台Web端口号映射到主机中
-e指定环境变量
RABBITMQ_DEFAULT_VHOST默认虚拟机名
RABBITMQ_DEFAULT_USER默认的用户名
RABBITMQ_DEFAULT_PASS默认的用户密码
--name rabbitmq设置容器名称
rabbitmq容器使用的镜像名称

二.快速入门

2.1入门程序

需求使用简单模式完成消息传递

步骤

① 创建工程生成者、消费者在IDEA中创建两个Maven模块

② 分别添加依赖

<dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

③ 编写生产者发送消息

public class Producer_WorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        //5.创建队列Queue
        // String queue,队列名称
        // boolean durable,是否持久化
        // boolean exclusive,是否独占。只能有一个消费者监听队列当connection关闭时是否删除队列
        // boolean autoDelete,是否自动删除当没有Consumer时会自动删除
        // Map<String, Object> arguments
        // 如果没有一个名字叫hello_world的队列则自动创建有就不创建
        channel.queueDeclare("work_queues",true,false,false,null);

        //6.发送消息
        // String exchange,交换机名称简单模式使用默认""
        // String routingKey,路由名称
        // AMQP.BasicProperties props,配置信息
        // byte[] body,发送的消息数据
        for (int i = 1; i <= 10; i++) {
            String body = i+" ---> hello rabbitmq!";
            channel.basicPublish("","work_queues",null,body.getBytes());
        }


        //7.释放资源
        channel.close();
        connection.close();

    }
}

④ 编写消费者接收消息

public class Consumer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        //5.创建队列Queue
        // String queue,队列名称
        // boolean durable,是否持久化
        // boolean exclusive,是否独占。只能有一个消费者监听队列当connection关闭时是否删除队列
        // boolean autoDelete,是否自动删除当没有Consumer时会自动删除
        // Map<String, Object> arguments
        // 如果没有一个名字叫hello_world的队列则自动创建没有就不创建
        channel.queueDeclare("hello_world",true,false,false,null);

        //6.接收消息
        // String queue,队列名称
        // boolean autoAck,是否自动确认
        // Consumer callback,回调对象
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法
            // String consumerTag,标识
            // Envelope envelope,获取一些信息交换机、路由key……
            // AMQP.BasicProperties properties,配置信息
            // byte[] body消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("counsumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));

            }
        };

        channel.basicConsume("hello_world",true,consumer);

    }
}

2.2工作模式

2.2.1Work queues 工作队列模式

Work Queues与入门程序的简单模式相比多了一个或一些消费端多个消费端共同消费同一个队列中的消息。

应用场景对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

 

生产者

public class Producer_WorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        //5.创建队列Queue
        // String queue,队列名称
        // boolean durable,是否持久化
        // boolean exclusive,是否独占。只能有一个消费者监听队列当connection关闭时是否删除队列
        // boolean autoDelete,是否自动删除当没有Consumer时会自动删除
        // Map<String, Object> arguments
        // 如果没有一个名字叫hello_world的队列则自动创建有就不创建
        channel.queueDeclare("work_queues",true,false,false,null);

        //6.发送消息
        // String exchange,交换机名称简单模式使用默认""
        // String routingKey,路由名称
        // AMQP.BasicProperties props,配置信息
        // byte[] body,发送的消息数据
        for (int i = 1; i <= 10; i++) {
            String body = i+" ---> hello rabbitmq!";
            channel.basicPublish("","work_queues",null,body.getBytes());
        }


        //7.释放资源
        channel.close();
        connection.close();

    }
}

消费者1

public class Consumer_WorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        //5.创建队列Queue
        // String queue,队列名称
        // boolean durable,是否持久化
        // boolean exclusive,是否独占。只能有一个消费者监听队列当connection关闭时是否删除队列
        // boolean autoDelete,是否自动删除当没有Consumer时会自动删除
        // Map<String, Object> arguments
        // 如果没有一个名字叫hello_world的队列则自动创建没有就不创建
        channel.queueDeclare("work_queues",true,false,false,null);

        //6.接收消息
        // String queue,队列名称
        // boolean autoAck,是否自动确认
        // Consumer callback,回调对象
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法
            // String consumerTag,标识
            // Envelope envelope,获取一些信息交换机、路由key……
            // AMQP.BasicProperties properties,配置信息
            // byte[] body消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("counsumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));

            }
        };

        channel.basicConsume("work_queues",true,consumer);

    }
}

消费者2和消费者1一样

小结

①在一个队列中如果有多个消费者那么消费者之间对于同一个消息的关系是竞争的关系。

②Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如短信服务部署多个 只需要有一个节点成功发送即可。

2.2.2Pub/Sub 订阅模式

在订阅模型中多了一个 Exchange 角色而且过程略有变化

P生产者也就是要发送消息的程序但是不再发送到队列中而是发给X交换机

C消费者消息的接收者会一直等待消息到来

Queue消息队列接收消息、缓存消息

Exchange交换机X。一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。

Exchange有常见以下3种类型

Fanout广播将消息交给所有绑定到交换机的队列 

Direct定向把消息交给符合指定routing key 的队列  

Topic通配符把消息交给符合routing pattern路由模式 的队列 Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与 Exchange 绑定或者没有符合 路由规则的队列那么消息会丢失

生产者

public class Producer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        //5.创建交换机
        // String exchange,交换机名称
        // BuiltinExchangeType type,交换机的类型,4种类型
        //   DIRECT("direct"),定向
        //   FANOUT("fanout"),扇形广播  发送消息到每一个与之绑定的队列
        //   TOPIC("topic"),通配符的方式
        //   HEADERS("headers"),不知道

        // boolean durable,是否持久化
        // boolean autoDelete,是否自动删除
        // boolean internal,内部使用,一般是false
        // Map<String, Object> arguments,参数

        String exchangeName = "test_fanout";

        channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);

        //6.创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        //7.绑定交换机和队列
        // String queue,队列名称
        // String exchange,交换机名称
        // String routingKey,路由名称,如果交换机类型为fanout,routingkey设置为空("")
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");

        //8.发送消息
        String body = "日志信息张三去调用了findAll方法……日志级别info...";
        channel.basicPublish(exchangeName,"",null,body.getBytes());

        //9.释放资源
        channel.close();
        connection.close();


    }
}

消费者1

public class Consumer_PubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";

        //6.接收消息
        // String queue,队列名称
        // boolean autoAck,是否自动确认
        // Consumer callback,回调对象
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法
            // String consumerTag,标识
            // Envelope envelope,获取一些信息交换机、路由key……
            // AMQP.BasicProperties properties,配置信息
            // byte[] body消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("counsumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台……");

            }
        };

        channel.basicConsume(queue1Name,true,consumer);

    }
}

消费者2即把最后一行代码改为channel.basicConsume(queue2Name,true,consumer);

回调方法可以修改用作差别对待

小结

1. 交换机需要与队列进行绑定绑定之后一个消息可以被多个消费者都收到。

2. 发布订阅模式与工作队列模式的区别

①工作队列模式不用定义交换机而发布/订阅模式需要定义交换机

②发布/订阅模式的生产方是面向交换机发送消息工作队列模式的生产方是面向队列发送消息(底层使用 默认交换机)

③发布/订阅模式需要设置队列和交换机的绑定工作队列模式不需要设置实际上工作队列模式会将队列绑定到默认的交换机

2.2.3Routing 路由模式

队列与交换机的绑定不能是任意绑定了而是要指定一个 RoutingKey路由key

消息的发送方在向 Exchange 发送消息时也必须指定消息的 RoutingKey

 Exchange 不再把消息交给每一个绑定的队列而是根据消息的 Routing Key 进行判断只有队列的 Routingkey 与消息的 Routing key 完全一致才会接收到消息

 

生产者

public class Producer_Routing {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        //5.创建交换机
        // String exchange,交换机名称
        // BuiltinExchangeType type,交换机的类型,4种类型
        //   DIRECT("direct"),定向
        //   FANOUT("fanout"),扇形广播  发送消息到每一个与之绑定的队列
        //   TOPIC("topic"),通配符的方式
        //   HEADERS("headers"),不知道

        // boolean durable,是否持久化
        // boolean autoDelete,是否自动删除
        // boolean internal,内部使用,一般是false
        // Map<String, Object> arguments,参数

        String exchangeName = "test_direct";

        channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);

        //6.创建队列
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        //7.绑定交换机和队列
        // String queue,队列名称
        // String exchange,交换机名称
        // String routingKey,路由名称,如果交换机类型为fanout,routingkey设置为空("")

        //队列1绑定error
        channel.queueBind(queue1Name,exchangeName,"error");

        //队列2绑定info error warning
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"warning");


        //8.发送消息
        String body = "日志信息张三去调用了findAll方法……日志级别info...";
        channel.basicPublish(exchangeName,"info",null,body.getBytes());

        //9.释放资源
        channel.close();
        connection.close();


    }
}

消费者1

public class Consumer_Routing1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";

        //6.接收消息
        // String queue,队列名称
        // boolean autoAck,是否自动确认
        // Consumer callback,回调对象
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法
            // String consumerTag,标识
            // Envelope envelope,获取一些信息交换机、路由key……
            // AMQP.BasicProperties properties,配置信息
            // byte[] body消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("counsumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台……");

            }
        };

        channel.basicConsume(queue1Name,true,consumer);

    }
}

消费者2

public class Consumer_Routing2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";

        //6.接收消息
        // String queue,队列名称
        // boolean autoAck,是否自动确认
        // Consumer callback,回调对象
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法
            // String consumerTag,标识
            // Envelope envelope,获取一些信息交换机、路由key……
            // AMQP.BasicProperties properties,配置信息
            // byte[] body消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("counsumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息存储到数据库……");

            }
        };

        channel.basicConsume(queue2Name,true,consumer);

    }
}

小结

Routing 模式要求队列在绑定交换机时要指定 routing key消息会转发到符合 routing key 的队列。

2.2.4Topics 通配符模式

Topic 类型与 Direct 相比都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成多个单词之间以”.”分割例如 item.insert

通配符规则# 匹配一个或多个词* 匹配不多不少恰好1个词例如item.# 能够匹配 item.insert.abc 或者 item.insertitem.* 只能匹配 item.insert

 生产者

public class Producer_Topics {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        //5.创建交换机
        // String exchange,交换机名称
        // BuiltinExchangeType type,交换机的类型,4种类型
        //   DIRECT("direct"),定向
        //   FANOUT("fanout"),扇形广播  发送消息到每一个与之绑定的队列
        //   TOPIC("topic"),通配符的方式
        //   HEADERS("headers"),不知道

        // boolean durable,是否持久化
        // boolean autoDelete,是否自动删除
        // boolean internal,内部使用,一般是false
        // Map<String, Object> arguments,参数

        String exchangeName = "test_topic";

        channel.exchangeDeclare(exchangeName,BuiltinExchangeType.TOPIC,true,false,false,null);

        //6.创建队列
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        //7.绑定交换机和队列
        // String queue,队列名称
        // String exchange,交换机名称
        // String routingKey,路由名称,如果交换机类型为fanout,routingkey设置为空("")

        //需求 所有error级别日志存入数据库所有order系统的日志存入数据库
        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");

        channel.queueBind(queue2Name,exchangeName,"*.*");


        //8.发送消息
        String body = "日志信息张三去调用了findAll方法……日志级别info...";
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());

        //9.释放资源
        channel.close();
        connection.close();


    }
}

消费者1

public class Consumer_Topic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";

        //6.接收消息
        // String queue,队列名称
        // boolean autoAck,是否自动确认
        // Consumer callback,回调对象
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法
            // String consumerTag,标识
            // Envelope envelope,获取一些信息交换机、路由key……
            // AMQP.BasicProperties properties,配置信息
            // byte[] body消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("counsumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("把信息存数据库……");

            }
        };

        channel.basicConsume(queue1Name,true,consumer);

    }
}

消费者2

public class Consumer_Topic2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("ip"); //ip 默认localhost
        factory.setPort(5672); //端口 默认5672
        factory.setVirtualHost("/itcast");//虚拟机 默认/
        factory.setUsername("admin");//用户名
        factory.setPassword("admin");//密码

        //3.创建Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";

        //6.接收消息
        // String queue,队列名称
        // boolean autoAck,是否自动确认
        // Consumer callback,回调对象
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法
            // String consumerTag,标识
            // Envelope envelope,获取一些信息交换机、路由key……
            // AMQP.BasicProperties properties,配置信息
            // byte[] body消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("counsumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("把日志信息打印到控制台……");

            }
        };

        channel.basicConsume(queue2Name,true,consumer);

    }
}

三.整合SpringBoot

3.1生产端

1. 创建生产者SpringBoot工程

2. 引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itheima</groupId>
    <artifactId>producer-springboot</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <!--父工程-->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>

    <dependencies>
        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

</project>

3. 编写yml配置基本信息配置

#配置RabbitMQ的基本信息
spring:
  rabbitmq:
    host: ip
    username: admin
    password: admin
    port: 5672
    virtual-host: /

4. 定义交换机队列以及绑定关系的配置类

@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    public static final String QUEUE_NAME = "boot_queue";

    //1.交换机
    @Bean("bootExchange")
    public Exchange bootExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    //2.队列
    @Bean("bootQueue")
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    //3.绑定关系
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }

}

5. 注入RabbitTemplate调用方法完成消息发送

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {

    //1.注入
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSend(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello!");
    }
}

3.2消费端

1. 创建消费者SpringBoot工程

2. 引入依赖

与生产端一样

3. 编写yml配置基本信息配置

与生产端一样

4. 定义监听类使用@RabbitListener注解完成队列监听。

@Component
public class RabbitMQListener {

    @RabbitListener(queues = "boot_queue")
    public void ListenerQueue(Message message){
        System.out.println(message);
    }
}

3.3总结

SpringBoot提供了快速整合RabbitMQ的方式

基本信息在yml中配置队列交互机以及绑定关系在配置类中使用Bean的方式配置

生产端直接注入RabbitTemplate完成消息发送

消费端直接使用@RabbitListener完成消息接收

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ