【Java项目推荐】值得写到简历上的项目--黑马点评

前言

项目是b站黑马程序员的redis教程中的案例建议所有java程序员做一下

这篇博客会从最简单的实现优惠卷秒杀到加分布式锁、对秒杀优化、使用消息队列异步下单做详细介绍

优惠券秒杀

实现优惠券秒杀下单

image-20221224200421157

@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
    /**
     * 秒杀基本实现一
     *  1.查询优惠卷
     *  2.判断秒杀是否开始
     *  3.判断是否结束
     *  4.判断库存是否充足
     *  5.扣减库存
     *  6.创建订单
     */
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        //尚未开始
        return Result.fail("秒杀尚未开始!");
    }

    if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
        return Result.fail("秒杀已经结束!");
    }

    if (voucher.getStock() < 1) {
        return Result.fail("库存不足");
    }

    boolean success = seckillVoucherService.update()
            .setSql("stock = stock - 1")
            .eq("voucher_id", voucherId).update();

    if (!success) {
        return Result.fail("库存不足");
    }

    VoucherOrder voucherOrder = new VoucherOrder();
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);

    Long userId = UserHolder.getUser().getId();
    voucherOrder.setUserId(userId);

    voucherOrder.setVoucherId(voucherId);
    save(voucherOrder);

    return Result.ok(orderId);
}

超卖问题

超卖情况

image-20221105225642655

image-20221227204752096

加锁解决超卖问题

image-20221105230150840

乐观锁

两种方式是心爱乐观锁

  • 版本号法
  • CAS法Compare And swap

实现方式:

每当数据做一次修改,版本号加1,所以判断一个数据有没有被修改过就看它的版本有没有变化过

image-20221105231012183

CAS法:

Compare and Swap即比较再交换。

也就是我不在判断库存有没有被修改过了我每次都去比较看库存是否小于0

image-20221105231124576

乐观锁解决超卖问题

乐观锁更新操作的时候使用

@Override
@Transactional
public Result seckillVoucher(Long voucherId) {  
	/**
     * 秒杀基本实现二
     *  1.查询优惠卷
     *  2.判断秒杀是否开始
     *  3.判断是否结束
     *  4.判断库存是否充足
     *  5.扣减库存乐观锁解决超卖问题
     *  6.创建订单
     */
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        //尚未开始
        return Result.fail("秒杀尚未开始!");
    }

    if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
        return Result.fail("秒杀已经结束!");
    }

    if (voucher.getStock() < 1) {
        return Result.fail("库存不足");
    }

    boolean success = seckillVoucherService.update()
            .setSql("stock = stock - 1")
            .eq("voucher_id", voucherId).gt("stock",0)
            .update();

    if (!success) {
        return Result.fail("库存不足");
    }

    VoucherOrder voucherOrder = new VoucherOrder();
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);

    Long userId = UserHolder.getUser().getId();
    voucherOrder.setUserId(userId);

    voucherOrder.setVoucherId(voucherId);
    save(voucherOrder);

    return Result.ok(orderId);
}

一人一单

重复下单情况

image-20221105194348762

解决思路

 		@Override
    public Result seckillVoucher(Long voucherId) {
      /**
       * 秒杀基本实现三
       * 悲观锁实现一人一单
       */
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            //尚未开始
            return Result.fail("秒杀尚未开始!");
        }

        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            return Result.fail("秒杀已经结束!");
        }

        if (voucher.getStock() < 1) {
            return Result.fail("库存不足");
        }

        Long userId = UserHolder.getUser().getId();
        synchronized (userId.toString().intern()){
            //获取事务代理对象
            IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        }
		}

    @Transactional
    public Result createVoucherOrder(Long voucherId) {
        Long userId = UserHolder.getUser().getId();

        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        if (count > 0) {
            return Result.fail("您已经购买过一次了");
        }

        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherId).gt("stock", 0)
                .update();

        if (!success) {
            return Result.fail("库存不足");
        }

        VoucherOrder voucherOrder = new VoucherOrder();
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);

        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);

        return Result.ok(orderId);

    }

防止事务失效

synchronized (userId.toString().intern()){
  //获取事务代理对象
  IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
  return proxy.createVoucherOrder(voucherId);
}

事务的生效其实是Spring拿到当前对象的代理对象这里如果直接调用就不是Spring的代理对象了事务就会失效所以要获取当前对象的代理对象

通过AopContext.currentProxy()API去获取

为什么把生成订单单独提取出来

因为只有生成订单才会对数据库有插入操作这个时候才需要事务。

需要事务的同时一人一单也需要加锁悲观锁

而且要在事务提交之后在释放锁

测试

image-20230107150811523

分布式锁

集群下的线程并发安全问题

加锁的原理就是在JVM内部维护了一个锁监视器如果是集群模式下的话那就是多个JVM悲观锁就失效了

image-20230107151108120

通过加锁可以解决在单机情况下的一人一单安全问题但是在集群模式下就不行了。

