Sentinel + Redis + Mysql + RabbitMQ 秒杀功能设计及后端代码实现

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

文章目录

前言

在开发秒杀系统功能的时候需要考虑但不限于以下几点
  1. 确保数据一致性
  2. 确保系统高性能
  3. 处理高并发场景
实际上对于不同的秒杀业务场景需要考虑的问题也会有不同的解决方案。

数据一致性

  秒杀系统的数据一致性其中一方面体现在库存数量的计算上我们不仅要确保商品尽可能地卖光还要确保生成的最终订单数量不能超过预设的库存值否则就会出现超卖的情况这也是我们整个秒杀服务最基本的要求。
  为了防止超卖我们可以在每次生成订单前查询当前秒杀商品的剩余库存库存不足不允许生成订单。
  另一方面当我们在进行扣减库存的操作时不同的扣减方案也会影响整个秒杀功能的实际表现。正常情况下购买商品分为两个步骤下单、付款。扣款方案大致分为三种情况。

  • 下单后扣减库存 下单后立即扣减库存会最大限度地减少服务器的压力因为最多只有有限个下单成功的用户。但是这种模式有一个弊端因为我们无法保证下单成功的用户最后都会去付款所以可能出现部分用户一直不付款但是却一直占用资源的情况导致商品没办法全部卖出去。
  • 付款后扣减库存 只要用户不付款库存就一直在会保证商品能全部卖出去。但是这种模式同样有一个弊端如果付款订单的数量没有达到库存上限那么所有的下单请求都会成功会导致在某一时刻下单成功的信息激增然后绝大部分用户付款时由于库存不足导致无法付款成功从而给用户带来不好的体验。
  • 下单后预扣减库存即下单后保留下单信息一段时间如果没有付款就释放库存并将下单信息置为失效虽然没有解决什么实质上的问题但是作为以上两种情况的折中方案中和了两者的优势和弊端目前大部分下单网购都是使用的这种方式。

后面的DEMO里面使用的付款后扣减库存的方案秒杀系统嘛抢不到很正常。

高性能

动静分离

  秒杀系统的页面内容一般来说是不会有变化的我们大可不必每次刷新页面就去请求诸多后端接口。
  所以我们需要先要理清楚哪些信息是可以固定不变的哪些信息是必须要后台提供的。就比如说针对某个商品的秒杀活动这个商品的价格、产品介绍、优惠信息一般是不会改变的所以这些信息我们可以直接设置为页面上写死的数据减少对后端服务的请求次数。

静态资源缓存

  推荐使用 CDN 内容分发网络 CDN 会将数据从源服务器复制到其他服务器上。当用户访问时CDN 会根据自己的负载均衡选择一个最优服务器然后用户在这个最优服务器上访问内容如果该服务器上没有目标资源则会进行回源从源服务器获取信息。
  因为我对 CDN 的原理也不是很了解只了解可以提高静态页面的访问速度所以这里不做过多的说明。

流控

  为了缓解秒杀时刻的巨大访问量带给服务器的压力我们可以在处理请求之前就适当的筛掉部分请求即进行流控降级比如一秒内如果有10000个请求同时命中服务器那我们只允许其中1000个请求能够进入真正的业务逻辑中剩下的请求会返回一个降级响应这个降级响应一般来说都是直接返回业务响应失败或者返回诸如“服务忙请重试”的提醒。
  Sentinel 提供了这样的功能Sentinel允许我们为指定的接口设置流控规则我们可以通过 QPS 或者并发线程数设置阈值使用QPS的话可以控制每秒最多做出多少有效响应而使用并发线程数则会控制系统线程数量不超过预设值。
   在这里插入图片描述

缓存数据库

  提高系统性能必然少不了缓存数据库的帮助现在最常用之一的缓存数据库 Redis 就很适合这种秒杀场景Redis 中的多路复用技术以及在内存上操作数据的设计造就了 Redis 的高吞吐量使得它能够在短时间之内响应更多的查询。

消息队列

  在业务开发中有一些逻辑我们往往不需要及时完成我们只需要关注最核心的业务部分逻辑可以推迟执行。
  我们或许可以考虑使用多线程处理但是使用多线程可能会存在以下问题
  1、线程仍旧是在当前进程之内执行的虽然加快了接口的响应速度但是服务器的负压还是一样的并没有实际为服务器减少
