RabbitMQ的基础学习(上)

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

前言

        RabbitMQ是一个基于AMQP规范实现的消息队列。它具有性能好、高可用、跨平台性、社区活跃等优点比较适合中小型公司使用。掌握RabbitMQ相关知识对工作和学习都有帮助。下面我讲详细介绍一下Rabbit的相关知识。

正文

一、AMQP规范

          首先我们先要说明一下AMQP规范这有利于我们学习RabbitMQ相关知识。

1. 概念

        AMQPAdvanced Message Queuing Protocol是一个应用层的高级消息队列协议它与JMS的本质差别AMQP不从API层进行限定而是直接定义网络交换的数据格式。基于此协议不受客户端开发语言等条件的限制RabbitMQ就是基于此协议实现的。

2. 核心组件

  • ConnectionFactory连接工厂生产Connection的的工厂。
  • Connection连接应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手。AMQP连接通常是长连接。
  • Channel网络信道大部分的业务操作是在Channel这个接口中完成的包括:
    • 队列的声明queueDeclare交换机的声明exchangeDeclare
    • 队列的绑定queueBind发布消息basicPublish消费消息basicConsume等。
  • Broker中间件接受客户端的连接实现AMQP实体服务如RabbitMQ。
  • Producer生产者生产消息。
  • Consumer消费者消费消息。
  • Queue队列存储着即将被应用消费掉的消息。
  • Message消息服务与应用程序之间传送的数据由Properties属性和body主体组成。
  • VirtualHost虚拟主机用于进行逻辑隔离一个虚拟主机理由可以有若干个Exchange和Queue同一个虚拟主机里面不能有相同名字的Exchange。
  • Exchange交换机接受消息根据路由键发送消息到绑定的队列不具备消息存储的能力。
  • Binding绑定Exchange和Queue之间的虚拟连接。
  • Routing Key路由键路由规则虚拟机可以用它来确定如何路由一个特定消息。

3. AMQP工作过程

  1. 生成者发布消息到交换机Exchange。
  2. 交换机根据路由规则将消息分发给与当前交换机绑定的队列中。
  3. 消费者监听接收到消息之后开始业务处理。

二、4种交换机的使用

        RabbitMQ中一供有四种交换机类型分别是Direct exchange直连交换机、Fanout exchange扇形交换机、Topic exchange主题交换机、Headers exchange头交换机。

1. Direct exchange直连交换机

        要求消息与一个特定的路由键完全匹配即一对一的点对点的发送。

2. Fanout exchange扇形交换机

        一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上即发布订阅。

3.Topic exchange主题交换机

        通配符匹配交换机使用通配符去匹配路由到对应的队列。

4. Headers exchange头交换机

        不适用routingKey进行路由匹配使用请求头中键值路由匹配。 

5. 代码实现以直连交换机为例

5.1 直连交换机的Rabbit配置类

/**
 * 〈一句话功能简述〉<br>
 * 〈直连交换机的Rabbit配置类〉
 *
 * @author hanxinghua
 * @create 2022/9/19
 * @since 1.0.0
 */
@Order(-1)
@Configuration
public class DirectRabbitConfig implements BeanPostProcessor {


    @Resource
    private RabbitAdmin rabbitAdmin;


    //初始化rabbitAdmin对象
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 truespring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    @Bean
    public Queue rabbitDirectQueue() {
        /**
         * 1、name:    队列名称
         * 2、durable: 是否持久化
         * 3、exclusive: 是否独享、排外的。如果设置为true定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
         * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后会自动删除。
         * */
        return new Queue(RabbitConstant.DIRECT_TOPIC, true, false, false);
    }

    @Bean
    public DirectExchange rabbitDirectExchange() {
        // Direct交换机
        return new DirectExchange(RabbitConstant.DIRECT_EXCHANGE, true, false);
    }

    @Bean
    public Binding bindDirect() {
        //链式写法绑定交换机和队列并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitDirectQueue())
                //到交换机
                .to(rabbitDirectExchange())
                //并设置匹配键
                .with(RabbitConstant.DIRECT_ROUTING);
    }