模拟集群环境

  • 在开一个服务端口是8082通过在Vm option里添加-Dserver.port=8082
  • nginx做负载均衡

image-20221227224135074

image-20221227224333180

此时测试还是会发现会出现一人多单的情况

什么是分布式锁

满足分布式系统下或者集群模式下的多线程可见并且互斥的锁

image-20230107153319260

分布式锁的实现方式

image-20230107153820308

redis分布式锁测试

# 实现分布式锁时需要实现的两个基本方法

# 1.添加锁利用setnx的互斥特性
SETNX lock thread1
# 2.添加锁过期时间避免服务宕机引起的死锁
EXPIRE lock 10

# 释放锁删除即可
127.0.0.1:6379> DEL lock
(integer) 1

redis中加锁的一些特殊情况

如果下订单的线程在redis中加了锁这时如果redis宕机了那么其他线程就会一直处于等待状态这时就出现了死锁的现象。

如何解决

利用redis中key过期时间自动释放锁能避免服务宕机引起的死锁

如果服务在加锁和过期释放期间宕机怎么办

保证加锁和过期释放的原子性

redis的set命令可以跟上很多参数可以同时保证加锁和设置过期时间

  • EX设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value
  • NX只在键不存在时才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。

因为 SET 命令可以通过参数来实现和 SETNXSETEXPSETEX 三个命令的效果所以将来的 Redis 版本可能会废弃并最终移除 SETNXSETEXPSETEX 这三个命令。

127.0.0.1:6379> SET lock thread1 NX EX 10
OK
127.0.0.1:6379> ttl lock
(integer) 5
127.0.0.1:6379> ttl lock
(integer) -2

手动实现分布式锁

实现分布式锁的流程

image-20230108120024394

impl

实现分布式锁的思路就是通过redis中的setnx如果不存在就创建key存在就不创建这样的互斥命令

每当有线程过来抢购的时候首先会获取锁也就是执行redis中的setnx命令。如果再有线程过来抢购那么就会被阻塞只有等该锁被释放其他线程才能再次获取

@Override
public Result seckillVoucher(Long voucherId) {
  SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

  if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
    //尚未开始
    return Result.fail("秒杀尚未开始!");
  }

  if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
    return Result.fail("秒杀已经结束!");
  }

  if (voucher.getStock() < 1) {
    return Result.fail("库存不足");
  }

  Long userId = UserHolder.getUser().getId();

  //生成分布式锁对象传入当前用户id和stringRedisTemplate对象
  SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);

  //
  boolean isLock = lock.tryLock(1200);
  if (!isLock){
    //            获取锁失败返回错误或重试
    return Result.fail("不允许重复下单");
  }

  try {
    //获取事务代理对象
    IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    return proxy.createVoucherOrder(voucherId);
  } finally {
    lock.unlock();
  }
}

public class SimpleRedisLock implements ILock {
    //不同的业务有不同的名称
    private String name;
    private StringRedisTemplate stringRedisTemplate;
    //对锁定义一个统一的前缀
    private static final String KEY_PREFIX = "lock:";

    //锁的名称要求用户传递给我们所以这里我们定义一个构造函数
    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        /**
         * 版本一
         * 基础实现
         * key就是固定前缀+锁的名称value就是线程标识
         * SET lock thread1 NX EX 10
         */
        String threadId = Thread.currentThread().getId();
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        stringRedisTemplate.delete(ID_PREFIX + name);
    }

分布式锁误删情况1

分布式锁误删情况说明

获取锁之后线程A的业务出现了阻塞直到锁到了超时时间被自动释放业务还在处于阻塞状态。

这时线程B获取锁开始执行自己的业务此时线程A阻塞的业务完成后会把锁给删掉这样就是分布式锁误删的情况。

所以我们在删除锁的时候需要进行一个判断看看删除的是不是当前线程所持有的锁

image-20230108202201918

@Override
public Result seckillVoucher(Long voucherId) {
  SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

  if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
    //尚未开始
    return Result.fail("秒杀尚未开始!");
  }

  if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
    return Result.fail("秒杀已经结束!");
  }

  if (voucher.getStock() < 1) {
    return Result.fail("库存不足");
  }

  Long userId = UserHolder.getUser().getId();

  //生成分布式锁对象传入当前用户id和stringRedisTemplate对象
  SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);

  //
  boolean isLock = lock.tryLock(1200);
  if (!isLock){
    //            获取锁失败返回错误或重试
    return Result.fail("不允许重复下单");
  }

  try {
    //获取事务代理对象
    IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    return proxy.createVoucherOrder(voucherId);
  } finally {
    lock.unlock();
  }
}

public class SimpleRedisLock implements ILock {
    //不同的业务有不同的名称
    private String name;
    private StringRedisTemplate stringRedisTemplate;
    //对锁定义一个统一的前缀
    private static final String KEY_PREFIX = "lock:";