什么压力。
  2、如果逻辑中存在对持久数据库的操作使用多线程可能在某一时刻会有大量的写操作涌入数据库数据库的并发能力是相对比较弱的过多的写请求可能导致数据库宕机。
所以我们可以考虑消息队列这里使用的是 RabbitMQ 。

RabbitMQ的优点

  • 异步执行可以提高接口响应速度由不同的进程服务去消费消息处理剩余工作减轻当前服务器的压力。
  • 削峰填谷比如设置每秒最大的消息处理量为1000请求过多的时候会控制在每秒处理1000个消息其余消息就会积压在 RabbitMQ 的队列中这就是削峰。高峰期过后请求量下降但是这个时候由于存在高峰期积攒的消息所以在短时间内它仍能够提供一定的量级的消息这就是填谷。这样可以缓解在请求高峰期带给数据库的压力避免数据库崩溃。

高并发

分布式锁

  在微服务项目中一般会提供多个服务实例其中一个服务修改库存剩余值的时候其他任何服务进程都不允许读写该值这个时候我们往往需要用到分布式锁去处理。
  因为我们用到 Redis 做缓存数据库所以我们可以使用 Redis 实现分布式锁。 Redission 提供了操作 Redis 和获取分布式锁的便捷方法。

后端代码实现

这里实现的是一个简单的秒杀系统采用的是付款后扣减库存的方案以保证商品尽可能的卖出去而且限制每人只能购买一次。
该DEMO功能还不够完善下面的代码不是完整代码完整代码链接附在最后。

中间件

  • Mysql不管怎么样我们的数据最终是需要落库的持久化数据库是必须的。
  • Reids作为缓存服务器秒杀所需的基本信息都存在 Redis 中。
  • RabbitMQ逻辑中对持久化数据库的操作以及其他不重要的逻辑处理会影响接口的响应速度所以使用 RabbitMQ 去通知另外的服务进程操作。
  • 定时任务用于将我们设置的秒杀信息存入到 Redis 中可以采用定时任务框架例如 elastic-job、xxl-job等这里使用 springframework 自带的定时任务框架。

表结构