    /**
     * 实例化Bean后的处理器
     * Tips
     * 由于队列不存在启动消费者会报错最好的解决方法是生产者和消费者都尝试声明队列
     *
     * @param bean
     * @param beanName
     * @return
     * @throws BeansException
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        // 创建交换机
        rabbitAdmin.declareExchange(rabbitDirectExchange());
        // 创建队列
        rabbitAdmin.declareQueue(rabbitDirectQueue());
        return null;
    }

}

5.2 直连交换机的发送消息服务

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/19
 * @since 1.0.0
 */
@Service("directRabbitService")
public class DirectRabbitServiceImpl implements RabbitService {

    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public String sendMsg(String msg) throws Exception {
        try {
            rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE, RabbitConstant.DIRECT_ROUTING, msg);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}

5.3 直连交换机的消息消费者

/**
 * 〈一句话功能简述〉<br>
 * 〈直连交换机消费者〉
 *
 * @author hanxinghua
 * @create 2022/9/19
 * @since 1.0.0
 */
@Component
public class DirectRabbitConsumer {

    enum Action {
        //处理成功
        SUCCESS,
        //可以重试的错误消息重回队列
        RETRY,
        //无需重试的错误拒绝消息并从队列中删除
        REJECT
    }

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue(RabbitConstant.DIRECT_TOPIC))
    public void process(String msg, Message message, Channel channel) {
        long tag = message.getMessageProperties().getDeliveryTag();
        Action action = Action.SUCCESS;
        try {
            System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息" + msg);
            if ("bad".equals(msg)) {
                throw new IllegalArgumentException("测试抛出可重回队列的异常");
            }
            if ("error".equals(msg)) {
                throw new Exception("测试抛出无需重回队列的异常");
            }
        } catch (IllegalArgumentException e1) {
            e1.printStackTrace();
            //根据异常的类型判断设置action是可重试的还是无需重试的
            action = Action.RETRY;
        } catch (Exception e2) {
            //打印异常
            e2.printStackTrace();
            //根据异常的类型判断设置action是可重试的还是无需重试的
            action = Action.REJECT;
        } finally {
            try {
                if (action == Action.SUCCESS) {
                    //multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息
                    channel.basicAck(tag, false);
                } else if (action == Action.RETRY) {
                    //Nack拒绝策略消息重回队列
                    channel.basicNack(tag, false, true);
                } else {
                    //Nack拒绝策略并且从队列中删除
                    channel.basicNack(tag, false, false);
                }
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

三、6种通信模型使用

        RabbitMQ中主要包括6种通信模型分别是helloworld模型、work模型、pubsub模型、router模型、topic模型、rpc模型。

1. helloworld模型

        一个生产者发送消息一个接收者接收消息。

        相关代码

/**
 * 〈一句话功能简述〉<br>
 * 〈接受队列中的消息〉
 * <p>
 * 一个生成者发送消息一个接收者接收消息
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Recv {

    /**
     * 队列名称
     */
    private final static String QUEUE_NAME = "hello.mq";

    public static void main(String[] argv) throws Exception {

        // 创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 声明队列主要为了防止消息接收者先运行此程序队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Waiting for messages.");

        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {

            /**
             * 处理交付
             *
             * @param consumerTag 这个消息的唯一标记
             * @param envelope 信封(请求的消息属性的一个封装)
             * @param properties 前面队列带过来的值
             * @param body 接受到的消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received '" + message + "'");
            }
        };

        // 启动一个消费者并返回服务端生成的消费者标识
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}




/**
 * 〈一句话功能简述〉<br>
 * 〈发送消息到队列中〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Send {

    /**
     * 队列名称
     */
    private final static String QUEUE_NAME = "hello.mq";

    public static void main(String[] argv) throws Exception {

        // 创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 指定一个队列
        // 第一个参数队列名称
        // 第二个参数false重启后队列没有。true持久化队列重启后队列依然存在
        // 第三个参数声明一个独占队列仅限于此连接连接关闭删除这个队列 true
        // 第四个参数最后一个消费者退出去之后这个队列是否自动删除
        // 第五个参数队列的其他属性
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 往队列中发出一条消息
        String message = "hello world!";
        // 第一个参数: 交换机不能为null但是可以设置成 ""
        // 第二个参数路由key不能为null但是可以设置成 ""
        // 第三个参数设置的队列的属性
        // 第四个参数消息值
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("Sent '" + message + "'");
        //关闭频道和连接
        channel.close();
        connection.close();
    }
}

2. work模型 

        多个消费者消费的数据之和才是原来队列中的所有数据适用于流量的消峰。

         相关代码

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Task {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        for (int i = 0; i < 100 ; i++) {
            channel.basicPublish("", TASK_QUEUE_NAME, null,
                    ("我是工作模型:"+i).getBytes("UTF-8"));
        }
        System.out.println("Sent over!");

        channel.close();
        connection.close();
    }

}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Worker1 {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages.");

//        // 消费端限流策略同一时刻服务器只会发送一条消息给消费者
//        channel.basicQos(1)

        final Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者1接受到的消息是:"+new String(body));
                // 进行手动应答  第一个参数自动应答的这个消息标记  第二个参数false 就相当于告诉队列受到消息了
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }

}




/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Worker2 {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages.");


        final Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者2接受到的消息是:"+new String(body));
                // 进行手动应答  第一个参数自动应答的这个消息标记  第二个参数false 就相当于告诉队列受到消息了
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }
}

3. pubsub模型

        发布订阅模式使用Fanout交换机。

         相关代码

/**
 * 〈一句话功能简述〉<br>
 * 〈消息发布者〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Publish {

    /**
     * 声明交换机的名字
     */
    private static final String EXCHANGE_NAME="fanout-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明交换机 第一个参数交换机的名字  第二个参数交换机的类型如果使用的是发布订阅模型  只能写 fanout
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);