    //锁的名称要求用户传递给我们所以这里我们定义一个构造函数
    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }
  
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

    @Override
    public boolean tryLock(long timeoutSec) {
        /**
         * 版本一
         * 基础实现
         * key就是固定前缀+锁的名称value就是线程标识
         * SET lock thread1 NX EX 10
         */
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
         /**
         * 版本二
         * 释放锁的时候判断是不是当前线程的锁
         */
        //获取线程id
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        //获取key
        String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        if (threadId.equals(id)) {
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }

分布式锁误删情况2

情况说明

如果JVM发送FULL GC时会阻塞所有的代码因为判断标识是否一致和释放锁是两步所以在判断成功之后如果发生FUll GC那么其他线程再次获取锁的时候还是可能发生误删的情况

image-20230108202723108

为了避免这种情况的发生我们必须保证判断锁标识的动作和释放锁的动作是原子性的

这就是下面我们要学习的lua脚本

lua脚本解决多条命令的原子性问题

什么是lua脚本

Redis提供了Lua脚本功能在一个脚本中编写多条Redis命令确保多条命令执行时的原子性。Lua是一种编程语言它的基本语法大家可以参考网站https://www.runoob.com/lua/lua-tutorial.html

lua脚本的基本使用

在lua脚本中调用函数如下

redis.call('命令名称', 'key', '其它参数', ...)

例如我们要执行set name jack则脚本是这样

# 执行 set name jack
redis.call('set', 'name', 'jack')

例如我们要先执行set name Rose再执行get name则脚本如下

# 先执行 set name jack
redis.call('set', 'name', 'jack')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

写好脚本以后需要用Redis命令来调用脚本调用脚本的常见命令如下

127.0.0.1:6379> help @scripting

  EVAL script numkeys [key [key ...]] [arg [arg ...]]
  summary: Execute a Lua script server side
  since: 2.6.0

例如我们要执行 redis.call(‘set’, ‘name’, ‘jack’) 这个脚本语法如下

127.0.0.1:6379> EVAL "return redis.call('set','name','Jack')" 0

如果脚本中的key、value不想写死可以作为参数传递。key类型参数会放入KEYS数组其它参数会放入ARGV数组在脚本中可以从KEYS和ARGV数组获取这些参数

127.0.0.1:6379> EVAL "return redis.call('set',KEYS[1],ARGV[1])" 1 name Tom
OK
127.0.0.1:6379> get name
"Tom"

Java调用lua脚本改造分布式锁

释放锁的业务流程是这样的

  • 获取锁中的线程标示
  • 判断是否与指定的标示当前线程标示一致
  • 如果一致则释放锁删除
  • 如果不一致则什么都不做
  • 如果用Lua脚本来表示则是这样的
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by qiang.
--- DateTime: 2023/1/8 21:24
---

if (redis.call('get', KEYS[1]) == ARGV[1]) then
    --释放锁
    return redis.call('del', KEYS[1])
end
--如果不匹配
return 0

RedisTemplate调用Lua脚本的API如下

image-20230108213936297

@Override
public Result seckillVoucher(Long voucherId) {
  SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

  if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
    //尚未开始
    return Result.fail("秒杀尚未开始!");
  }

  if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
    return Result.fail("秒杀已经结束!");
  }

  if (voucher.getStock() < 1) {
    return Result.fail("库存不足");
  }

  Long userId = UserHolder.getUser().getId();

  //生成分布式锁对象传入当前用户id和stringRedisTemplate对象
  SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);

  //
  boolean isLock = lock.tryLock(1200);
  if (!isLock){
    //            获取锁失败返回错误或重试
    return Result.fail("不允许重复下单");
  }

  try {
    //获取事务代理对象
    IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    return proxy.createVoucherOrder(voucherId);
  } finally {
    lock.unlock();
  }
}

public class SimpleRedisLock implements ILock {
  //不同的业务有不同的名称
  private String name;
  private StringRedisTemplate stringRedisTemplate;
  //对锁定义一个统一的前缀
  private static final String KEY_PREFIX = "lock:";

  //锁的名称要求用户传递给我们所以这里我们定义一个构造函数
  public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
    this.name = name;
    this.stringRedisTemplate = stringRedisTemplate;
  }

  private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
  
  private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

  static {
    UNLOCK_SCRIPT = new DefaultRedisScript<>();
    UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    UNLOCK_SCRIPT.setResultType(Long.class);
  }

  @Override
  public boolean tryLock(long timeoutSec) {
    /**
         * 版本一
         * 基础实现
         * key就是固定前缀+锁的名称value就是线程标识
         * SET lock thread1 NX EX 10
         */
    String threadId = ID_PREFIX + Thread.currentThread().getId();
    Boolean success = stringRedisTemplate.opsForValue()
      .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
    return Boolean.TRUE.equals(success);
  }

  @Override
  public void unlock() {
        /**
         * 版本三
         * 通过lua脚本来释放锁
         */
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX + Thread.currentThread().getId());
    }
  }
}

