Mall脚手架总结(四) —— SpringBoot整合RabbitMQ实现超时订单处理-CSDN博客

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

前言

        在电商项目中订单因为某种特殊情况被取消或者超时未支付都是比较常规的用户行为而实现该功能我们就要借助消息中间件来为我们维护这么一个消息队列。在mall脚手架中选择了RabbitMQ消息中间件接下来荔枝就会根据功能需求来梳理一下超时订单处理功能以及相应的背景知识。希望对正在学习的小伙伴有帮助~~~


文章目录

前言

一、整合RabbitMQ实现延时消息

1.1 RabbitMQ管理界面的VirtualHost

1.2 回顾枚举类的优点

1.3 划重点Spring AMQP框架 

1.3.1 AMQPTemplate

1.3.2 Message

1.3.3 @RabbitListener注解

1.4 订单超时未支付取消订单的流程

总结


一、整合RabbitMQ实现延时消息

1.1 RabbitMQ管理界面的VirtualHost

        Virtual Host虚拟主机相当于是一个个的相对独立的RabbitMQ服务器。每个虚拟主机都有自己独立的用户、权限、交换机exchange、队列queue和绑定关系。在RabbitMQ中每个连接到服务器的客户端都必须选择一个虚拟主机进行操作。如果客户端没有指定虚拟主机默认会使用/作为虚拟主机也就是RabbitMQ默认的虚拟主机。为保证隔离性这里声明了一个/mall的虚拟主机与我们默认的用户虚拟主机/隔离开来。

1.2 回顾枚举类的优点

枚举类Enum是一种特殊的数据库类型用于表示固定数量的命名常量枚举类定义了一个新的数据类型该类型可以包含一组预定义的值。

优点

  • 安全性保证方法参数的类型的安全性避免非法值的传入
  • 避免魔法数字 枚举类可以帮助你避免在代码中使用魔法数字magic numbers。魔法数字是指在代码中直接使用的、没有明确含义的数字。使用枚举类可以为这些常量提供有意义的名字。
  • 支持迭代器Iteration 枚举类提供了一种便捷的方式来遍历枚举值。

所以在脚手架中我们定义交换机名称、队列名称和routingKey就借助了枚举类。

/**
 * @auther lzddl
 * @description 消息队列枚举配置
 */
@Getter
public enum QueueEnum {
    /**
     * 消息通知队列
     */
    QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),
    /**
     * 消息通知ttl队列
     */
    QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");

    /**
     * 交换机名称
     */
    private String exchange;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 路由键
     */
    private String routeKey;

    QueueEnum(String exchange, String name, String routeKey) {
        this.exchange = exchange;
        this.name = name;
        this.routeKey = routeKey;
    }
}

1.3 划重点Spring AMQP框架 

        Spring AMQP是一个基于AMQP协议的强大的消息中间件框架它提供了一个简单的API来发送和接收异步、可靠的消息。AMQP是Spring框架的一部分可以与Spring Boot和其他Spring项目一起使用。Spring AMQP支持多种消息协议包括RabbitMQ、Apache ActiveMQ和Qpid等。它提供了一个高级的消息模型包括消息确认、事务和消息监听器等功能使得开发者可以轻松地编写可靠的消息应用程序。同时Spring AMQP还提供了一些高级特性如消息转换器、消息路由、消息过滤和消息拦截等。总的来说Spring AMQP 是对 Spring 基于 AMQP 的消息收发解决方案在SpringBoot项目中操作消息中间件RabbitMQ的相关操作的时候我们需要借助Spring提供的AMQP框架

AMQP高级消息队列协议是面向消息的中间件的平台中立的线级协议。

Spring AMQP的核心组件

  • ConnectionFactory连接工厂接口用于创建连接。

  • AmqpAdmin封装了RabbitMQ的基础管理操作比如对交换机、队列、绑定的声明和删除等。
  • MessageSpring AMQP 对消息的封装。两个重要的属性body消息内容messageProperties消息属性。
  • 消息模板AmqpTemplate 用来简化消息的收发支持消息的确认与返回。跟 JDBCTemplate一 样它封装了创建连接 、创建消息信道、收发消息、消息格式转换、关闭信道、关闭连接等等操作。

  • 消息监听Messager Listener Spring AMQP 异步消息投递的监听器接口它只有一个方法onMessage用于处理消息队列推送来的消息。

  • 转换器MessageConvertor用来处理消息对象的序列化和反序列化的操作工具它可以将消息对象转换为消息队列可以处理的格式并将接收到的消息转换为Java对象。