-- 秒杀信息表
CREATE TABLE `seckill` (
  `id` int NOT NULL AUTO_INCREMENT,
  `product_id` int DEFAULT NULL COMMENT '商品ID',
  `count` int DEFAULT '0' COMMENT '秒杀库存',
  `seckill_price` decimal(10,2) DEFAULT NULL COMMENT '秒杀价格',
  `start_time` datetime DEFAULT NULL COMMENT '秒杀开始时间',
  `end_time` datetime DEFAULT NULL COMMENT '秒杀结束时间',
  `is_cached` tinyint DEFAULT '0' COMMENT '是否放入Redis缓存 0 否 1 是',
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `UK_PRODUCT_CODE` (`product_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
-- 订单表
CREATE TABLE `seckill_orders` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT 'id',
  `order_no` varchar(32) DEFAULT NULL COMMENT '订单号',
  `account_id` int DEFAULT NULL COMMENT '账户ID',
  `seckill_id` int DEFAULT NULL COMMENT '秒杀ID',
  `count` int DEFAULT NULL COMMENT '秒杀数量',
  `payment_amount` decimal(10,0) DEFAULT NULL COMMENT '应付金额',
  `checkout_time` datetime DEFAULT NULL COMMENT '下单时间',
  `status` tinyint DEFAULT '0' COMMENT '状态 0待支付 1已支付 2已取消',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=176 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

添加依赖

这里只放了部分依赖全部依赖信息比较多可以查看最后的源码链接
配置文件都是比较基础的单机配置有些甚至是默认配置 比如说 RabbitMQ这里就不贴了

<!-- Sentinel 配置中心 -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
    <version>2021.0.4.0</version>
</dependency>
<!-- Redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.6.6</version>
</dependency>
<!-- Redission -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.9.1</version>
</dependency>
<!-- Mysql -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
<!-- MybatisPlus -->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.1.1</version>
</dependency>
<dependency>
	 <groupId>com.baomidou</groupId>
	 <artifactId>mybatis-plus-core</artifactId>
	 <version>3.1.1</version>
	 <scope>compile</scope>
</dependency>
<!-- RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.5.2.RELEASE</version>
</dependency>

公共常量

主要是用于查询 Redis 信息或者获取分布式锁时的前缀可以放在公共常量类中。

public class CommonConst {
    public static String SECKILL_LOCK_USER = "SECKILL_USER_LOCK:";//用户个人锁
    public static String SECKILL_LOCK_GLOBAL = "SECKILL_GLOBAL_LOCK:";//全局锁
    public static String SECKILL_START_TIMESTEMP = "SECKILL_START_TIMESTEMP:";//秒杀开始时间
    public static String SECKILL_STOP_TIMESTEMP = "SECKILL_STOP_TIMESTEMP:";//秒杀结束时间
    public static String SECKILL_REMAIN_COUNT = "SECKILL_REMAIN_COUNT:";//秒杀商品剩余数量
    public static String SECKILL_ORDER_USERS = "SECKILL_ORDER_USERS:";//下单成功用户列表
    public static String SECKILL_SUCCEED_USERS = "SECKILL_SUCCEED_USERS:";//付款成功用户列表
}

实体类

@Data
public class Seckill {
	//对应数据库表 seckill 字段
}
@Data
public class SeckillOrders implements Serializable {
	//对应数据库表 seckill_orders 字段
}

Redission配置

这里使用 Redisssion 去操作 Redis 因为它提供简单的锁操作。

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        // 配置信息
        Config config = new Config();
        //地址、密码
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");//                .setPassword("pwd");
        return Redisson.create(config);
    }
}

定时任务

这里做了一个简单的定时任务每隔五秒执行一次将没有缓存过的秒杀信息放入 Redis

@Component
@EnableScheduling
public class SeckillTask {
    @Autowired
    SeckillDao seckillDao;
    @Autowired
    RedissonClient redission;

    /**
     * 5分钟执行一次
     * 查询没放进缓存中的秒杀任务并放入缓存
     */
    @Scheduled(cron="0/5 * * * * ?")
    public void seckillRedisCache() {
        //将秒杀信息缓存进REDIS
        QueryWrapper<Seckill> ew = new QueryWrapper<>();
        ew.eq("is_cached",0);
        List<Seckill> seckills = seckillDao.selectList(ew);

        if(CollectionUtils.isNotEmpty(seckills)){
            List<Long> cachedIds = new ArrayList<>();
            for (Seckill seckill : seckills) {
                RBucket<Integer> count = redission.getBucket(CommonConst.SECKILL_REMAIN_COUNT + seckill.getId());
                //先判断下确实没有缓存过
                if(!count.isExists()){
                    count.set(seckill.getCount());
                    redission.getBucket(CommonConst.SECKILL_START_TIMESTEMP + seckill.getId()).set(seckill.getStartTime());
                    redission.getBucket(CommonConst.SECKILL_STOP_TIMESTEMP + seckill.getId()).set(seckill.getEndTime());
                }
                cachedIds.add(seckill.getId());
            }

            //修改缓存状态
            if(CollectionUtils.isNotEmpty(cachedIds)) {
                UpdateWrapper<Seckill> uw = new UpdateWrapper<>();
                uw.setSql("is_cached = 1");
                uw.in("id", cachedIds);
                seckillDao.update(null,uw);
            }
        }
    }
}

Controller

增加 Sentinel 限流设置被限流的接口返回 “活动火爆请重新尝试!” 的降级响应。
需要注意的几点是
1、除了 BlockException 这个参数外降级的方法其他参数类型需要和被流控的接口参数类型保持一致
2、如果是通过 Sentinel 控制台去设置流控规则程序启动后会发现在没有任何资源这个时候我们只需要调用一次被流控的接口就好了然后就可以为其添加流控规则。
调用前
在这里插入图片描述
调用后
在这里插入图片描述

@Autowired
private ISeckillService seckillService;

/**
 * 流控降级
 * */
public R<Orders> seckillFallback(Long accountId,Long pid,BlockException ex) {
    return R.failure("活动火爆请重新尝试");
}

//秒杀接口通过Sentinel限流
@PostMapping("/seckill/{accountId}/{pid}")
@SentinelResource(value="seckill",blockHandler = "seckillFallback")
public R seckill(@PathVariable("accountId") Long accountId,@PathVariable("pid") Long pid) throws Exception {
    return seckillService.seckill(accountId,pid);
}

//支付接口
@PostMapping("/killpay/{seckillOrder}")
public R killpay(@PathVariable("seckillOrder") String seckillOrder) throws Exception {
    return seckillService.killpay(seckillOrder);
}

下单接口

下单之前我们要判断活动是否开始、是否已经结束、是否已经抢购成功过、是否已有下单记录、是否还有库存等。
同时我们要处理同一用户出现的并发现象因为过快的请求可能是非正常情况我们可以设法拒绝一些非正常请求的继续访问。

public R seckill(Long accountId,Long kid) throws InterruptedException {
        //获取活动开始时间
        RBucket<Date> startTime = redission.getBucket(CommonConst.SECKILL_START_TIMESTEMP + kid);
        if(!startTime.isExists() || new Date().compareTo(startTime.get())<0) {//获取不到表示活动还未开始
            return R.failure("活动未开始!");
        }

        //获取活动结束时间
        RBucket<Date> stopTime = redission.getBucket(CommonConst.SECKILL_STOP_TIMESTEMP + kid);
        if(new Date().compareTo(stopTime.get())>0){//判断活动是否结束
            return R.failure("活动已结束!");
        }

        //获取用户个人锁处理同一用户同时多次请求秒杀接口
        //采用自动释放锁的方式500ms后自动释放500ms内统一用户的请求视为非正常请求
        RLock lock = redission.getLock(CommonConst.SECKILL_LOCK_USER + kid + ":" + accountId);
        boolean locked = lock.tryLock(0,500,TimeUnit.MILLISECONDS);
        if(locked){

            //判断是否已经购买成功过
            RBucket<Set> succedUsers = redission.getBucket(CommonConst.SECKILL_SUCCEED_USERS + kid);
            if(succedUsers.isExists() && succedUsers.get().contains(accountId)){
                return R.failure("抢购次数已用尽!");
            }

            //判断是否有下单记录
            RBucket<Set> checkoutUsers = redission.getBucket(CommonConst.SECKILL_ORDER_USERS + kid);
            if(checkoutUsers.isExists() && checkoutUsers.get().contains(accountId)){
                return R.failure("已有下单记录请前往支付!");
            }

			//判断是否还有库存(下单时做初步判断防止没有库存了仍旧能下单。)
            RAtomicLong count = redission.getAtomicLong(CommonConst.SECKILL_REMAIN_COUNT + kid);
            if(!count.isExists() || count.get()<=0) {
                return R.failure("已售罄!");
            }

            //写入下单成功的人员列表 操作时要获取锁避免其他进程读取或者操作
            RLock gwlock = redission.getLock(CommonConst.SECKILL_LOCK_GLOBAL + kid );
            if(gwlock.tryLock(1000, TimeUnit.MILLISECONDS)) {
                Set<Long> newUsers = new HashSet<>();
                if(checkoutUsers.isExists()){
                    newUsers = checkoutUsers.get();
                    newUsers.add(accountId);
                }else{
                    newUsers.add(accountId);
                }
                checkoutUsers.set(newUsers);
                //释放写锁
                gwlock.unlock();

                String secOrder = UUID.randomUUID().toString().replace("-","");//返回订单标志
                //生成下单所需基本信息例如账户、秒杀ID
                SeckillOrders checkout = new SeckillOrders();
                checkout.setOrderNo(secOrder);
                checkout.setAccountId(accountId);
                checkout.setSeckillId(kid);
                checkout.setCount(1);
                checkout.setRabbitMqType(0);
                //放进消息队列中处理 我这里交换器 my-mq-exchange_A 绑定的是队列 QUEUE_A 绑定的路由键是 spring-boot-routingKey_A
                RabbitPublishUtils.sendDirectMessage(RabbitMQExchanges.EXCHANGE_A, RabbitMQRoutingKeys.ROUTINGKEY_A, checkout);

                // 最后返回下单单号供前端刷新界面使用
                return R.success(secOrder,"下单成功");
            }else{
               return R.failure("活动火爆请重新尝试!");
            }
        }else{
            return R.failure("操作频繁!");
        }
    }

付款接口

主要涉及库存校验、库存的扣减和恢复操作。

    public R killpay(String seckillOrder) throws InterruptedException {
        //订单信息从数据库中查询 防篡改
        QueryWrapper<SeckillOrders> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("order_no",seckillOrder);
        SeckillOrders checkout = ordersDao.selectOne(queryWrapper);
        Long kid = checkout.getSeckillId();

        RBucket<Set> succedUsers = redission.getBucket(SECKILL_SUCCEED_USERS + kid);

        //库存先扣减1 写锁控制下不让其他进程读取和修改库存值
        RLock glock = redission.getLock(CommonConst.SECKILL_LOCK_GLOBAL + kid );
        if (glock.tryLock(1000, TimeUnit.MILLISECONDS)) {
            RAtomicLong count = redission.getAtomicLong(SECKILL_REMAIN_COUNT + kid);
            if (count.isExists() && count.get() > 0) {
                count.getAndDecrement();
            }else{
                glock.unlock();
                return R.failure("已售罄!");
            }

            //添加到购买成功的人员列表中
            Set<Long> newUsers = new HashSet<>();
            if(succedUsers.isExists()){
                newUsers = succedUsers.get();
                newUsers.add(checkout.getAccountId());
            }else{
                newUsers.add(checkout.getAccountId());
            }
            succedUsers.set(newUsers);
			//释放写锁
            glock.unlock();
        }

        //扣减数据库中的库存交由消息队列中去处理
        checkout.setRabbitMqType(1);
        RabbitPublishUtils.sendDirectMessage(RabbitMQExchanges.EXCHANGE_A, RabbitMQRoutingKeys.ROUTINGKEY_A, checkout);

        try {
            //TODO 调用支付接口
            //模拟支付失败的操作 用来查看库存恢复是否正常
//            int sd = 0;
//            Object sds = 10/sd;
        }catch (Exception ex){

            //支付失败 恢复商品数量 可以交由消息队列中去处理
            checkout.setRabbitMqType(2);
            RabbitPublishUtils.sendDirectMessage(RabbitMQExchanges.EXCHANGE_A, RabbitMQRoutingKeys.ROUTINGKEY_A, checkout);
            if (glock.tryLock(1000, TimeUnit.MILLISECONDS)) {
            	//缓存库存恢复
                RAtomicLong count = redission.getAtomicLong(SECKILL_REMAIN_COUNT + kid);
                count.getAndIncrement();
				//购买成功列表移除当前用户
                Set<Long> newUsers = new HashSet<>();
                if(succedUsers.isExists()){
                    newUsers = succedUsers.get();
                    newUsers.remove(checkout.getAccountId());
                }
                succedUsers.set(newUsers);
				//释放写锁
                glock.unlock();
            }
            return R.failure("支付失败");
        }

        //下单状态修改 改为已付款
        checkout.setStatus(1);
        ordersDao.updateById(checkout);

        return R.success();
    }

接收通道消息

@Component
public class RabbitMqReceiver {

    @Autowired
    SeckillOrdersDao seckillCheckoutDao;

    @Autowired
    SeckillDao seckillDao;

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "QUEUE_A", durable = "true", ignoreDeclarationExceptions = "true"),
            exchange = @Exchange(value = "my-mq-exchange_A"), key = "spring-boot-routingKey_A", ignoreDeclarationExceptions = "true"))
    public void handleMessage(SeckillOrders checkout){
        if(null != checkout){
            switch (checkout.getRabbitMqType()){
                case 0://生成订单以及其他业务逻辑
                    Seckill seckill = seckillDao.selectById(checkout.getSeckillId());
                    BigDecimal price = seckill.getSeckillPrice();
                    BigDecimal payment = price.multiply(new BigDecimal(checkout.getCount().toString()));
                    checkout.setPaymentAmount(payment);
                    checkout.setCheckoutTime(new Date());
                    //下单信息落库
                    seckillCheckoutDao.insert(checkout);
                    break;
                case 1://扣减库存以及其他业务逻辑
                    UpdateWrapper<Seckill> updateWapper = new UpdateWrapper<>();
                    updateWapper.eq("id",checkout.getSeckillId());
                    updateWapper.setSql("count = count - 1 ");
                    seckillDao.update(null,updateWapper);
                    break;
                case 2://恢复库存以及其它业务逻辑
                    updateWapper = new UpdateWrapper<>();
                    updateWapper.eq("id",checkout.getSeckillId());
                    updateWapper.setSql("count = count + 1 ");
                    seckillDao.update(null,updateWapper);
                    break;
                default:
                    break;
            }
        }
    }
}

完整代码

完整代码可以参考
秒杀功能Demo

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

“Sentinel + Redis + Mysql + RabbitMQ 秒杀功能设计及后端代码实现” 的相关文章