Redisson

setnx实现的分布式锁存在的问题

image-20230108221629597

什么是Redisson

Redisson基于redis实现了一套分布式工具的集合

image-20230108221802187

官网地址 https://redisson.org
GitHub地址 https://github.com/redisson/redisson

Redisson快速入门

一、引入依赖

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson</artifactId>
  <version>3.11.1</version>
</dependency>

二、配置Redisson客户端

@Configuration
public class  RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        // 配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://172.20.10.2:6379");
        // 创建RedissonClient对象
        return Redisson.create(config);
    }
}

三、使用Redisson的分布式锁

@Override
public Result seckillVoucher(Long voucherId) {
  SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

  if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
    //尚未开始
    return Result.fail("秒杀尚未开始!");
  }

  if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
    return Result.fail("秒杀已经结束!");
  }

  if (voucher.getStock() < 1) {
    return Result.fail("库存不足");
  }

  Long userId = UserHolder.getUser().getId();

  RLock lock = redissonClient.getLock("lock:order:" + userId);

  /**
         * tryLock参数说明
         * long waitTime 超时等待时间  默认是-1也就是不等待获取不到就直接返回false
         * long leaseTime 超时释放时间 默认是30s如果该锁超过30s会自动释放
         */
  boolean isLock = lock.tryLock();
  if (!isLock) {
    //            获取锁失败返回错误或重试
    return Result.fail("不允许重复下单");
  }

  try {
    //获取事务代理对象
    IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    return proxy.createVoucherOrder(voucherId);
  } finally {
    lock.unlock();
  }
}

秒杀优化

异步秒杀思路

将原先逻辑的串行执行改为异步执行也就是将判断用户有没有秒杀资格交给redis去做如果有那就把用户信息订单信息保存到阻塞队列中交给其他线程去执行

image-20230109151329636

基于redis完成秒杀资格判断

redis数据结构的选择

判断库存是否充足可以选用String结构key是优惠卷的信息value是库存的数量

判断一人一单可以选用set结构将下过单的用户保存到set集合当中

流程如下

image-20230109152002579

实现

1.新增秒杀优惠券的同时将优惠券信息保存到Redis中

@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
		...
  	...
    stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

2.基于Lua脚本判断秒杀库存、一人一单决定用户是否抢购成功

---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by qiang.
--- DateTime: 2023/1/9 14:41
---
-- 1.参数列表
--判断库存是否充足需要去redis中去查所以需要知道优惠卷id
-- 1.1优惠卷id
local voucherId = ARGV[1]

--判断一人一单需要知道用户id
-- 1.2用户id
local userId = ARGV[2]

-- 1.3.订单id
--local orderId = ARGV[3]

-- 2.数据库key
-- 2.1库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1判断库存是否大于0 get stock
if (tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2.库存不足返回1
    return 1
end


-- 判断用户是否下单 SISMEMBER orderKey userId
--127.0.0.1:6379> sadd setCollection1 1 2 3
--(integer) 3
--127.0.0.1:6379> SISMEMBER setCollection1 2
--(integer) 1
--127.0.0.1:6379> SISMEMBER setCollection1 0
--(integer) 0

if (redis.call('SISMEMBER', orderKey, userId) == 1) then
    --存在说明是重复下单
    return 2
end

-- 3.4扣库存 incrby stockKey - 1
redis.call('incrby', stockKey, -1)
-- 3.5下单
redis.call('sadd', orderKey, userId)
return 0

3.java代码如下

PS:秒杀卷必须已经被保存到redis当中

因为lua脚本在执行的时候会去redis中读取优惠卷的库存不然会出现如下错误

user_script:26: attempt to compare nil with number

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private RedissonClient redissonClient;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    //加载seckill.lua文件
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
       /**
         * 秒杀实现六
         * 通过redis的lua脚本对秒杀进行优化
         */
        Long userId = UserHolder.getUser().getId();

//        执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString()
        );
//        判断结果是否为0
        int r = result.intValue();

        if (r != 0){
//            不为0代表没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
//        为0有购买资格把下单信息保存到阻塞队列
        long orderId = redisIdWorker.nextId("order");

//        返回订单id
        return Result.ok(orderId);
    }
}

基于阻塞队列实现异步下单

防止事务失效