1.3.1 AMQPTemplate

AMQPTemplate是Spring AMQP框架提供的一个接口它定义了一系列用于发送和接收消息的方法。我们来看看源码并归类一下这些方法

public interface AmqpTemplate {
    void send(Message var1) throws AmqpException;
    void send(String var1, Message var2) throws AmqpException;
    void send(String var1, String var2, Message var3) throws AmqpException;
 

    void convertAndSend(Object var1) throws AmqpException;
    void convertAndSend(String var1, Object var2) throws AmqpException;
    void convertAndSend(String var1, String var2, Object var3) throws AmqpException;
    void convertAndSend(Object var1, MessagePostProcessor var2) throws AmqpException;
    void convertAndSend(String var1, Object var2, MessagePostProcessor var3) throws AmqpException;
    void convertAndSend(String var1, String var2, Object var3, MessagePostProcessor var4) throws AmqpException;


    @Nullable
    Message receive() throws AmqpException;
    @Nullable
    Message receive(String var1) throws AmqpException;
    @Nullable
    Message receive(long var1) throws AmqpException;
    @Nullable
    Message receive(String var1, long var2) throws AmqpException;


    @Nullable
    Object receiveAndConvert() throws AmqpException;
    @Nullable
    Object receiveAndConvert(String var1) throws AmqpException;
    @Nullable
    Object receiveAndConvert(long var1) throws AmqpException;
    @Nullable
    Object receiveAndConvert(String var1, long var2) throws AmqpException;
    @Nullable
    <T> T receiveAndConvert(ParameterizedTypeReference<T> var1) throws AmqpException;
    @Nullable
    <T> T receiveAndConvert(String var1, ParameterizedTypeReference<T> var2) throws AmqpException;
    @Nullable
    <T> T receiveAndConvert(long var1, ParameterizedTypeReference<T> var3) throws AmqpException;
    @Nullable
    <T> T receiveAndConvert(String var1, long var2, ParameterizedTypeReference<T> var4) throws AmqpException;


    <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> var1) throws AmqpException;
    <R, S> boolean receiveAndReply(String var1, ReceiveAndReplyCallback<R, S> var2) throws AmqpException;
    <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> var1, String var2, String var3) throws AmqpException;
    <R, S> boolean receiveAndReply(String var1, ReceiveAndReplyCallback<R, S> var2, String var3, String var4) throws AmqpException;
    <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> var1, ReplyToAddressCallback<S> var2) throws AmqpException;
    <R, S> boolean receiveAndReply(String var1, ReceiveAndReplyCallback<R, S> var2, ReplyToAddressCallback<S> var3) throws AmqpException;


    @Nullable
    Message sendAndReceive(Message var1) throws AmqpException;
    @Nullable
    Message sendAndReceive(String var1, Message var2) throws AmqpException;
    @Nullable
    Message sendAndReceive(String var1, String var2, Message var3) throws AmqpException;
   

    @Nullable
    Object convertSendAndReceive(Object var1) throws AmqpException;
    @Nullable
    Object convertSendAndReceive(String var1, Object var2) throws AmqpException;
    @Nullable
    Object convertSendAndReceive(String var1, String var2, Object var3) throws AmqpException;
    @Nullable
    Object convertSendAndReceive(Object var1, MessagePostProcessor var2) throws AmqpException;
    @Nullable
    Object convertSendAndReceive(String var1, Object var2, MessagePostProcessor var3) throws AmqpException;
    @Nullable
    Object convertSendAndReceive(String var1, String var2, Object var3, MessagePostProcessor var4) throws AmqpException;


    @Nullable
    <T> T convertSendAndReceiveAsType(Object var1, ParameterizedTypeReference<T> var2) throws AmqpException;
    @Nullable
    <T> T convertSendAndReceiveAsType(String var1, Object var2, ParameterizedTypeReference<T> var3) throws AmqpException;
    @Nullable
    <T> T convertSendAndReceiveAsType(String var1, String var2, Object var3, ParameterizedTypeReference<T> var4) throws AmqpException;
    @Nullable
    <T> T convertSendAndReceiveAsType(Object var1, MessagePostProcessor var2, ParameterizedTypeReference<T> var3) throws AmqpException;
    @Nullable
    <T> T convertSendAndReceiveAsType(String var1, Object var2, MessagePostProcessor var3, ParameterizedTypeReference<T> var4) throws AmqpException;
    @Nullable
    <T> T convertSendAndReceiveAsType(String var1, String var2, Object var3, MessagePostProcessor var4, ParameterizedTypeReference<T> var5) throws AmqpException;
}