        // 发送消息到交换机
        for (int i = 0; i <100 ; i++) {
            channel.basicPublish(EXCHANGE_NAME,"",null,("发布订阅模型的值:"+i).getBytes());
        }
        System.out.println("Sent over!");

        // 关闭资源
        channel.close();
        connection.close();
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈消息订阅者〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Subscribe1 {

    /**
     * 声明交换机的名字
     */
    private static final String EXCHANGE_NAME = "fanout-01";

    /**
     * 队列的名字
     */
    private static final String QUEUE_NAME = "fanout-queue1";


    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明换机
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);

        // 将队列绑定到交换机  第一个参数队列的名字 第二个参数交换机的名字
        // 第三个参数路由的key(现在没有用到这个路由的key)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("订阅者1接受到的数据是:" + new String(body));
            }
        };

        // 启动一个消费者
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈消息订阅者〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Subscribe2 {

    /**
     * 声明交换机的名字
     */
    private static final String EXCHANGE_NAME = "fanout-01";

    /**
     * 队列的名字
     */
    private static final String QUEUE_NAME = "fanout-queue2";


    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明换机
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);

        // 将队列绑定到交换机  第一个参数队列的名字 第二个参数交换机的名字
        // 第三个参数路由的key(现在没有用到这个路由的key)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("订阅者1接受到的数据是:" + new String(body));
            }
        };

        // 启动一个消费者
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

4. router模型

        路由模型相当于是分布订阅的升级版根据路由的keyrouting key来判断是否路由到哪一个队列里面去使用Direct交换机

        相关代码

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Producer {

    private static final String EXCHANGE_NAME = "direct-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明一个交换机要是路由模式只能是 direct
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);

        // 发送信息到交换机
        for (int i = 0; i < 100; i++) {
            if (i % 2 == 0) {
                // 这个路由的key是可以随便设置的
                channel.basicPublish(EXCHANGE_NAME, "one", null, ("路由模型的值:" + i).getBytes());
            } else {
                // 这个路由的key是可以随便设置的
                channel.basicPublish(EXCHANGE_NAME, "two", null, ("路由模型的值:" + i).getBytes());
            }
        }
        System.out.println("Sent over!");

        channel.close();
        connection.close();
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Consumer1 {

    private static final String EXCHANGE_NAME="direct-01";
    private static final String QUEUE_NAME="direct-queue-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);

        // 绑定队列到交换机  第三个参数表示的是路由key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //这里就是接受消息的地方
                System.out.println("路由key是one的这个队列接受到数据:"+new String(body));
            }
        };

        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Consumer2 {

    private static final String EXCHANGE_NAME="direct-01";
    private static final String QUEUE_NAME="direct-queue-02";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);

        // 绑定队列到交换机  第三个参数表示的是路由key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"two");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //这里就是接受消息的地方
                System.out.println("路由key是two的这个队列接受到数据:"+new String(body));
            }
        };

        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

