【消息中间件】1小时快速上手RabbitMQ
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
前 言
🍉 作者简介半旧518长跑型选手立志坚持写10年博客专注于java后端
☕专栏简介深入、全面、系统的介绍消息中间件
🌰 文章简介本文将介绍RabbitMQ一小时快速上手RabbitMQ
文章目录
1.MQ的介绍
1.1 MQ的基本概念
1.2 MQ的优缺点
下面是MQ的优缺点
1.2.1 优点1应用解耦
考虑由订单系统与库存系统、支付系统、物流系统直接通过远程调用方式通信模块耦合度较高的情况。
会存在问题
(1)容错性较低。
一个库存系统出问题通信链路断了订单下单就失败了。
(2)可维护性低
增加x系统需要修改订单系统的代码。再加再改撤回也改。
引入消息中间件MQ后。订单系统作为消息生产者生产订单消息生产消息到MQ库存、支付、物流作为消息消费者消费消息。即使库存系统出现问题也可以在恢复以后再从MQ中消费消息而不会导致订单失败。
扩展新的x系统只需要x系统把MQ中的消息再消费一次即可订单系统无需改代码。
1.2.2 优点2异步提速
引入中间件之前需要同步完成订单系统对其它模块的调用即调用库存返回后调用支付调用支付返回后调用物流。
引入中间件之后25ms返回请求。
1.2.3 优点3削峰填谷
引入消息中间件前假如A系统来个秒杀活动会寄。
引入之后MQ就像一个大仓库5000个请求对MQ小意思。A系统慢慢从仓库里运货吧。
1.2.4 缺点
1.3常见的消息中间件
2.RabbitMQ的介绍与安装
2.1 RabbitMQ简介
2.2 JMS概念
2.3 RabbitMQ安装
官网地址https://www.rabbitmq.com/
往下翻一翻下载软件和文档。
由于只是方便学习本文基于windows安装rabbitmq如果使用linux推荐使用docker进行部署。
2.3.1 安装依赖环境
由于rabbitmq是基于erlang语言开发的因此需要有erlang语言环境。
先看看官方文档中对于erlang的版本要求说明。
https://www.rabbitmq.com/which-erlang.html
可以看到不同版本的rabbitmq需要的erlang版本不同。这里我们使用3.11.5版的rabbitmq。因此需要的erlang版本是25.0-25.2
通过下列网址可以下载erlang25.2https://www.erlang.org/patches/otp-25.2
选择下图中的windows installer下载exe安装包。
点击exe文件安装即可。
配置环境变量。
cmd键入erl -version判断是否配置成功
C:\Users\24724>erl -version
Erlang (SMP,ASYNC_THREADS) (BEAM) emulator version 13.1.3
2.3.2 安装rabbitmq
下载exe安装包。
https://www.rabbitmq.com/install-windows.html
版本选用3.11.5
双击安装。从安装目录打开cmd
使用如下命令安装管理页面的插件
rabbitmq-plugins enable rabbitmq_management
双击bat脚本启动
这时打开浏览器输入http://localhost:15672账号密码默认是guest/guest
大功告成咯
2.4 rabbitmq管控台的使用
rabbitmq的管控台还是比较友好的可以随便点点。这里带大家简单熟悉下。
添加用户。
添加虚拟机。
点击添加的虚拟机ittest进入详情页面为其授权。
授权成功。
其它的您自己点点点吧很简单。
3.RabbitMQ快速入门
3.1 需求描述
RabbitMQ有六种工作模式先来介绍最简单的一种实现一个helloworld。
需求如下
图示如下。
3.2 准备工作
idea新建项目rabbitqmdemomaven模块rabbitmq-producerrabbitmq-consumer
在两个模块中引入依赖。
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
3.3 生产者实现
producer下新建Producer_HelloWorld.java
。
其要实现的代码逻辑其实看着下图就会了。
代码如下。
/**
*
* 发送消息
*/
public class Producer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/ittest");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数
1. queue队列名称
2. durable:是否持久化当mq重启之后还在
3. exclusive
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时自动删除掉
5. arguments参数。
*/
//如果没有一个名字叫hello_world的队列则会创建该队列如果有则不会创建
channel.queueDeclare("hello_world",true,false,false,null);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数
1. exchange交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey路由名称
3. props配置信息
4. body发送消息数据
*/
String body = "hello rabbitmq~~~";
//6. 发送消息
channel.basicPublish("","hello_world",null,body.getBytes());
//7.释放资源
channel.close();
connection.close();
}
}
代码运行前guest用户的管控台是这样的没有queue
管控台变成了这个样子
3.4 消费者实现
consumer模块新建Consumer_HelloWorld.java
。
public class Consumer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/ittest");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数
1. queue队列名称
2. durable:是否持久化当mq重启之后还在
3. exclusive
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时自动删除掉
5. arguments参数。
*/
//如果没有一个名字叫hello_world的队列则会创建该队列如果有则不会创建
channel.queueDeclare("hello_world",true,false,false,null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数
1. queue队列名称
2. autoAck是否自动确认
3. callback回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法当收到消息后会自动执行该方法
1. consumerTag标识
2. envelope获取一些信息交换机路由key...
3. properties:配置信息
4. body数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag"+consumerTag);
System.out.println("Exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("properties"+properties);
System.out.println("body"+new String(body));
}
};
channel.basicConsume("hello_world",true,consumer);
//关闭资源不要,因为要一直监听消息
}
}
执行结果如下消费者成功的拿到了消息。