事务的生效其实是Spring拿到当前对象的代理对象这里如果直接调用直接创建订单就不是Spring的代理对象了事务就会失效所以要获取当前对象的代理对象通过AopContext.currentProxy()API去获取

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

  @Resource
  private ISeckillVoucherService seckillVoucherService;

  @Resource
  private RedisIdWorker redisIdWorker;

  @Resource
  private RedissonClient redissonClient;

  @Resource
  private StringRedisTemplate stringRedisTemplate;

  //加载seckill.lua文件
  private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

  static {
    SECKILL_SCRIPT = new DefaultRedisScript<>();
    SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
    SECKILL_SCRIPT.setResultType(Long.class);
  }

  //阻塞队列
  private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
  //线程池
  private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

  //在类初始化之后就执行init方法
  //init方法会去执行创建订单线程VoucherOrderHandler新的线程通过实现Runnable接口创建
  @PostConstruct
  private void init(){
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
  }

  //VoucherOrderHandler为线程任务
  //该线程会去读阻塞队列中的订单信息然后再去调用创建订单方法完成异步下单
  private class VoucherOrderHandler implements Runnable{
    @Override
    public void run() {
      while (true){
        try {
          //                获取队列中的订单信息
          VoucherOrder voucherOrder = orderTasks.take();
          //                    创建订单
          handleVoucherOrder(voucherOrder);
        } catch (Exception e) {
          log.error("处理订单异常",e);
        }
      }
    }
  }

  /**
     * 异步创建订单
     */
  private void handleVoucherOrder(VoucherOrder voucherOrder) {
    //这里创建订单的线程不是主线程所以不能从userHolder里获取用户只能从订单对象中获取用户id
    Long userId = voucherOrder.getUserId();
    RLock lock = redissonClient.getLock("lock:order:" + userId);

    /**
         * tryLock参数说明
         * long waitTime 超时等待时间  默认是-1也就是不等待获取不到就直接返回false
         * long leaseTime 超时释放时间 默认是30s如果该锁超过30s会自动释放
         */
    boolean isLock = lock.tryLock();
    if (!isLock) {
      //兜底方案其实不用再去获取锁因为在lua脚本中已经判断过一人一单
      log.error("不允许重复下单");
      return;
    }

    try {
      //获取事务代理对象
      //这里不能通过AopContext.currentProxy()去获取代理对象因为创建优惠卷订单createVoucherOrder是在主线程执行的
      //而当前方法是新的线程执行的代码我们必须用主线程才能防止创建优惠卷订单createVoucherOrder事务失效
      proxy.createVoucherOrder(voucherOrder);
    } finally {
      lock.unlock();
    }
  }

  IVoucherOrderService proxy;
  @Override
  public Result seckillVoucher(Long voucherId) {
    /**
         * 秒杀实现六
         * 通过redis的lua脚本对秒杀进行优化
         */
    Long userId = UserHolder.getUser().getId();

    //      1.执行lua脚本
    Long result = stringRedisTemplate.execute(
      SECKILL_SCRIPT,
      Collections.emptyList(),
      voucherId.toString(), userId.toString()
    );
    //       2 判断结果是否为0
    int r = result.intValue();

    if (r != 0) {
      //            2.1 不为0代表没有购买资格
      return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
    }
    //        2.2 为0有购买资格把下单信息保存到阻塞队列
    VoucherOrder voucherOrder = new VoucherOrder();
    //        2.3 订单id
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);
    //        2.4 用户id
    voucherOrder.setUserId(userId);
    //        2.5 代金劵id
    voucherOrder.setVoucherId(voucherId);
    //        2.6 放入阻塞队列
    orderTasks.add(voucherOrder);

    //        返回订单id
    return Result.ok(orderId);
  }


  @Transactional
  public void createVoucherOrder(VoucherOrder voucherOrder) {
    Long userId = voucherOrder.getUserId();

    int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder).count();
    if (count > 0) {
      log.error("您已经购买过一次了");
      return;
    }

    boolean success = seckillVoucherService.update()
      .setSql("stock = stock - 1")
      .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
      .update();

    if (!success) {
      log.error("库存不足");
      return;
    }

    save(voucherOrder);
  }
}

总结

  • 新增秒杀优惠券的同时将优惠券信息保存到Redis中
  • 基于Lua脚本判断秒杀库存、一人一单决定用户是否抢购成功
  • 如果抢购成功将优惠券id和用户id封装后存入阻塞队列
IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) {
  /**
         * 秒杀实现六
         * 通过redis的lua脚本对秒杀进行优化
         */
  Long userId = UserHolder.getUser().getId();

  //      1.执行lua脚本
  Long result = stringRedisTemplate.execute(
    SECKILL_SCRIPT,
    Collections.emptyList(),
    voucherId.toString(), userId.toString()
  );
  //       2 判断结果是否为0
  int r = result.intValue();

  if (r != 0) {
    //            2.1 不为0代表没有购买资格
    return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
  }
  //        2.2 为0有购买资格把下单信息保存到阻塞队列
  VoucherOrder voucherOrder = new VoucherOrder();
  //        2.3 订单id
  long orderId = redisIdWorker.nextId("order");
  voucherOrder.setId(orderId);
  //        2.4 用户id
  voucherOrder.setUserId(userId);
  //        2.5 代金劵id
  voucherOrder.setVoucherId(voucherId);
  //        2.6 放入阻塞队列
  orderTasks.add(voucherOrder);
  //        返回订单id
  return Result.ok(orderId);
}
  • 开启线程任务不断从阻塞队列中获取信息实现异步下单功能
