rabbitMQ(3)-CSDN博客

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

RabbitMq 交换机

文章目录


前言:

在之前的文章中我们一直都没有提到交换机只是在介绍 rabbitmq 的时候 说过一嘴交换机 , 另外在 之前文章的代码案例中我们其实

是使用到过交换机的

比如

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());


这里第一个参数就是用来知名交换机的但是我们用 “” 来作为交换机的名rabbitmq 就会默认使用 Direct Exchange直连交换机 >

既然 之前我们没有讲到过 交换机下面我们就来学习一下交换机.

1. 交换机的介绍

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息不会直接发送到队列。实际上通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反生产者只能将消息发送到交换机(exchange)交换机工作的内容非常简单一方面它接收来自生产者的消息另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

在这里插入图片描述


简单一句话 交换机是用来将 生产者生产的消息 转发到对应的队列中.


简单看完交换机的介绍下面我们来看看 rabbitmq 中交换机的几种类型 .

2. 交换机的类型


引用

  1. Direct Exchange直连交换机根据消息的 routing key 将消息路由到与之完全匹配的队列。适用于一对一的消息传递。 比如 一个队列绑定到该交换机上 要求 routing key (路由键) 为 abc 那么只有被标记为 abc 的消息才能被转发不会转发 abc.def , 也不会转发 aaa.bbb.ccc 只会转发 abc.

  1. Fanout Exchange扇形交换机将消息广播到所有与该交换机绑定的队列。适用于一对多的消息广播。扇形交换机不处理路由键我们只需要简单的将队列绑定到交换机上一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播每台子网内的主机都获得了一份复制的消息。Fanout 交换机转发消息是最快的。

  1. Topic Exchange主题交换机根据消息的 routing key 和交换机与队列绑定时的 routing pattern 进行匹配将消息路由到满足条件的队列。支持通配符匹配比如 符号“#” 匹配一个或多个词符号 * 匹配不多不少一个词。因此 abc.# 能够匹配到 abc.def.ghi但是 abc.* 只会匹配到 abc.def。

  1. Headers Exchange头交换机根据消息的头部属性进行匹配并将消息路由到满足条件的队列。

    解释: 不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定 Queue 与 Exchange 时指定一组键值对当消息发送
    到RabbitMQ 时会 取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配如果完全匹配则消息会路由到该队列否则不会路由到该队列。headers 属性是一个键值对可以是 Hashtable键值对的值可以是任何类型。而 fanoutdirecttopic 的路由键都需要要字符串形式的

    匹配规则 x-match 有下列两种类型

    x-match = all 表示所有的键值对都匹配才能接受到消息

    x-match = any 表示只要有键值对匹配就能接受到消息
  2. Default Exchange默认交换机它是一个特殊的直连交换机类型为 direct当没有指定交换机时默认的交换机会将消息根据消息的 routing key 发送到同名的队列上


看完了 交换机的类型这里先来 学习一下 临时队列 之后在演示绑定交换机 会使用到.

3. 临时队列

在 之前的文章中 我们每次创建的队列都是具有特定的名字 比如 hello , ack_queue 可以说 队列的名字对我们来说是非常重要的 因为我们要指定 消费者 去那个队列消费消息 . 但是我们 每次都要去想名字有时候我们愿 想名字 取名字啥的最烦了 .为了解决这个问题我们就可以创建一个具有随机名称的队列 或者让服务器为我们选择一个随机对立的名称. 另外 如果 我们 想要 在使用完队列 后 断开连接 就删除 一次性的队列 就可以 通过下面这种方式来创建 临时队列.

String queueName = channel.queueDeclare().getQueue();


创建出来的队列:

在这里插入图片描述


知道了如何 创建临时队列下面我们来看看 绑定 bindings 这里是 使用 交换机 最重要的 一环节 , 如果 没有绑定 交换机 就无法绑定到 队列中交换机就无法 把 消息 转发给 队列.

4. 绑定 (bindings)

什么是 绑定 呢绑定 其实是 exchange 和 queue 之间的桥梁它告诉我们 交换机 和 那个队列进行了绑定关系。

比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定

在这里插入图片描述

关于 临时队列 和 绑定 预备知识点看完后我们就来 使用 交换机.

5. 扇形交换机Fanout 演示

扇形交换机: 将消息广播到所有与该交换机绑定的队列。适用于一对多的消息广播。

演示: 这里我们创建一个简单的日志系统来完成代码样式 .

创建一个消费者 用来 生产消息 创建两个消费者 一个消费者 将接收到的消息 显示到 控制台另外一个将消息 存储到 本地磁盘 .


大致

在这里插入图片描述