从源码中我们可以了解到其实该接口提供了八种类型的消息操作方法因为不同方法都采用了重载所以看起来有点吓人接下来我们大致根据方法名弄清楚这些方法的功能即可。

send()和receive()就不说了根据给出的参数来发送消息和接收消息

  • convertAndSend转化并发送消息
  • receiveAndConvert接收并转化消息
  • receiveAndReply接收并回复消息
  • sendAndReceive发送并接收消息
  • convertSendAndReceive转化发送和接收
  • convertSendAndReceiveAsType根据类型来发送接收

其实AMQPTemplate是一个比较抽象的接口其中操作RabbitMQ更为具体的接口的实现类是RabbitMQTemplate。而关于RabbitMQTemplate的源码这里就不展示了一千多行呢~

 //给延迟队列发送消息
amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        //给消息设置延迟毫秒值
        message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
        return message;
    }
});

可以看到我们调用了RabbitMQTemplate实现的AMQPTemplate接口方法来发送消息。

1.3.2 Message

Message消息是服务器与应用程序之间传递的数据由Properties和Body组成 Properties可以对消息进行修饰如消息的优先级、传输格式如JSON、延迟等高级特性Body则就是消息体内容。

Message类的使用场景

  • 发送消息时通常使用最多的是编写消息体内容、设置过期时间、设置持久化发送消息的类型分为两种情况基本类型和对象 
  • 接收消息时实际是把二进制byte转为需要的类型再进行数据传递和业务处理接收消息的类型分为两种情况基本类型和对象

在脚手架中我们看到使用了Message来反取消息对象的属性并设置相应的消息过期时间

message.getMessageProperties().setExpiration(String.valueOf(delayTimes));

其余的核心组件具体内容可以参考这位大佬的博文 https://blog.csdn.net/weixin_45596022/article/details/113359009

1.3.3 @RabbitListener注解

        @RabbitListener注解是Spring AMQP框架提供的注解用于简化RabbitMQ消息消费者的创建。当你在方法上使用@RabbitListener注解时Spring会自动创建一个RabbitMQ消息监听器用于监听指定队列上的消息并在消息到达时调用被注解的方法来处理消息。

因此在脚手架中我们通过该注解来实现死信队列消费者的创建

@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
    @Autowired
    private OmsPortalOrderService portalOrderService;
    @RabbitHandler
    public void handle(Long orderId){
        LOGGER.info("receive delay message orderId:{}",orderId);
        portalOrderService.cancelOrder(orderId);
    }
}

1.4 订单超时未支付取消订单的流程

        在mall脚手架中模拟了一个订单到期未支付取消下单的操作首先用户在购买之后会创建订单随之可能会有锁定库存、判断会员身份或者积分、优惠券等操作Controller层中的generateOrder()就会创建一个带有过期时间的延时消息这部分是通过一个之前已经定义好了的CancelOrderSender也就是延时消息的发送者类发送延时消息到死信队列中。这里我们来看一下配置死信队列的配置类方法

    /**
     * 订单延迟队列死信队列
     */
    @Bean
    public Queue orderTtlQueue() {
        return QueueBuilder
                .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
                .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机
                .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键
                .build();
    }

        由于我们定义了一个死信队列和一个延时队列订单下单之后我们就会把消息发送到死信队列由于用户迟迟不支付所以死信队列中的消息一直没有被消费等到TTL时间一到就会转发到普通队列中被消费者消费。前面在讲@RabbitListener注解的时候已经给出了消费者的demo消费者监听的就是普通队列。当消费者消费后会触发取消订单的API进行订单取消的操作释放库存、扣除优惠券或积分等操作


总结

        RabbitMQ整合进脚手架的功能还是比较简单的哈哈当然了脚手架只是为了让我们了解一些基础知识以便于快速上手项目重点的还是要学习有关AMQP的操作以及相应的在RabbitMQ对应的接口实现类。梳理完后接下来的文章荔枝就可以开始整合MinIO了一起加油吧~

今朝已然成为过去明日依然向往未来我是荔枝在技术成长之路上与您相伴~~~

如果博文对您有帮助的话可以给荔枝一键三连嘿您的支持和鼓励是荔枝最大的动力

如果博文内容有误也欢迎各位大佬在下方评论区批评指正

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: SpringRabbitMQ