8. 【Redisson源码】分布式信号量RSemaphore

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

目录

一、RSemaphore的使用

二、RSemaphore设置许可数量

三、RSemaphore的加锁流程

四、RSemaphore的解锁流程


【本篇文章基于redisson-3.17.6版本源码进行分析】

基于Redis的Redisson的分布式信号量RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。

一、RSemaphore的使用

@Test
public void testRSemaphore() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    RedissonClient redissonClient = Redisson.create(config);
    RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore");
    // 设置5个许可模拟五个停车位
    rSemaphore.trySetPermits(5);

    // 创建10个线程模拟10辆车过来停车
    for (int i = 1; i <= 10; i++) {
        new Thread(() -> {
            try {
                rSemaphore.acquire();
                System.out.println(Thread.currentThread().getName() + "进入停车场...");
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
                System.out.println(Thread.currentThread().getName() + "离开停车场...");
                rSemaphore.release();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, "A" + i).start();
    }

    try {
        TimeUnit.MINUTES.sleep(1);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

二、RSemaphore设置许可数量

初始化RSemaphore需要调用trySetPermits()设置许可数量

/**
 * 尝试设置许可数量设置成功返回true否则返回false
 */
boolean trySetPermits(int permits);

trySetPermits()内部调用了trySetPermitsAsync()

// 异步设置许可
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 判断分布式信号量的key是否存在如果不存在才设置
            "local value = redis.call('get', KEYS[1]); " +
                    "if (value == false) then "
                    // set "semaphore" permits
                    // 使用String数据结构设置信号量的许可数
                    + "redis.call('set', KEYS[1], ARGV[1]); "
                    // 发布一条消息到redisson_sc:{semaphore}通道
                    + "redis.call('publish', KEYS[2], ARGV[1]); "
                    // 设置成功返回1
                    + "return 1;"
                    + "end;"
                    // 否则返回0
                    + "return 0;",
            Arrays.asList(getRawName(), getChannelName()), permits);

    if (log.isDebugEnabled()) {
        future.thenAccept(r -> {
            if (r) {
                log.debug("permits set, permits: {}, name: {}", permits, getName());
            } else {
                log.debug("unable to set permits, permits: {}, name: {}", permits, getName());
            }
        });
    }
    return future;
}

可以看到设置许可数量底层使用LUA脚本实际上就是使用redis的String数据结构保存了我们指定的许可数量。如下图

参数说明:

  • KEYS[1]: 我们指定的分布式信号量key例如redissonClient.getSemaphore("semaphore")中的"semaphore")
  • KEYS[2]: 释放锁的channel名称redisson_sc:{分布式信号量key}在本例中就是redisson_sc:{semaphore}
  • ARGV[1]: 设置的许可数量

总结设置许可执行流程为

  • get semaphore获取到semaphore信号量的当前的值
  • 第一次数据为0 然后使用set semaphore 3将这个信号量同时能够允许获取锁的客户端的数量设置为3。注意到如果之前设置过了信号量将无法再次设置直接返回0。想要更改信号量总数可以使用addPermits方法
  • 然后redis发布一些消息返回1

三、RSemaphore的加锁流程

许可数量设置好之后我们就可以调用acquire()方法获取了如果未传入许可数量默认获取一个许可。

public void acquire() throws InterruptedException {
    acquire(1);
}

public void acquire(int permits) throws InterruptedException {
    // 尝试获取锁成功直接返回
    if (tryAcquire(permits)) {
        return;
    }

    // 对于没有获取锁的那些线程订阅redisson_sc:{分布式信号量key}通道的消息
    CompletableFuture<RedissonLockEntry> future = subscribe();
    semaphorePubSub.timeout(future);
    RedissonLockEntry entry = commandExecutor.getInterrupted(future);
    try {
        // 不断循环尝试获取许可
        while (true) {
            if (tryAcquire(permits)) {
                return;
            }

            entry.getLatch().acquire();
        }
    } finally {
        // 取消订阅
        unsubscribe(entry);
    }
//        get(acquireAsync(permits));
}

可以看到获取许可的核心逻辑在tryAcquire()方法中如果tryAcquire()返回true说明获取许可成功直接返回如果返回false说明当前没有许可可以使用则对于没有获取锁的那些线程订阅redisson_sc:{分布式信号量key}通道的消息并通过死循环不断尝试获取锁。

我们看一下tryAcquire()方法的逻辑内部调用了tryAcquireAsync()方法

// 异步获取许可
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return new CompletableFutureWrapper<>(true);
    }

    return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              // 获取当前剩余的许可数量
              "local value = redis.call('get', KEYS[1]); " +
              // 许可不为空并且许可数量 大于等于 当前线程申请的许可数量        
              "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
                  // 通过decrby减少剩余可用许可    
                  "local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
                  // 返回1    
                  "return 1; " +
              "end; " +
              // 其它情况返回0        
              "return 0;",
              Collections.<Object>singletonList(getRawName()), permits);
}

从源码可以看到获取许可就是操作redis中的数据首先获取到redis中剩余的许可数量只有当剩余的许可数量大于线程申请的许可数量时才获取成功返回1否则获取失败返回0

总结加锁执行流程为

  • get semaphore获取到一个当前的值比如说是33 > 1
  • decrby semaphore 1将信号量允许获取锁的客户端的数量递减1变成2
  • decrby semaphore 1
  • decrby semaphore 1
  • 执行3次加锁后semaphore值为0
  • 此时如果再来进行加锁则直接返回0然后进入死循环去获取锁

四、RSemaphore的解锁流程

通过前面对RSemaphore获取锁的分析我们很容易能猜到释放锁无非就是归还许可数量到redis中。我们查看具体的源码

public RFuture<Void> releaseAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return new CompletableFutureWrapper<>((Void) null);
    }

    RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
            // 通过incrby增加许可数量
            "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
                    // 发布一条消息到redisson_sc:{semaphore}中
                    "redis.call('publish', KEYS[2], value); ",
            Arrays.asList(getRawName(), getChannelName()), permits);
    if (log.isDebugEnabled()) {
        future.thenAccept(o -> {
            log.debug("released, permits: {}, name: {}", permits, getName());
        });
    }
    return future;
}

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: redis