代码案例:


消费者:

ReceiveLogs01

package org.example.five;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 消费者1 --> 将消息显示到控制台
public class ReceiveLogs01 {

    // 交换机名字
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明一个交换机 --> 扇形 fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 声明一个队列 , 临时队列 --> 名字随机,当消费者与临死队列断开连接后 队列自动删除
        String queueName = channel.queueDeclare().getQueue();

        // 绑定交换机与队列 --> 第三个参数为路由键 因为使用的是扇形交换机 , 所以路由键可以不写 用 ""
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        // 接受消息 -->  消费者接受消息时的回调
        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println("控制台打印接收到的消息:" + new String(message.getBody(), "UTF-8"));
        };
        channel.basicConsume(queueName, true, deliverCallback, tag -> {
        });
    }
}


ReceiveLogs02

package org.example.five;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.*;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.TimeoutException;

// 消费者2 --> 将消息 写入本地
public class ReceiveLogs02 {

    // 交换机名字
    private static final String EXCHANGE_NAME = "logs";

    private static final String path = "E:\\java\\java_lx\\practice\\javaTest\\blog_rabbitmq\\src\\main\\java\\org\\example\\five";

    public static void main(String[] args) throws IOException, TimeoutException, URISyntaxException {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明一个交换机 --> 扇形 fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 声明一个队列 , 临时队列 --> 名字随机,当消费者与临死队列断开连接后 队列自动删除
        String queueName = channel.queueDeclare().getQueue();

        // 绑定交换机与队列 --> 第三个参数为路由键 因为使用的是扇形交换机 , 所以路由键可以不写 用 ""
        channel.queueBind(queueName, EXCHANGE_NAME, "");


        // 接受消息 -->  消费者接受消息时的回调
        DeliverCallback deliverCallback = (tag, message) -> {
            // 文件操作
            File file = new File(path + "/test.txt");

            // 文件不存在创建文件
            if (!file.exists()) {
                file.createNewFile();
            }

            // 使用 Files.newOutputStream 共创创建 outputStream 对象
            try (OutputStream outputStream = Files.newOutputStream(file.toPath());) {
                outputStream.write(message.getBody());
            } catch (IOException e) {
                throw new RuntimeException("写入文件时发生错误: " + e.getMessage());
            }

        };
        channel.basicConsume(queueName, true, deliverCallback, tag -> {
        });
    }
}


生产者: EmitLog

package org.example.five;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.*;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeoutException;

// 消费者2 --> 将消息 写入本地
public class ReceiveLogs02 {

    // 交换机名字
    private static final String EXCHANGE_NAME = "logs";

    private static final String path = "E:\\java\\java_lx\\practice\\javaTest\\blog_rabbitmq\\src\\main\\java\\org\\example\\five";

    public static void main(String[] args) throws IOException, TimeoutException, URISyntaxException {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明一个交换机 --> 扇形 fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 声明一个队列 , 临时队列 --> 名字随机,当消费者与临死队列断开连接后 队列自动删除
        String queueName = channel.queueDeclare().getQueue();

        // 绑定交换机与队列 --> 第三个参数为路由键 因为使用的是扇形交换机 , 所以路由键可以不写 用 ""
        channel.queueBind(queueName, EXCHANGE_NAME, "");


        // 接受消息 -->  消费者接受消息时的回调
        DeliverCallback deliverCallback = (tag, message) -> {
            // 文件操作
            File file = new File(path + "/test.txt");

            // 文件不存在创建文件
            if (!file.exists()) {
                file.createNewFile();
            }

            // 使用 Files.newOutputStream 共创创建 outputStream 对象 , ture 表示 写文件不覆盖之前的内容
            try (OutputStream outputStream = new FileOutputStream(file, true)) {
                outputStream.write(message.getBody());
            } catch (IOException e) {
                throw new RuntimeException("写入文件时发生错误: " + e.getMessage());
            }

        };
        channel.basicConsume(queueName, true, deliverCallback, tag -> {
        });
    }
}


注意:

先启动两个消费者再启动生产者。

生产者生产消息后如果没有对应的消费者接收则该消息是遗弃的消息


启动看看效果:

在这里插入图片描述


可以看到 一个 生产者生产者的消息被多个消费者消费 到此 fanout 交换机 就 演示完成了 .

6. 直接交换机 Direct exchange

直接交换机: 根据消息的 routing key 将消息路由到与之完全匹配的队列。适用于一对一的消息传递。


在使用 Fanout 交换机 完成了对日志系统的构造 但是还不够因为 日志 有很多 我们并不好 一下找到 严重错误 我们可以让一个消费者去操作文件存入全部日志 让 另外一个消费者 消费严重错误 如果是严重的错误 才发送给消费者) 将 严重错误的消息存盘 . 此时就好找错误了.


