RabbitMQ发布与订阅模式类型
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
🍁博客主页👉不会压弯的小飞侠
✨欢迎关注👉点赞👍收藏⭐留言✒
✨系列专栏👉Linux专栏
🔥欢迎大佬指正一起学习一起加油
目录
🍁模式说明
-
工作队列背后的假设是每个任务都是 只交付给一名工人。在这一部分中我们将做一些事情 完全不同的 - 我们将向多个传递消息 消费者。此模式称为“发布/订阅”。
-
需要设置类型为fanout的交换机并且交换机和队列进行绑定当发送消息到交换机后交换机会将消息发送到绑定的队列
🍁发布与订阅模式完成消息传递
- 编写生产者发送消息
- 编写消息生产者 Producter
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数
1. exchange交换机名称
2. type交换机类型
DIRECT("direct"),定向
FANOUT("fanout"),扇形广播,发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable是否持久化
4. autoDelete自动删除
5. internal内部使用。 一般false
6. arguments参数
*/
String exchangeName = "test_fanout";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//6. 创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数
1. queue队列名称
2. exchange交换机名称
3. routingKey路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
String body = "此消息被交换机发到两个队列中";
//8. 发送消息
channel.basicPublish(exchangeName,"",null,body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
- 测试发布者
- 编写消费者接收消息
- 编写消息消费者Consumer1
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body"+new String(body));
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
- 编写消费者接收消息
- 编写消息消费者Consumer2
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_fanout_queue2";
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body"+new String(body));
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
- 测试
- 启动所有消费者然后使用生产者发送消息在每个消费者对应的控制台可以查看到生产者发送的所有消息到达广播的效果。
🍁总结
- 交换机需要与队列进行绑定绑定之后一个消息可以被多个消费者都收到。
- 工作队列模式不用定义交换机而发布/订阅模式需要定义交换机。
- 发布/订阅模式的生产方是面向交换机发送消息工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
- 发布/订阅模式需要设置队列和交换机的绑定工作队列模式不需要设置实际上工作队列模式会将队列绑 定到默认的交换机