RabbitMQ官方案例学习记录-CSDN博客
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
官方文档RabbitMQ教程 — RabbitMQ (rabbitmq.com)
一、安装RabbitMQ服务
直接使用docker在服务器上安装
docker run -it -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-management
安装完成后访问15672端口默认用户名和密码都是 guest即可进入
二、hello world——梦开始的地方
1. 介绍
RabbitMQ 是一个消息代理它接受并转发消息。 你可以把它想象成一个邮局当你把你想要邮寄的邮件放在邮箱里时 您可以确定邮递员最终会将邮件递送给您的收件人。 在这个类比中RabbitMQ是一个邮政信箱一个邮局和一个信件载体。RabbitMQ和邮局的主要区别在于它不处理纸张 相反它接受、存储和转发数据的二进制 blob - 消息。
2. 一些术语
生产者消息的发送方
队列queue本质上是一个大型的消息缓冲区
消费者消息的使用方
Channel 频道理解为操作消息队列的 client比如 jdbcClient、redisClient提供了和消息队列 server 建立通信的传输方法为了复用连接提高传输效率。程序通过 channel 操作 rabbitmq收发消息
3. 编写代码
用 Java 编写两个程序;一个发送单个消息的生产者一个接收的使用者并将消息打印出来。
1消息生产者
编码过程
先创建连接工厂然后通过工厂创建连接再通过连接创建channel。通过channel来绑定队列或者交换机再用channel来生产或者消费消息。
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
对于这行代码可以看到消息是根据QUEUE_NAME路由到对应的队列。
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class MQProducer {
//设置队列名
private final static String QUEUE_NAME = "hello-zy";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置连接的服务器IP和端口号
factory.setHost("123.249.112.12");
factory.setPort(5672);
//创建一个连接和通道这里使用 try-with-resources语句
// 因为Connection和Channel都实现了java.lang.AutoCloseable不需要在代码中再显式的关闭他们
try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
/*
创建队列 channel.queueDeclare()用于声明一个队列
queue队列名称指定要声明的队列的名称。
durable持久化指定队列是否是持久化的。当 RabbitMQ 重新启动时持久化的队列将被保留下来。如果将该参数设置为 true则队列将被持久化如果设置为 false则队列不会被持久化。注意这里指的是队列本身的持久化而不是队列中的消息。
exclusive排他性指定队列是否是排他的。如果将该参数设置为 true则该队列只能被当前连接的消费者使用并且在连接关闭时会自动删除该队列。如果设置为 false则队列可供多个消费者使用。
autoDelete自动删除指定队列在不再被使用时是否自动删除。如果将该参数设置为 true则当队列不再被消费者使用时将自动删除该队列。如果设置为 false则队列不会自动删除。
arguments参数指定队列的其他属性和参数以键值对的形式提供。
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送消息
String msg = "hello,world! RabbitMQ!";
//channel.basicPublish("", QUEUE_NAME, null, msg.getBytes())
// 这行代码的作用是将 msg 消息发布到默认交换器空字符串并使用QUEUE_NAME作为路由键
// 消息的属性设置为默认值消息的内容为 msg.getBytes()即将 msg 转换为字节数组后发送。
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println(" 生产者发送消息'" + msg + "'");
}
}
}
然后运行这个生产者代码去网页端查看
2消息消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQConsumer {
//声明队列 和消息发送方保持一致
private final static String QUEUE_NAME = "hello-zy";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//设置rabbitMQ服务端ip
factory.setHost("123.249.112.12");
//这里不用try-with-resources 因为消费方需要一致保持监听不要关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" 等待接收消息退出请按 CTRL+C");
//创建了一个消费者对象并实现了 handleDelivery() 方法作为回调方法。当消费者收到消息时将自动执行该方法。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 消费了消息'" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
去网页端查看消息是否被消费可以看到多了一个消费者并且消息已经没了。
三、工作队列 WorkQueue
工作队列又名任务队列背后的主要思想是避免立即执行资源密集型任务必须等待它要完成。相反我们将任务安排在以后完成。我们将任务封装为消息并将其发送到队列。正在运行的工作进程 在后台将弹出任务并最终执行工作。当您运行许多工作线程时任务将在它们之间共享。
WorkQueue的模型跟前面第一个案例HelloWorld的模型最明显的区别其实就是第一个案例他只有一个消费者。我们知道RabbitMQ他的消息是阅完即焚即消费者一旦接收这个消息直接就从Queue中被弹出了。
而现在这个案例他有两个消费者画两个只是方便他当然也可以有3个、4个他的消息应该是通过某种算法做负载均衡送到不同的消费者让消费者进行处理让消息不至于处理不过来从而导致滞留在Queue中的消息被弹出。
思路如下
1、我们先让Publish服务每秒发布50条消息到 simple.queue来演示消息的频繁发送。
2、在Consumer服务中定义两个消费者来监听我们的 simple.queue队列。
3、消费者1每秒处理40条消息消费者2每秒处理30条消息。
1. 循环调度
1生产者
生成50条消息
public class Send {
//设置队列名
private final static String QUEUE_NAME = "hello-zy";
public static void main(String[] argv) throws Exception {
//创建连接
ConnectionFactory factory = new ConnectionFactory();
//设置连接的服务器IP和端口号
factory.setHost("123.249.112.12");
factory.setPort(5672);
//创建一个通道这里使用 try-with-resources语句
// 因为Connection和Channel都实现了java.lang.AutoCloseable不需要在代码中再显式的关闭他们
try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送消息
for(int i = 1; i <= 50; i++) {
String msg = "hello, I am";
msg = msg + i;
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(200);
System.out.println(" 生产者发送消息'" + msg + "'");
}
}
}
}
2消费者1和消费者2
public class Worker_01 {
//声明队列 和消息发送方保持一致
private final static String QUEUE_NAME = "hello-zy";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//设置rabbitMQ服务端ip
factory.setHost("123.249.112.12");
//这里不用try-with-resources 因为消费方需要一致保持监听不要关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" 等待接收消息退出请按 CTRL+C");
//创建了一个消费者对象并实现了 handleDelivery() 方法作为回调方法。当消费者收到消息时将自动执行该方法。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" Worker_01消费了消息'" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
public class Worker_02 {
//声明队列 和消息发送方保持一致
private final static String QUEUE_NAME = "hello-zy";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//设置rabbitMQ服务端ip
factory.setHost("123.249.112.12");
//这里不用try-with-resources 因为消费方需要一致保持监听不要关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" 等待接收消息退出请按 CTRL+C");
//创建了一个消费者对象并实现了 handleDelivery() 方法作为回调方法。当消费者收到消息时将自动执行该方法。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" Worker_02消费了消息'" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
启动两个消费者然后开始生产消息控制台打印如下
可以看到是交替消费的
2. 消息确认
在上面的代码中一旦消息传递给消费者就会立即被删除。如果在消费过程中消费者宕机消息没消费成功但是因为已经投递出去了消息从队列删掉。就会出现消息未消费且丢失的问题。对于这种情况我们希望没消费成功的消息转交给其他的消费者消费。
为了确保消息永远不会丢失RabbitMQ 支持消息确认。确认由消费者告诉 RabbitMQ 已收到特定消息并处理RabbitMQ可以自由删除它。
如果消费者没有发送确认消息rabbitMQ可以知道消息没有消费成功并将重新排队。
在消费者返回确认消息时强制实施超时默认为 30 分钟。 这有助于检测从不确认交付的错误卡住消费者。 可以按照传递确认超时中所述增加此超时。
默认情况下手动消息确认处于打开状态。在上一个 示例我们通过 autoAck=true 标志明确关闭了它们。是时候将此标志设置为 false 来使消费者发送确认消息。
1消费者
生产者还是不变生成10条消息到队列中
创建消费者启动手动确认
public class Consumer {
private final static String QUEUE_NAME = "hello-zy";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("123.249.112.12");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 将autoAck参数设置为false关闭自动消息确认
/*
* new DefaultConsumer(channel) { ... }这是一个匿名内部类用于定义消息处理的逻辑。
* 它继承自DefaultConsumer并覆盖了 handleDelivery 方法以自定义消息的处理方式。
* */
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
/*handleDelivery 方法这是 DefaultConsumer 类中的方法用于处理从队列中接收的消息。
consumerTag标识消费者的标签。
envelope包含与消息相关的元数据如交付标签、交付模式等。
properties包含消息的属性如消息的头部信息。
body消息的内容以字节数组形式提供。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消费者接收消息: '" + message + "'");
// 在消息处理成功后发送确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println("确认收到了消息"+message);
}
});
}
}
启动消费者
2验证未成功消费情况
先生产6条消息到队列
简单修改一下消费者代码在中间添加判断逻辑当碰到消息5的时候退出消费者
public class Consumer1 {
private final static String QUEUE_NAME = "hello-zy";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("123.249.112.12");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
try {
// 将autoAck参数设置为false关闭自动消息确认
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消费者接收消息: '" + message + "'");// 模拟某个条件例如消息处理成功
if (!message.contains("5")) {
// 在消息处理成功后发送确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println("确认收到了消息" + message);
}// 模拟消费者退出
if (message.contains("5")) {
throw new RuntimeException("Consumer exiting...");
}
}
});
} catch (RuntimeException e) {
// 当消费者退出时捕获异常
System.out.println("Consumer exited.");
}
}
}
可以看到当消费者消费到第五条消息后因为抛出了异常所以后面的消息都未消费成功所以会把第五条消息和第六条消息再放回消息队列查看消息队列可以看到还有两条消息在队列中
如果把参数改成true也就是自动确认这就会导致未消费的消息丢失
3. 消息持久性
我们已经保证了消息未消费的情况下不会丢失但是如果RabbitMQ服务器宕机消息还是会丢失。这就涉及到一个消息持久化的问题。
需要做两件事来确保 消息不会丢失我们需要将队列和消息都标记为durable。 重新定义一个队列
此时我们确信durable_queue队列不会丢失 即使 RabbitMQ 重新启动。现在我们需要将我们的消息标记为持久 - 通过设置消息属性实现基本属性 到值PERSISTENT_TEXT_PLAIN。
如果不设置未持久性重启docker的rabbitmq容器队列消息就丢了设置durable=true后 即使重启docker容器队列和消息都不会丢失。
四、发布/订阅模式
在前面都是一条消息由一个消费者消费。如果一条消息需要被多个消费者消费那么就需要引入发布/订阅模式。
1. 交换机
关于交换机的概念
一个生产者给 多个 队列发消息1 个生产者对多个队列。
交换机的作用提供消息转发功能类似于网络路由器
要解决的问题怎么把消息转发到不同的队列上好让消费者从不同的队列消费。
交换机有多种类别fanout、direct, topic, headers
fanout扇出
扇出、广播
特点消息会被转发到所有绑定到该交换机的队列
场景很适用于发布订阅的场景。比如写日志可以多个系统间共享
Direct 直接
绑定可以让交换机和队列进行关联可以指定让交互机把什么样的消息发送给哪个队列类似于计算机网络中两个路由器或者网络设备相互连接也可以理解为网线
routingKey路由键控制消息要转发给哪个队列的IP 地址
特点消息会根据路由键转发到指定的队列
场景特定的消息只交给特定的系统程序来处理
绑定关系完全匹配字符串
比如发日志的场景希望用独立的程序来处理不同级别的日志比如 C1 系统处理 error 日志C2 系统处理其他级别的日志
topic 交换机
特点消息会根据一个 模糊的 路由键转发到指定的队列
场景特定的一类消息可以交给特定的一类系统程序来处理
绑定关系可以模糊匹配多个绑定
●*匹配一个单词比如 *.orange那么 a.orange、b.orange 都能匹配
●#匹配 0 个或多个单词比如 a.#那么 a.a、a.b、a.a.a 都能匹配
注意这里的匹配和 MySQL 的like 的 % 不一样只能按照单词来匹配每个 '.' 分隔单词如果是 '#.'其实可以忽略匹配 0 个词也 ok
Headers 交换机
类似主题和直接交换机可以根据 headers 中的内容来指定发送到哪个队列。使用消息头headers来路由消息。
2. 使用fanout交换机来实现发布/订阅
1生产者
public class LogProducer {
private final static String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("123.249.112.12");
factory.setPort(5672);
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
//声明交换机类型为FANOUT
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
for (int i = 1; i <= 5; i++) {
String message = "Log message " + i;
//消息发到交换机中而不是像之前点对点那样直接发到消息队列中
channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("生产者发送日志: " + message);
}
}
}
}
查看交换机列表可以看到新增了一个名为logs的交换机类型为 fanout
2消费者
创建两个消费者都绑定一个交换机名字为log
public class LogConsumer {
private final static String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("123.249.112.12");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 将队列绑定到交换器
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收日志消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者1接收消息: '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
public class LogConsumer2 {
private final static String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("123.249.112.12");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 将队列绑定到交换器
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收日志消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者2接收消息: '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
启动两个消费者然后启动生产者发送消息可以看到两个消费者都消费了消息
五、路由
在上面的例子中我们实现了一个简单的日志记录系统。能够向多个消费者广播消息。
下面要增加一个新的功能
例如我们将只能将关键错误消息定向到 日志文件以节省磁盘空间同时仍然能够打印所有控制台上的日志消息。
1. 绑定队列和交换机
之前是这样绑定的第三个参数就是路由键
channel.queueBind(queueName, EXCHANGE_NAME, "");
绑定是交换和队列之间的关系。
绑定可以采用额外的路由键参数。为了避免 与basic_publish参数混淆我们将它称为绑定键。绑定键的含义取决于交换类型。
channel.queueBind(queueName, EXCHANGE_NAME, "black");
2. 使用direct交换机绑定
1生产者
模拟三条不同的消息指定消息1的路由键为orange消息2的路由键为black消息3的路由键为green。
public class LogProducer {
private final static String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("123.249.112.12");
factory.setPort(5672);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//声明交换机类型为 direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String message1 = "Message with routing key orange";
String message2 = "Message with routing key black";
String message3 = "Message with routing key green";
// 发布消息到交换器并指定不同的路由键
channel.basicPublish(EXCHANGE_NAME, "orange", null, message1.getBytes());
channel.basicPublish(EXCHANGE_NAME, "black", null, message2.getBytes());
channel.basicPublish(EXCHANGE_NAME, "green", null, message3.getBytes());
System.out.println("生产者发送消息完成.");
}
}
}
2消费者
创建两个消费者
消费者1和消费者2都绑定对应的交换机其中消费者1对应路由键orange消费者2对应路由键black和orange。
public class LogConsumer1 {
private final static String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("123.249.112.12");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定队列到交换器指定路由键为 "orange"
channel.queueBind(queueName, EXCHANGE_NAME, "orange");
System.out.println("等待接收 orange 消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者1接收消息: '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
public class LogConsumer2 {
private final static String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("123.249.112.12");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定队列到交换器指定路由键为 "black" 和 "green"
channel.queueBind(queueName, EXCHANGE_NAME, "black");
channel.queueBind(queueName, EXCHANGE_NAME, "green");
System.out.println("等待接收 black 和 green 消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者2接收消息: '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}