我们要实现这个功能 扇形交换机就不好完成因为扇形交换机会将消息传播给全部绑定的队列中 这里我们就可以使用直接交换机 (direct)


使用 queueBind 方法 绑定路由键

channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");


绑定玩路由键后 消息只会去 它绑定的 路由键队列中.

在这里插入图片描述


在上面这张图中我们可以看到 X 绑定了两个队列绑定类型是 direct。队列 Q1 绑定键为 orange 队列 Q2 绑定键有两个一个绑定键为 black另一个绑定键为 green.


在这种绑定情况下生产者发布消息到 exchange 上绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black green 和的消息会被发布到队列 Q2其他消息类型的消息将被丢弃。

6.1 多重绑定

在这里插入图片描述


当然如果 exchange 的绑定类型是direct但是它绑定的多个队列的 key 如果都相同在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了就跟广播差不多如上图所示。

6.2 direct 代码案例


这里我们 实现 让 生产者发送多个消息 多个消费者 消费不同的消息.

图:

在这里插入图片描述


C1 消费者绑定 console 队列routingKey 为 info、warning

C2 消费者绑定 disk 队列routingKey 为 error


当生产者生产消息到 direct_logs 交换机里该交换机会检测消息的 routingKey 条件然后分配到满足条件的队列里最后由消费者从队列消费消息。


代码 这里就不写文件操作了直接将消费者拿到的消息 放到 控制到上.


生产者

package org.example.five;

import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class DirectLogs {

    // 交换机
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        
        Scanner sc = new Scanner(System.in);

        while (sc.hasNext()) {
            String message = sc.next();
            // 路由键为 info
            channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));

            System.out.println("生产者发出消息: " + message);
        }
    }
}


启动后 先用 路由键为 info 发送多个消息 然后更改 为 error 和 warning 发送消息.


消费者 c1

package org.example.five;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 消费者 c1 -- 消费 路由键为 info 和 warning 的消息
public class ReceiveLogsDirect01 {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtils.getChannel();

        // 声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 声明一个队列
        channel.queueDeclare("console", false, false, false, null);

        // 绑定 --> 队列 console , 交换机 direct_logs , 路由键 info
        channel.queueBind("console", EXCHANGE_NAME, "info");

        // 绑定 --> 队列 console , 交换机 direct_logs , 路由键 warning
        channel.queueBind("console", EXCHANGE_NAME, "warning");

        // 接受消息
        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:" + new String(message.getBody(), "UTF-8"));
        };

        channel.basicConsume("console", true, deliverCallback, (tag) -> {
        });
    }
}


消费者 c2

package org.example.five;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 消费者 c2 -- 消费 路由键为 error
public class ReceiveLogsDirect02 {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtils.getChannel();

        // 声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);


        // 声明一个队列
        channel.queueDeclare("disk", false, false, false, null);

        // 绑定 --> 队列 console , 交换机 direct_logs , 路由键 info
        channel.queueBind("disk", EXCHANGE_NAME, "error");



        // 接受消息
        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:" + new String(message.getBody(), "UTF-8"));
        };

        channel.basicConsume("disk", true, deliverCallback, (tag) -> {});
    }
}


效果:


图一:

在这里插入图片描述


图二:

在这里插入图片描述

7. 主题交换机


主题交换机: 根据消息的 routing key 和交换机与队列绑定时的 routing pattern 进行匹配将消息路由到满足条件的队列。支持通配符匹配

上面我们使用 直接交换机改进了 日志记录系统 。我们没有使用只能进行随意广播的 fanout 交换机而是使用了 direct 交换机从而有能实现有选择性地接收日志。


尽管使用 direct 交换机改进了我们的系统但是它仍然存在局限性——比方说我们想接收的日志类型有 info.base 和 info.advantage某个队列只想 info.base 的消息那这个时候direct 就办不到了。这个时候就只能使用 topic 类型


另外 使用 topic 需要注意:

发送到类型是 topic 交换机的消息的 routing_key 不能随意写必须满足一定的要求它必须是一个单词列表以点号分隔开。这些单词可以是任意单词

比如说“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit” 这种类型的。

当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中其中有两个替换符是大家需要注意的

  • *(星号)可以代替一个位置
  • #(井号)可以替代零个或多个位置

7.1 Topic 匹配案例


绑定关系图

