Springboot整合RabbitMQ并使用

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

1、Springboot整合RabbitMQ

1、引入场景启动器

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

引入AMQP场景启动器之后RabbitAutoConfiguration就会自动生效。然后会给容器中自动配置了RabbitTemplateAmqpAdminCachingConnectionFactoryRabbitMessagingTemplate等来方便使用AMQP。

2、在yml中配置spring.rabbitmq相关信息

spring:
  rabbitmq:
    host: 192.168.56.10
    port: 5672
    username: admin
    password: admin
    virtual-host: my_vhost

2、简单使用

2.1 创建交换机(Exchange)、队列(Queue)和建立绑定关系(Binding)

@SpringBootTest
public class AmqpAdminTest {

    @Autowired
    private AmqpAdmin amqpAdmin;
    /**
     * 1、如何创建Exchange、Queue、Binding
     *      1、使用AmqpAdmin进行创建
     */
    @Test
    public void creatExchange() {
        //创建 名为 itcxc.java.direct 的交换机
        DirectExchange directExchange = new DirectExchange("itcxc.java.direct");
        amqpAdmin.declareExchange(directExchange);
    }

    @Test
    public void creatQueue() {
        //创建名为 itcxc.java 的队列
        Queue queue = new Queue("itcxc.java");
        amqpAdmin.declareQueue(queue);
    }

    @Test
    public void creatBinding() {
        //创建绑定关系 将队列itcxc.java绑定到交换机itcxc.java.directroutingKey为itcxc.java
        Binding binding = new Binding("itcxc.java", Binding.DestinationType.QUEUE,
                "itcxc.java.direct","itcxc.java",null);
        amqpAdmin.declareBinding(binding);
    }
}

2.1.1 交换机类型

在这里插入图片描述
direct会将消息发送给路由键必须完全匹配的队列中。
fanout会将消息发送给所有绑定的队列中不管路由键是否匹配。
topic主体模式其实就是在路由模式的基础上支持了对key的通配符匹配星号以及井号以满足更加复杂的消息分发场景。#匹配零个或者多个单词*匹配一个单词每个单词用.分割

2.2 发送消息

@SpringBootTest
public class RabbitTemplateTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *  2、如何发消息
     *      1)、使用rabbitTemplate发送消息
     */
    @Test
    public void sendMessageTest(){
        OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
        orderReturnReasonEntity.setId(1L);
        orderReturnReasonEntity.setName("哈哈");
        orderReturnReasonEntity.setCreateTime(new Date());
        //1、发送消息
        // 默认情况下如果发送的消息是一个对象我们会使用序列化机制将对象写出去对象必须实现Serializable接口
        // 但是我们可以通过向容器中注入Jackson2JsonMessageConverter转换器将序列化机制改为转JSON
        rabbitTemplate.convertAndSend("itcxc.java.direct","itcxc.java", orderReturnReasonEntity);
    }
}

2.2.1 替换消息系列化方式

通过观看RabbitTemplate的源码发现我们在默认情况下消息系列化方式是JDK序列化方式。那么我们发送的消息如果是一个对象时这个对象就必须实现Serializable接口。
在这里插入图片描述
在这里插入图片描述

如何使用转JSON的方式序列化消息呢

通过观察RabbitAutoConfiguration源码发现在创建RabbitTemplate的时候会从容器中拿消息序列化器(MessageConverter)。

在这里插入图片描述

所以我们想要将转JSON的方式序列化消息只需要给容器中放一个Jackson2JsonMessageConverter就可以了

@Component
public class GulimallRabbitMqConfig {

    /**
     * 消息转换器 指定消息转换的方式为转为JSON
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

}

2.3 获取消息

2.3.1 在启动类或者配置类上添加@EnableRabbit注解

使用@RabbitListener必须开启@EnableRabbit如果没有使用@RabbitListener可以不添加@EnableRabbit注解。

2.3.2 添加@RabbitListener

@RabbitListener可以标注在类和方法上 (监听哪些队列)
@RabbitHandler只能标注在方法上 (重载区别不同的消息)

@Service
public class RabbitListeners {

    /**
     * queues声明需要监听的所有队列
     *
     * 接收参数的类型
     * 1、org.springframework.amqp.core.Message
     * 2、直接写原来发送的消息类型
     * 3、Channel 当前传送数据的通道
     *
     * @param message
     */
    @RabbitListener(queues = {"itcxc.java"})
    public void receiveMessage(Message message, OrderReturnReasonEntity orderReturnReasonEntity,
                               Channel channel){
        System.out.println("接收到的消息为" + orderReturnReasonEntity);
    }
}

现在有一个情况就是我们给同一个消息对象发送的消息是有可能不是同一个类型的。例如

