15.消息队列RabbitMQ
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
一、基本概念
RabbitMQ 是一个开源的AMQP实现服务器端用Erlang语言编写支持多种客户端如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等支持AJAX。用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。
安装 RabbitMQ 需要先安装 Erlang 环境并配置环境变量安装完后进入 RabbitMQ 的 sbin 目录运行命令激活控制台界面访问地址 账号密码均为 guest。
rabbitmq-plugins enable rabbitmq_management
二、用户
- 超级管理员(administrator)可登陆管理控制台可查看所有的信息并且可以对用户策略(policy)进行操作。
- 监控者(monitoring)可登陆管理控制台同时可以查看rabbitmq节点的相关信息(进程数内存使用情况磁盘使用情况等)
- 策略制定者(policymaker)可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
- 普通管理者(management)仅可登陆管理控制台无法看到节点信息也无法对策略进行管理。
- 其他无法登陆管理控制台通常就是普通的生产者和消费者。
三、工作模式
RabbitMQ主要有五种工作模式分别是
- 简单模式hello world
- 工作队列模式work queue
- 发布/订阅模式publish/subscribe
- 路由模式routing
- 主题模式topic
导入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
工具类
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("localhost");
//端口
factory.setPort(5672);
//设置账号信息用户名、密码、vhost
factory.setVirtualHost("vhost");
factory.setUsername("guest");
factory.setPassword("guest");
// 通过工厂获取连接
Connection connection = factory.newConnection();
return connection;
}
}
1.简单模式hello world
//发送信息
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("hello", false, false, false, null);
// 消息内容
String message = "Hello World!";
channel.basicPublish("", "hello", null, message.getBytes());
//关闭通道和连接
channel.close();
connection.close();
}
//接收消息
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("hello", false, false, false, null);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
channel.basicConsume("hello", true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
}
}
2.工作队列模式work queue多个消费者消费同一队列消息。
//接收消息
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("hello", false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者,否则MQ会将所有请求平均发送给所有消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列false表示手动返回完成状态true表示接收到消息马上自动确认完成
channel.basicConsume("hello", false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
// 返回确认状态否则表示使用自动确认模式
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
3.发布/订阅模式publish/subscribe通过交换机发送消息到多个队列。
//发送消息
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 消息内容
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
channel.close();
connection.close();
}
//接收消息
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
// 返回完成状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
4.路由模式routing通过交换机进行路由匹配发送消息到不同队列。
//发送消息
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange及类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 消息内容
String message = "Hello World!";
//指定消息路由
channel.basicPublish(EXCHANGE_NAME, "routing", null, message.getBytes());
channel.close();
connection.close();
}
//接收消息
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机并指定多个路由
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing1");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing2");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
// 返回完成状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
5.主题模式topic通过交换机进行通配符匹配发送消息到不同队列。
//发送消息
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange及类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 消息内容
String message = "Hello World!";
//指定消息匹配关键字
channel.basicPublish(EXCHANGE_NAME, "topic", null, message.getBytes());
channel.close();
connection.close();
}
//接收消息
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机并指定多个通配符
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic1.*");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic2.*");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
// 返回完成状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
四、Spring整合
Spring 提供了 RabbitTemplate 类执行消息发送。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.88.88
port: 5672
username: guest
password: guest
virtual-host: /
@Configuration
public class MQConfig {
@Bean
public Exchange exchange1(){
return ExchangeBuilder.fanoutExchange("fanout").build();
}
@Bean
public Exchange exchange2(){
return ExchangeBuilder.directExchange("direct").build();
}
@Bean
public Queue queue1(){
return QueueBuilder.durable("hello1").build();
}
@Bean
public Queue queue2(){
return QueueBuilder.durable("hello2").build();
}
@Bean
public Binding binding1(Exchange exchange1,Queue queue1){
return BindingBuilder.bind(queue1).to(exchange1).with("key1").noargs();
}
@Bean
public Binding binding2(Exchange exchange2,Queue queue2){
return BindingBuilder.bind(queue2).to(exchange2).with("key2").noargs();
}
}
@Component
//定义队列并绑定
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "hello", durable = "true", autoDelete = "true"),
exchange = @Exchange(value = "fanout", type = ExchangeTypes.FANOUT), key = "key"), ackMode = "MANUAL")
public class MyListener {
@RabbitHandler
public void consume(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
throws IOException {
//手动返回状态
if () {
// RabbitMQ的ack机制中第二个参数返回true表示需要将这条消息投递给其他的消费者重新消费
channel.basicAck(deliveryTag, false);
} else {
// 第三个参数true表示这个消息会重新进入队列
channel.basicNack(deliveryTag, false, true);
}
}
}