在这里插入图片描述

  • Q1–>绑定的是
    • 中间带 orange 带 3 个单词的字符串 (*.orange.*)
  • Q2–>绑定的是
    • 最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)
    • 第一个单词是 lazy 的多个单词 (lazy.#)


对上面 q1 和 q2 绑定的路由键 举几个例子

例子说明
uick.orange.rabbit被队列 Q1Q2 接收到
azy.orange.elephant被队列 Q1Q2 接收到
quick.orange.fox被队列 Q1 接收到
lazy.brown.fox被队列 Q2 接收到
lazy.pink.rabbit虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit是四个单词但匹配 Q2


如果一个队列绑定的路由键为 # 此时队列可以接收到所有的数据此时就相当于绑定了 一个 fanout (扇形) 交换机了 如果 一个队列 绑定的路由键 没有 # 和 * 此时就可以认为 绑定了一个 direct(直接) 交换机了.


看完这些下面就来看看代码案例

7.2 Topic 代码案例

创建一个生产者 生产多个消息到交换机,交换机按照通配符分配消息到不同的队列中队列由消费者进行消费


生产者 EmitLogTopic:

package org.example.six;

import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class EmitLogTopic {

    // 交换机的名称
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();


        /**
         * Q1-->绑定的是
         *      中间带 orange 带 3 个单词的字符串(*.orange.*)
         * Q2-->绑定的是
         *      最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
         *      第一个单词是 lazy 的多个单词(lazy.#)
         */

        HashMap<String, String> bindingKeyMap = new HashMap<>();

        bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
        bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");

        // 遍历 map 发送消息
        for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
            String routingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息: " + message);
        }
    }
}


消费者c1

package org.example.six;


// 消费者 c1 --> 绑定的路由键为 *.orange.*

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogsTopic01 {

    // 交换机名称
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // 声明队列
        String queueName = "Q1";

        channel.queueDeclare(queueName, false, false, false, null);

        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
        System.out.println("等待接受消息.....");

        // 接收到消息的回调
        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println(new String(message.getBody(), "UTF-8"));
            System.out.println("接收队列" + queueName + "  绑定键:" + message.getEnvelope().getRoutingKey());
        };

        // 接受消息
        channel.basicConsume(queueName, true, deliverCallback, (message) -> {});
    }
}


消费者2

package org.example.six;


// 消费者 c2 绑定的路由键为 *.*.rabbit  , lazy.#

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogsTopic02 {

    // 交换机名称
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // 声明队列
        String queueName = "Q2";

        channel.queueDeclare(queueName, false, false, false, null);

        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");

        System.out.println("c2 等待接受消息.....");

        // 接收到消息的回调
        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println(new String(message.getBody(), "UTF-8"));
            System.out.println("接收队列" + queueName + "  绑定键:" + message.getEnvelope().getRoutingKey());
        };

        // 接受消息
        channel.basicConsume(queueName, true, deliverCallback, (message) -> {});
    }
}


效果展示:

在这里插入图片描述

8. headers 头交换机


headers 皮皮额 AMQP 消息的 headr 而不是 路由键 此外 headers 交换机和 direct 交换机完全一致 但是性能差很多 目前几乎 用不到 了解即可.


消费方指定的 headers 中必须包含一个 “x-match” 的键 。

键 “x-match” 的值有两个

  1. x-match = all 表示所有的键值对都匹配才能接收到消息
  2. x-mathc = any : 表示只有键值对匹配就能接受到消息

在这里插入图片描述


代码演示:

生产者:

package org.example.six;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class ProducerHeaders {

    public static String EXCHANGE = "header_exchange";

    public static String QUEUE = "header_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtils.getChannel();

        // 声明一个交换机
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS, true, false, null);

        // 声明一个队列
        channel.queueDeclare(QUEUE, true, false, false, null);


        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("name", "abcdef");
        headerMap.put("sex", "男");
        AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder().headers(headerMap);

        // 发送消息
        String message = "hello_header";


        channel.basicPublish(EXCHANGE, "", properties.build(), message.getBytes());
    }

}

消费者

package org.example.six;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

public class ConsumerHeader {

    public static String EXCHANGE = "header_exchange";

    public static String QUEUE = "header_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtils.getChannel();

        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println("接收到消息: " + new String(message.getBody()));
        };

        CancelCallback callback = (tag) -> {
            System.out.println("消息被中断");
        };

        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("x-match", "all");
        // 除了 all 还有 any 
        headerMap.put("name", "abcdef");
        headerMap.put("sex", "男");

        channel.queueBind(QUEUE, EXCHANGE, "", headerMap);


        channel.basicConsume(QUEUE, true, deliverCallback, callback);
    }
}

效果:

图一:

在这里插入图片描述

图二:

在这里插入图片描述

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