@Test
public void sendMq(){
    for (int i = 0; i < 10; i++) {
        if (i % 2 == 0){
            OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
            orderReturnReasonEntity.setId(1L);
            orderReturnReasonEntity.setName("哈哈");
            orderReturnReasonEntity.setCreateTime(new Date());
            rabbitTemplate.convertAndSend("itcxc.java.direct","itcxc.java", orderReturnReasonEntity);
        } else {
            OrderEntity orderEntity = new OrderEntity();
            orderEntity.setOrderSn(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("itcxc.java.direct","itcxc.java", orderEntity);
        }
    }
}

在这个情况下如果我们还是用原来的方式监听消息的话就会使发送的消息类型为OrderEntity的消息丢失。

在这里插入图片描述
这个时候我们就可以将@RabbitListener标注在类上然后@RabbitHandler标注在方法上

@Service
@RabbitListener(queues = {"itcxc.java"})
public class RabbitListeners {

    /**
     * queues声明需要监听的所有队列
     *
     * 接收参数的类型
     * 1、org.springframework.amqp.core.Message
     * 2、直接写原来发送的消息类型
     * 3、Channel 当前传送数据的通道
     *
     * @param message
     */
    @RabbitHandler
    public void receiveMessage(Message message, OrderReturnReasonEntity orderReturnReasonEntity,
                               Channel channel){
        System.out.println("接收到的消息为" + orderReturnReasonEntity);
    }

    @RabbitHandler
    public void receiveMessage(OrderEntity orderEntity){
        System.out.println("接收到的消息为" + orderEntity);
    }
}

在这里插入图片描述

3、消息的可靠传递

3.1 发送端确认

为什么会丢失消息

  • 在发送消息到服务端的时候有可能因为网络等等问题没有将消息发送到服务端。
  • 交换机(Exchange)通过路由键将消息发送给队列(Queue)的时候有可能没有找到相应的队列(Queue)而默认情况下是将消息直接丢弃的。

3.1.1 开启confirm和return机制

spring:
  rabbitmq:
    # 消息发送到broker后的回调 
    publisher-confirm-type: correlated
    # 没有设置mandatory时生效
    publisher-returns: true
    # mandatory的优先级高于publisher-returns只要设置了mandatorypublisher-returns就失效了
    template:
      mandatory: true

我翻看源码可以发现mandatory的优先级高于publisher-returns只要设置了mandatorypublisher-returns就失效了。

在这里插入图片描述
在这里插入图片描述
但是经过测试我发现mandatory只有在publisher-confirm-typepublisher-returns至少有一个设置才会生效。如果mandatorypublisher-returns同时存在的话则mandatory优先级高于publisher-returns

3.1.2 添加回调方法

@Component
@RequiredArgsConstructor
public class RabbitMqCallback {

    private final RabbitTemplate rabbitTemplate;

    /**
     * RabbitMqCallback 创建完成之后执行这个方法
     * @return
     */
    @PostConstruct
    public RabbitTemplate initCallback() {

        /**
         * 需要设置spring.rabbitmq.publisher-confirm-type=correlated
         * 消息发broker成功回调发送到broker的exchange是否正确找到
         * correlationData当前消息的唯一关联数据(这个是消息的唯一ID)
         * ack消息是否发送成功
         * cause失败的原因成功则返回null
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("setConfirmCallback 消息数据" + correlationData);
            if (Objects.nonNull(correlationData)) {
                System.out.println("setConfirmCallback 消息数据" + correlationData.getReturnedMessage());
            }
            System.out.println("setConfirmCallback 消息确认" + ack);
            System.out.println("setConfirmCallback 原因" + cause);
            System.out.println("-----------------------------------");
        });

        /**
         * 需要设置spring.rabbitmq.template.mandatory=true或spring.rabbitmq.publisher-returns=true 才会有回调
         * 消息路由回调从交换器路由到队列是否正确发送
         * message投递失败消息的详细消息
         * replyCode回应码
         * replyText回应信息
         * exchange当时这个消息发送给的交换器
         * routingKey当时这个消息发送用的路由键
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("setReturnCallback 消息" + message);
            System.out.println("setReturnCallback 回应码" + replyCode);
            System.out.println("setReturnCallback 回应信息" + replyText);
            System.out.println("setReturnCallback 交换器" + exchange);
            System.out.println("setReturnCallback 路由键" + routingKey);
            System.out.println("-----------------------------------");
        });

        return rabbitTemplate;
    }
}

这样我们就可以知道哪些消息发送成功哪些消息发送失败了然后就可以做出相应的处理。

3.2 消费端确认

为什么会丢失消息

默认情况下只要收到消息客户端会自动确认然后服务端就会移除这个消息。由于客户端会一次性接收很多的消息。
在这个情况下就有可能我们接收了10个消息只处理了前面2个消息然后服务宕机了这样就会使得我们有8个消息丢失。

3.2.1 设置ACK应答机制为手动

spring:
  rabbitmq:
    # 设置ACK应答机制为手动
    listener:
      simple:
        acknowledge-mode: manual

手动模式只要我们没有明确的告诉MQ消息被签收(没有ACK)消息就是一直处于unacked状态即使客户端宕机了消息也不会丢失会重新变为ready下次有新的客户端连接进来就发给新的客户端。

3.2.2 处理完消息之后手动应答

 @RabbitListener(queues = {"itcxc.java"})
 public void receiveMessage(Message message, OrderReturnReasonEntity orderReturnReasonEntity,
                            Channel channel) throws IOException {
     //deliveryTag通道(channel)内自增的
     long deliveryTag = message.getMessageProperties().getDeliveryTag();
     System.out.println("方法一deliveryTag为"+deliveryTag+"接收到的消息为" + orderReturnReasonEntity);
     //确认签收参数说明deliveryTag,是否批量签收
     channel.basicAck(deliveryTag,false);
     //拒绝签收参数说明deliveryTag,是否批量签收,是否放回队列中
     //channel.basicNack(deliveryTag,false,true);
 }
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: SpringRabbitMQ