//加载seckill.lua文件
  private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

  static {
    SECKILL_SCRIPT = new DefaultRedisScript<>();
    SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
    SECKILL_SCRIPT.setResultType(Long.class);
  }

  //阻塞队列
  private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
  //线程池
  private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

  //在类初始化之后就执行init方法
  //init方法会去执行创建订单线程VoucherOrderHandler新的线程通过实现Runnable接口创建
  @PostConstruct
  private void init(){
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
  }

  //VoucherOrderHandler为线程任务
  //该线程会去读阻塞队列中的订单信息然后再去调用创建订单方法完成异步下单
  private class VoucherOrderHandler implements Runnable{
    @Override
    public void run() {
      while (true){
        try {
          //                获取队列中的订单信息
          VoucherOrder voucherOrder = orderTasks.take();
          //                    创建订单
          handleVoucherOrder(voucherOrder);
        } catch (Exception e) {
          log.error("处理订单异常",e);
        }
      }
    }
  }

  /**
     * 异步创建订单
     */
  private void handleVoucherOrder(VoucherOrder voucherOrder) {
    //这里创建订单的线程不是主线程所以不能从userHolder里获取用户只能从订单对象中获取用户id
    Long userId = voucherOrder.getUserId();
    RLock lock = redissonClient.getLock("lock:order:" + userId);

    /**
         * tryLock参数说明
         * long waitTime 超时等待时间  默认是-1也就是不等待获取不到就直接返回false
         * long leaseTime 超时释放时间 默认是30s如果该锁超过30s会自动释放
         */
    boolean isLock = lock.tryLock();
    if (!isLock) {
      //兜底方案其实不用再去获取锁因为在lua脚本中已经判断过一人一单
      log.error("不允许重复下单");
      return;
    }

    try {
      //获取事务代理对象
      //这里不能通过AopContext.currentProxy()去获取代理对象因为创建优惠卷订单createVoucherOrder是在主线程执行的
      //而当前方法是新的线程执行的代码我们必须用主线程才能防止创建优惠卷订单createVoucherOrder事务失效
      proxy.createVoucherOrder(voucherOrder);
    } finally {
      lock.unlock();
    }
  }

Redis消息队列

把订单信息放到阻塞队列的缺点是什么

  • 无法持久化
  • 受JVM内存大小影响

那为什么消息队列能解决这些问题

  • 消息队列属于JVM以外的独立服务不受JVM内存的限制
  • 消息队列里的消息可以持久化并且消息队列会保证消息至少被消费一次

现在有哪些消息队列可以用呢

Kafka、RabbitMQ、RocketMQ…

但是我们都不用这些因为我们项目规模比较小我们用reids去实现消息队列就可以胜任

Redis提供了三种不同的方式来实现消息队列

  • list结构基于List结构模拟消息队列
  • PubSub基本的点对点消息模型
  • Stream比较完善的消息队列模型

基于List实现消息队列

# lpush命令在l1队列中放入两个元素
127.0.0.1:6379> LPUSH l1 element1 element2
(integer) 2

# brpop命令在l1队列右侧取出第一个元素等待时间20s
127.0.0.1:6379> BRPOP l1 20
1) "l1"
2) "element1"

# brpop命令在l1队列右侧取出第二个元素等待时间20s
127.0.0.1:6379> BRPOP l1 20
1) "l1"
2) "element2"

# brpop命令在l1队列右侧取出第三个元素等待时间20s元素被取完命令处于阻塞状态
127.0.0.1:6379> BRPOP l1 20

基于List的消息队列有哪些优缺点

优点

  • 利用Redis存储不受限于JVM内存上限
  • 基于Redis的持久化机制数据安全性有保证
  • 可以满足消息有序性

缺点

  • 无法避免消息丢失
  • 只支持单消费者

PubSub实现消息队列

# 向频道发送消息
127.0.0.1:6379> PUBLISH order.q1 hello
(integer) 2
127.0.0.1:6379> PUBLISH order.q2 hello
(integer) 1

# 订阅一个或多个频道
127.0.0.1:6379> SUBSCRIBE order.q1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "order.q1"
3) (integer) 1
1) "message"
2) "order.q1"
3) "hello"

# 订阅与pattern格式匹配的所有频道
127.0.0.1:6379> PSUBSCRIBE order.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "order.*"
3) (integer) 1
1) "pmessage"
2) "order.*"
3) "order.q1"
4) "hello"
1) "pmessage"
2) "order.*"
3) "order.q2"
4) "hello"

基于PubSub的消息队列有哪些优缺点

优点

  • 采用发布订阅模型支持多生产、多消费

缺点

  • 不支持数据持久化向频道发送消息如果没人接受那么消息会被直接丢掉
  • 无法避免消息丢失
  • 消息堆积有上限超出时数据丢失