5. topic模型

        相当于是对路由模式的一个升级在匹配的规则上可以实现模糊匹配

        相关代码

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Producer {

    private static final String EXCHANGE_NAME = "topic-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);

        // 发送信息到交换机
        for (int i = 0; i < 100; i++) {
            if (i % 2 == 0) {
                channel.basicPublish(EXCHANGE_NAME, "one.one.one", null, ("路由模型的值:" + i).getBytes());
            }else {
                channel.basicPublish(EXCHANGE_NAME, "one.one", null, ("路由模型的值:" + i).getBytes());
            }
        }
        System.out.println("Sent over!");

        channel.close();
        connection.close();
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Consumer1 {

    private static final String QUEUE_NAME="topic-queue-01";
    private static final String EXCHANGE_NAME="topic-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        // 绑定队列到交换机  第三个参数表示的是路由key
        // 注意  * 只是代表一个单词  # 这个才代表  一个或者多个单词
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one.*");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("路由key是one.*的这个队列接受到数据:"+new String(body));
            }
        };
        System.out.println("Waiting for messages.");
        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Consumer2 {

    private static final String QUEUE_NAME="topic-queue-02";
    private static final String EXCHANGE_NAME="topic-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        // 绑定队列到交换机  第三个参数表示的是路由key
        // 注意  * 只是代表一个单词  # 这个才代表  一个或者多个单词
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one.#");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("路由key是one.#的这个队列接受到数据:"+new String(body));
            }
        };
        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

6. rpc模型

         相关代码

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Server {

    private final static String QUEUE_NAME = "rpc-01";

    public static void main(String[] args) throws Exception {
        // 创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 声明一个队列客户端向服务器发送数据的队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 启动消费者用来处理客户端发送到队列的消息
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 获取参数
                String message = new String(body);
                int n = Integer.parseInt(message);
                // 模拟服务端的一个功能
                String fib = handleInterface(n) + "";
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(properties.getCorrelationId())
                        .build();

                // 将结果返回会客户端
                // 注意从properties去获取客户端传送过来的信息再返回回去
                channel.basicPublish("", properties.getReplyTo(), replyProps, fib.getBytes());
            }
        });
    }

    private static int handleInterface(int n) {
        if (n == 0) {
            return 0;
        }
        return n + 2;
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈RPC模型〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Client {

    private final static String QUEUE_NAME = "rpc-01";

    public static void main(String[] args) throws Exception {
        // 创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 声明一个队列换了一种方式用于存储服务器返回到客户端的数据
        String replyQueueName = channel.queueDeclare().getQueue();

        // 使用UUID随机生成一个id
        final String correlationId = UUID.randomUUID().toString();

        // 客户端发送给服务器添加的额外属性
        AMQP.BasicProperties props = new AMQP.BasicProperties()
                .builder()
                .correlationId(correlationId)
                .replyTo(replyQueueName)
                .build();

        // 客户端将数据发送到发送队列
        channel.basicPublish("", QUEUE_NAME, props, "4".getBytes());

        // 启动消费者用来客户端从相应队列获取到处理的结果
        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 通过correlationId去保证获取到的是正确的信息
                if (properties.getCorrelationId().equals(correlationId)) {
                    // 处理的结果输出
                    System.out.println("RPC返回结果:" + new String(body));
                }

                // 关闭通道注意一定要等结果返回后再关闭不然拿不到返回的数据
                try {
                    channel.close();
                    connection.close();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