Stream消息队列

单消费模式

Stream 是 Redis 5.0 引入的一种新数据类型可以实现一个功能非常完善的消息队列。

发送消息

image-20230110155934302

读取消息的方式之一XREAD

如果不指定阻塞时长

image-20230110160155190

测试
# 写消息
127.0.0.1:6379> XADD users * name zs age 12
"1673337915109-0"

# 读消息
127.0.0.1:6379> XREAD count 1 streams user 0
(nil)
127.0.0.1:6379> XREAD count 1 streams users 0
1) 1) "users"
   2) 1) 1) "1673337915109-0"
         2) 1) "name"
            2) "zs"
            3) "age"
            4) "12"
            
# 消息可重复读
127.0.0.1:6379> XREAD count 1 streams users 0
1) 1) "users"
   2) 1) 1) "1673337915109-0"
         2) 1) "name"
            2) "zs"
            3) "age"
            4) "12"
            
            
# 再次发送消息
127.0.0.1:6379> XADD users * name ls age 21
"1673338159927-0"

# 阻塞等待消息            
127.0.0.1:6379> XREAD COUNT 1 BLOCK 0 STREAMS users $
1) 1) "users"
   2) 1) 1) "1673338159927-0"
         2) 1) "name"
            2) "ls"
            3) "age"
            4) "21"
(13.00s)

消息漏读情况

当我们指定起始ID为$时代表读取最新的消息如果我们处理一条消息的过程中又有超过1条以上的消息到达队列则下次获取时也只能获取到最新的一条会出现漏读消息的问题

image-20230110161449209

消费组模式

image-20230110164956512

创建消费者组

image-20230110165317457

从消费者组读取消息

image-20230110165440794

测试

创建消费者组

127.0.0.1:6379> XGROUP create s1 g1 0
OK

读取消费者组

127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 >

1) 1) "s1"
   2) 1) 1) "1673336869091-0"
         2) 1) "k1"
            2) "v1"

在s1队列中添加多条消息

127.0.0.1:6379> XADD s1 * k2 v2
"1673354707296-0"
127.0.0.1:6379> XADD s1 * k3 v3
"1673354711159-0"

读取消费者组中的数据

  • 通过>号获取没有被确认的消息
127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 >

1) 1) "s1"
   2) 1) 1) "1673354707296-0"
         2) 1) "k2"
            2) "v2"

读取消费者组中的数据

  • 通过>号获取没有被确认的消息
127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 >

1) 1) "s1"
   2) 1) 1) "1673354711159-0"
         2) 1) "k3"
            2) "v3"

查看消息队列中那一条消息没有被处理

  • -,+号代表id范围表示所有id
  • 10代表获取消息的数量
127.0.0.1:6379> XPENDING s1 g1 - + 10

1) 1) "1673336869091-0"
   2) "c1"
   3) (integer) 1113736
   4) (integer) 1

读取pending-list中没有被确认的消息

  • 0,代表读取pending-list中第一条没有被确认的消息
127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 0

1) 1) "s1"
   2) 1) 1) "1673336869091-0"
         2) 1) "k1"
            2) "v1"

确认消息

127.0.0.1:6379> XACK s1 g1 1673336869091-0
(integer) 1

再次查看pending-list中没有被处理的消息

127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 0

1) 1) "s1"
   2) (empty array)   
impl

stream消息队列业务流程处理思路

  • 1.初始化stream消息队列如果没有的话就去创建一个如果stream存在判断group消费者组是否存在
  • 2.加载seckill.lua脚本去做秒杀资格和一人一单判断最后把订单相关信息保存到redis的消息队列中
  • 3.开始读取stream消息队列中的消息然后进行解析从而获得订单信息进而把订单信息异步保存到数据库中
  • 4.进行消息队列的确认
  • 5.如果遇见没有处理的消息会被捕捉异常进入handlePendingList方法重新去pending-list中获取未处理的第一条消息同样会在数据库中被保存
/**
 * @author qiang
 * @since 2022-12-27
 */
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private RedissonClient redissonClient;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    //加载seckill.lua文件
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    //线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    //在类初始化之后就执行init方法
    //init方法会去执行创建订单线程VoucherOrderHandler新的线程通过实现Runnable接口创建
    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }


    //VoucherOrderHandler为线程任务
    //该线程会去读消息队列中的订单信息然后再去调用创建订单方法完成异步下单
    private class VoucherOrderHandler implements Runnable {
        private final String queueName = "stream.orders";

        @Override
        public void run() {
            while (true) {
                try {
                    // 0.初始化stream
                    initStream();
                    // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    // 2.判断订单信息是否为空
                    if (list == null || list.isEmpty()) {
                        // 如果为null说明没有消息继续下一次循环
                        continue;
                    }
                    // 解析数据
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 3.创建订单
                    handleVoucherOrder(voucherOrder);
                    // 4.确认消息 XACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());

                } catch (Exception e) {
                    log.error("处理订单异常", e);
                    handlePendingList();
                }
            }
        }

        public void initStream() {
            Boolean exists = stringRedisTemplate.hasKey(queueName);
            if (BooleanUtil.isFalse(exists)) {
                log.info("stream不存在开始创建stream");
                // 不存在需要创建
                stringRedisTemplate.opsForStream().createGroup(queueName, ReadOffset.latest(), "g1");
                log.info("stream和group创建完毕");
                return;
            }
            // stream存在判断group是否存在
            StreamInfo.XInfoGroups groups = stringRedisTemplate.opsForStream().groups(queueName);
            if (groups.isEmpty()) {
                log.info("group不存在开始创建group");
                // group不存在创建group
                stringRedisTemplate.opsForStream().createGroup(queueName, ReadOffset.latest(), "g1");
                log.info("group创建完毕");
            }
        }

        private void handlePendingList() {
            while (true) {
                try {
                    // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    // 2.判断订单信息是否为空
                    if (list == null || list.isEmpty()) {
                        // 如果为null说明没有消息继续下一次循环
                        break;
                    }
                    // 解析数据
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 3.创建订单
                    handleVoucherOrder(voucherOrder);
                    // 4.确认消息 XACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常", e);
                }
            }
        }

    }

    /**
     * 异步创建订单
     */
    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        //这里创建订单的线程不是主线程所以不能从userHolder里获取用户只能从订单对象中获取用户id
        Long userId = voucherOrder.getUserId();
        RLock lock = redissonClient.getLock("lock:order:" + userId);

        /**
         * tryLock参数说明
         * long waitTime 超时等待时间  默认是-1也就是不等待获取不到就直接返回false
         * long leaseTime 超时释放时间 默认是30s如果该锁超过30s会自动释放
         */
        boolean isLock = lock.tryLock();
        if (!isLock) {
            //兜底方案其实不用再去获取锁因为在lua脚本中已经判断过一人一单
            log.error("不允许重复下单");
            return;
        }

        try {
            //获取事务代理对象
            //这里不能通过AopContext.currentProxy()去获取代理对象因为创建优惠卷订单createVoucherOrder是在主线程执行的
            //而当前方法是新的线程执行的代码我们必须用主线程才能防止创建优惠卷订单createVoucherOrder事务失效
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            lock.unlock();
        }
    }

    //通过主线程获取代理对象
    private IVoucherOrderService proxy;

    @Override
    public Result seckillVoucher(Long voucherId) {
        /**
         * 秒杀实现七
         * 使用redis提供的Stream消息队列优化秒杀
         */
        Long userId = UserHolder.getUser().getId();
        //获取订单id
        long orderId = redisIdWorker.nextId("order");


//      1.执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString(), String.valueOf(orderId)
        );
//       2 判断结果是否为0
        int r = result.intValue();

        if (r != 0) {
//            2.1 不为0代表没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }

        proxy = (IVoucherOrderService) AopContext.currentProxy();
//        返回订单id
        return Result.ok(orderId);
    }


    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();

        int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder).count();
        if (count > 0) {
            log.error("您已经购买过一次了");
            return;
        }

        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
                .update();

        if (!success) {
            log.error("库存不足");
            return;
        }

        save(voucherOrder);
    }
}
lua脚本
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by qiang.
--- DateTime: 2023/1/9 14:41
---
-- 1.参数列表
--判断库存是否充足需要去redis中去查所以需要知道优惠卷id
-- 1.1优惠卷id
local voucherId = ARGV[1]

--判断一人一单需要知道用户id
-- 1.2用户id
local userId = ARGV[2]

-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据库key
-- 2.1库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1判断库存是否大于0 get stock
if (tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2.库存不足返回1
    return 1
end


-- 判断用户是否下单 SISMEMBER orderKey userId
--127.0.0.1:6379> sadd setCollection1 1 2 3
--(integer) 3
--127.0.0.1:6379> SISMEMBER setCollection1 2
--(integer) 1
--127.0.0.1:6379> SISMEMBER setCollection1 0
--(integer) 0

if (redis.call('SISMEMBER', orderKey, userId) == 1) then
    --存在说明是重复下单
    return 2
end

-- 3.4扣库存 incrby stockKey - 1
redis.call('incrby', stockKey, -1)
-- 3.5下单
redis.call('sadd', orderKey, userId)
-- 3.6发送消息到队列中XADD stream.orders * k1 v1 k2 v2 ...
-- *代表消息的id自动生成orderId的key为id是因为订单实体类中订单id的字段属性为id
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

总结

STREAM类型消息队列的XREADGROUP命令特点

  • 消息可回溯
  • 可以多消费者争抢消息加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制保证消息至少被消费一次

总结

STREAM类型消息队列的XREADGROUP命令特点

  • 消息可回溯
  • 可以多消费者争抢消息加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制保证消息至少被消费一次
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: Java