Redis分布式锁

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

手写redis分布式锁

为什么需要分布式锁

我们在多线程情况下面对对资源的竞争我们通过mutex互斥锁来保护临界资源来保证数据的正确性那么也就意味着mutex这把锁多个线程都能看到这是在同一个进程里面的情况。
那么如果换做是多进程需要同时操作同一个共享资源那么改如何来操作了
我们又如何来实现互斥了现在的业务应用通常都是微服务的业务那么也就意味着一个应用内部可能有多个进程访问数据库此时为了操作乱序导致预期结果不一致此时我们需要引入分布式锁来进行保护。要想实现分布式锁必须借助一个外部系统因为进程之间是具有独立性的。所有的进程都要通过这个系统进行加锁而这个外部的系统必须实现互斥功能只要一个进程加锁成功之后其他进程在进行加锁则加锁失败。这个外部的系统可以是数据库也可以是redis但是为了追求性能我们一般使用redis.
在这里插入图片描述

如何实现分布式锁

我们只要一把锁必须要有的功能就是互斥功能这也就意味着要想使用redis实现分布式锁那么redis就必须要有这个互斥功能在redis当中我们可以使用setnx命令来达到这个目的。setnx命令博主在这里说一下他的意思是set if not exist 如果key不存在设置其值如果key已经存在了那么就什么也不干。这样多个进程来执行条命令就可以达到互斥的功能。
在这里插入图片描述
加锁成功的进程就可以修改共享资源而加锁失败的线程就等待锁资源。当操作完成之后需要释放锁给后来的操作者操作共享资源的机会。那么在redis当中我们如何释放了很简单我们只需要del lock就可以了。下面给出这个流程的伪代码

  //加锁
  setnx lock 1
  //业务逻辑
  do things
    //释放锁
  del lock

那么实现一个锁就真的怎么简单吗其实不是一把合格的分布式锁需要满足以下内容
在这里插入图片描述
我们上面的实现存在着很大的一个问题那就是当一个客户端拿到锁后如果发生了下面以下场景机会造成死锁

  • 程序处理业务请求逻辑异常并没有计数释放锁
  • 进程挂了没有机会释放锁

上面这两种情况都会导致已经获得锁的客户端一直占用着锁导致其他客户端无法获取到锁。那么我们如何来解决了最容易想到的就是在申请的时候给这个锁设置一个过期时间。注意我们在给锁设置过期时间的时候不能这样设置

 setnx lock 1
 expire lock 10

这样是绝对不可以的这样是存在线程安全问题的。因为这个步骤是分两部的如果出现一下情况仍然会导致死锁

  • setnx lock 1执行成功但是expire lock 10由于网络问题执行失败。
  • setnx lock 1执行成功但是expire lock 10由于redis宕机导致执行失败
  • setnx lock 1执行成功但是expire lock 10由于客户端崩溃导致执行失败

幸好redis扩展了set 命令的参数可以在set 的时候同时指定expire过期时间这条操作是原子的。下面我们给出一个示例

set lock 1 ex 10 nx

在这里插入图片描述
下面我们来使用golang语言来实现一把redis分布式锁

package main

import (
	"github.com/go-redis/redis"
	"sync"
	"time"
)

//初始化redis连接
func initRedis() *redis.Client {
	client := redis.NewClient(&redis.Options{
		Addr:     "101.35.98.26:6379", //redis服务器的ip地址
		Password: "1234",              //redis服务器的密码
		DB:       0,                   //选择的是那个库
		PoolSize: 100,                 //连接池大小
	})
	_, err := client.Ping().Result() //真的开始连接redis
	if err != nil {
		panic(err)
	}
	return client
}

type RedisLock struct {
	rdb        *redis.Client
	Key        string
	expireTime int64
}

func NewRedisLock(key string, exp int64) *RedisLock {
	return &RedisLock{
		rdb:        initRedis(),
		Key:        key,
		expireTime: exp,
	}
}
func (lock *RedisLock) Lock() {
	for {
		isLock, _ := lock.rdb.SetNX(lock.Key, 1, time.Duration(lock.expireTime)*time.Second).Result()
		if !isLock { //加锁失败
			time.Sleep(time.Millisecond * 30) //加锁失败睡眠30毫秒再次尝试进行加锁
		} else { //加锁成功
			break
		}
	}
}
func (lock *RedisLock) UnLock() {
	_, err := lock.rdb.Del(lock.Key).Result()
	//释放锁
	if err != nil {
		panic(err)
	}
}

func main() {
	lock := NewRedisLock("lock", 2)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {

		defer wg.Done()
		lock.Lock()
		//业务逻辑
		lock.UnLock()
	}()

	go func() {
		defer wg.Done()
		lock.Lock()
		//业务逻辑
		lock.UnLock()
	}()

	wg.Wait()

}

上面的代码有没有什么问题了我们设置这个过期时间一般比这个业务正常情况下处理所需要的时间要多一点那处理业务过程当中出现了异常了下面我们举个例子来说名一下这个情况

  • 客户端1加锁成功开始操作共享资源。但是客户端操作共享资源太久超过了锁的过期时间锁已经失效了。
  • 此时客户端2尝试进行加锁那么此时客户端2肯定是加锁成功的但是等客户端1操作共享资源完成之后进行释放锁也就是del操作但是此时这把锁并不是客户端1的锁把客户端2的锁给释放了。

也就是存在严重的两个问题

  • 锁过期了
  • 释放了别人的锁
    这个解决办法很容易就是客户端加锁时设置一个只有自己知道的唯一标识在释放锁时判断当前这把锁是不是自己的如果是才进行删除。不是就啥都不干。下面给出修改后的代码
package main

import (
	"fmt"
	"github.com/go-redis/redis"
	"math/rand"
	"strconv"
	"sync"
	"time"
)

//初始化redis连接
func initRedis() *redis.Client {
	client := redis.NewClient(&redis.Options{
		Addr:     "101.35.98.26:6379", //redis服务器的ip地址
		Password: "1234",              //redis服务器的密码
		DB:       0,                   //选择的是那个库
		PoolSize: 100,                 //连接池大小
	})
	_, err := client.Ping().Result() //真的开始连接redis
	if err != nil {
		panic(err)
	}
	return client
}

type RedisLock struct {
	rdb        *redis.Client
	Key        string //key
	expireTime int64  //过期时间
	id         string //用来标识
}

func NewRedisLock(key string, exp int64) *RedisLock {
	return &RedisLock{
		rdb:        initRedis(),
		Key:        key,
		expireTime: exp,
		id:         "",
	}
}
func (lock *RedisLock) Lock() {
	id := rand.Int()
	for {
		isLock, _ := lock.rdb.SetNX(lock.Key, 1, time.Duration(lock.expireTime)*time.Second).Result()
		if !isLock { //加锁失败
			time.Sleep(time.Millisecond * 30) //加锁失败睡眠30毫秒再次尝试进行加锁
			fmt.Println("加锁失败")
		} else { //加锁成功
			lock.id = strconv.Itoa(id) //记录一下这个id
			break
		}
	}
}
func (lock *RedisLock) UnLock() {
	//判断一个当前这把锁是不是自己的
	if id, _ := lock.rdb.Get(lock.Key).Result(); id == lock.id { //注意这两部不是原子的后面我们详细说
		_, err := lock.rdb.Del(lock.Key).Result()
		//释放锁
		if err != nil {
			panic(err)
		}
	} else {
		fmt.Println("当前锁不是自己的")
	}

}

func main() {
	lock := NewRedisLock("lock", 2)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {

		defer wg.Done()
		lock.Lock()
		time.Sleep(time.Second * 3)
		lock.UnLock()

	}()

	go func() {
		defer wg.Done()
		lock.Lock()
		time.Sleep(time.Second * 2)
	}()

	wg.Wait()
}

但是这也带来了这个原子性的问题。释放锁的逻辑是通过GET+DEL两条命令来完成并不是原子的这时候可能会带来原子性的问题

  • 客户端1执行GET,判断锁是自己的执行成功了
  • 刚好此时锁的过期时间到了客户端2开始执行setnx进行加锁客户端1进行del释放送可能由于网络原因客户端2先执行成功此时客户端1进行del释放锁释放的是客户端2的锁张冠李戴。

那么我们如何让这两条命令原子性的执行了答案是Lua脚本我们可以将上述逻辑写出Lua脚本。让redis执行因为Redis的工作线程是单线程的在执行Lua脚本时其他请求必须等待这样GET和DEl之间就不会被其他命令干扰。

Lua脚本

Redis调用Lua脚本通过eval命令保证代码执行的原子性直接使用return 来返回脚本执行后的结果。
下面我们来编写一个入门的案例hello lua通过Lua脚本来返回这个结果先给出这个Lua脚本的格式。eval luascript numkeys [key [key …]] [arg [arg …]]

eval "return 'hello lua'" 0

执行结果如下
在这里插入图片描述
在这里特别需要注意的时这个""里面的东西我们只能使用单引号至于这个0是什么意思我们后面慢慢解释。
下面我们希望通过Lua脚本给我们执行一下命令

  set k1 v1
  expire k1 60
  get k1

通过Lua脚本让这三条命令一起执行完毕。如果我们需要执行redis的命令在Lua脚本当中我们使用redis.call即可

eval "redis.call('set','k1','v1') redis.call('expire','k1','30') return redis.call('get','k1')" 0 

在这里插入图片描述
注意这三条命令都有返回值如果我们在第一个redis.call就return 是会报错的。所以我们需要在最后执行这个return命令。
还有这个k1,v1 我们这里是写死的那我们能不能传参数了当然可以我们下面看一下这个怎么写

eval "return redis.call('mset',KEYS[1],ARGV[1],KEYS[2],ARGV[2])" 2 k1 k2 lua1 lua2

其实最后面这个数字表示的是这个传递的参数个数类似于我们之前所说的环境变量里面的argv和env那样。
在这里插入图片描述
注意在这里数组的下标是从1开始的特别需要注意的是。

下面我们来学习这个带这个if 和else 命令的lua脚本

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

下面我们使用sublime这个工具将其整合成为一行然后我们来执行一下这个命令

 eval "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end" 1 k1 v2

执行结果如下
在这里插入图片描述

通过Lua脚本改造分布式锁

下面我们通过Lua脚本来执行一下改造一下上面那个redis分布式锁保证其原子性

package main

import (
	"fmt"
	"github.com/go-redis/redis"
	"math/rand"
	"strconv"
	"sync"
	"time"
)

const (
	script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end" //lua脚本
)

//初始化redis连接
func initRedis() *redis.Client {
	client := redis.NewClient(&redis.Options{
		Addr:     "101.35.98.26:6379", //redis服务器的ip地址
		Password: "1234",              //redis服务器的密码
		DB:       0,                   //选择的是那个库
		PoolSize: 100,                 //连接池大小
	})
	_, err := client.Ping().Result() //真的开始连接redis
	if err != nil {
		panic(err)
	}
	return client
}

type RedisLock struct {
	rdb        *redis.Client
	Key        string //key
	expireTime int64  //过期时间
	id         string //用来标识
}

func NewRedisLock(key string, exp int64) *RedisLock {
	return &RedisLock{
		rdb:        initRedis(),
		Key:        key,
		expireTime: exp,
		id:         "",
	}
}
func (lock *RedisLock) Lock() {
	id := rand.Int()
	for {
		isLock, _ := lock.rdb.SetNX(lock.Key, 1, time.Duration(lock.expireTime)*time.Second).Result()
		if !isLock { //加锁失败
			time.Sleep(time.Millisecond * 30) //加锁失败睡眠30毫秒再次尝试进行加锁
			fmt.Println("加锁失败")
		} else { //加锁成功
			fmt.Println("加锁成功")
			lock.id = strconv.Itoa(id) //记录一下这个id
			break
		}
	}
}
func (lock *RedisLock) UnLock() {

	lock.rdb.Eval(script, []string{lock.Key}, lock.id) //执行Lua脚本
}

func main() {
	lock := NewRedisLock("lock", 2)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {

		defer wg.Done()
		lock.Lock()
		time.Sleep(time.Second * 3)
		lock.UnLock()
	}()

	go func() {
		defer wg.Done()
		lock.Lock()
		time.Sleep(time.Second * 2)
	}()

	wg.Wait()

}

到这里我们的redis分布式锁就差不多了但是我们没有解决则个锁的可重入性问题。在同一个线程当中我已经获取到锁了我们在处理业务逻辑的时候我还需要获取到这把锁应该成功才对。所以使用setnx命令是远远不够的。

可重入的Redis分布式锁

上面那个分布式锁并没有解决这个可重入问题和锁的续期。如何兼顾锁的可重入问题
用一句话来说就是一个线程当中的多个流程可以获取到同一把锁持有这把锁同步锁可以再次进入自己可以获取自己的内部锁。
很明显我们使用setnx命令已经无法保证这个锁的可重入性了。此时我们可以使用hash这种数据类型通过引用计数的方式来保证锁的可重入性
下面我们通过这个redis命令来说明这个如何实现redis分布式锁。
在这里插入图片描述
在这里插入图片描述
同样的这里命令的条数有很多我们需要他们一起执行成功依然需要通过Lua脚本来保证其原子性。下面我们开始来编写可重入redis分布式锁的加锁的Lua脚本

//加锁的lua脚本
if redis.call('exists','key')==0 then//锁是否存在
    redis.call('hset','key','uuid:threadid','1')
    return 1
elseif redis.call('hexists','key','uuidthreadid')==1 then//判断锁是不是自己的
    redis.call('hincrby','key','uuidthreadid',1)
    redis.call('expire','key',50)
    return 1
else
    return 0
end

上面这个是这个这个伪代码其实这个if和elseif可以进行合并因为hincrby如果不存在会新建。那么这个lua脚本就可以合并成为这样

if redis.call('exists','key')==0 or redis.call('hexists','key','uuidthreadid')==1 then
    redis.call('hincrby','key','uuid:threadid','1')
    redis.call('expire','key','40')
    return 1
else
    return 0
end

下面我们可以将这个写死的进行参数替换

if redis.call('exists',KEYS[1])==0 or redis.call('hexists',KEYS[1],ARGV[1])==1 then
    redis.call('hincrby',KEYS[1],ARGV[1],'1')
    redis.call('expire',KEYS[1],ARGV[2])
    return 1
else
    return 0
end

我们在放到Redis当进行执行看看

eval "if redis.call('exists',KEYS[1])==0 or redis.call('hexists',KEYS[1],ARGV[1])==1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end" 1 lock 123456 50

在这里插入图片描述
执行结果如上图所示。
下面我们来编写一下解锁的逻辑其实同样的很简单下面给出解锁的伪代码

//解锁unlock 方法
if redis.call('hexists',key,uuid:threadid)==0 then
    return nil
elseif redis.call('hincrby',key,uuid:threadid,-1)==0 then
    return redis.call('del',key)
else
    return 0
end

下面我们记录将这个写死的变量写出参数传递的样子。下面我直接给出这个代码

if redis.call('hexists',KEYS[1],ARGV[1])==0 then
    return nil
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)==0 then
    return redis.call('del',KEYS[1])
else
    return 0
end

在这里插入图片描述

我们先加锁三次然后在解锁三次我们在次查看这个锁就把删除了。这个非常的舒服哈哈哈下面我们进入编码阶段编写一个可插入的redis分布式锁.

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis"
	"strconv"
	"sync"
	"time"
)

const (
	lockLua = "if redis.call('exists',KEYS[1])==0 or redis.call('hexists',KEYS[1],ARGV[1])==1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end"
)
const (
	unlockLua = "if redis.call('hexists',KEYS[1],ARGV[1])==0 then return -1 elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)==0 then return redis.call('del',KEYS[1]) else return 0 end"
)

const (
	keepAlive = "if redis.call('hexists',KEYS[1],ARGV[1])==1 then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end"
)

//初始化redis连接
func initRedis() *redis.Client {
	client := redis.NewClient(&redis.Options{
		Addr:     "101.35.98.26:6379", //redis服务器的ip地址
		Password: "1234",              //redis服务器的密码
		DB:       0,                   //选择的是那个库
		PoolSize: 100,                 //连接池大小
	})
	_, err := client.Ping().Result() //真的开始连接redis
	if err != nil {
		panic(err)
	}
	return client
}

type RedisLock struct {
	rdb        *redis.Client
	Key        string             //key
	expireTime int64              //过期时间
	id         string             //用来标识
	ctx        context.Context    //用来控制任务
	cancel     context.CancelFunc //取消函数
	MaxWait    time.Duration      //等待锁的最长超时时间

}

func NewRedisLock(key string, exp int64) *RedisLock {
	return &RedisLock{
		rdb:        initRedis(),
		Key:        key,
		expireTime: exp,
		id:         "",
		ctx:        context.Background(),
		MaxWait:    time.Second * 10,
		cancel:     nil,
	}
}
func (lock *RedisLock) TryLock(id int) bool {

	res := lock.rdb.Eval(lockLua, []string{lock.Key}, id, lock.expireTime)
	fmt.Printf("the id is%d\n", id)
	if res.Err() != nil {
		fmt.Println(res.Err())
		return false
	}

	val, ok := res.Val().(int64) //类型断言
	fmt.Println(val)
	if ok && val == 1 {
		if lock.cancel == nil { //说明已经设置过了
			ctx, cancel := context.WithCancel(lock.ctx)
			lock.cancel = cancel
			go keepAliveWorker(ctx, lock)
		}
		lock.id = strconv.Itoa(id)
		return true
	}
	return false
}
func (lock *RedisLock) Lock(id int) (err error) {

	res := lock.TryLock(id)
	if res == true {
		fmt.Printf("加锁成功%d", id)
		return nil
	}

	ctx, _ := context.WithTimeout(lock.ctx, lock.MaxWait)
	for {
		waiter := time.After(time.Duration(30) * time.Millisecond) //每隔30秒获取一次
		select {
		case <-waiter:
			res = lock.TryLock(id)
			if res {
				fmt.Printf("获取锁成功%d", id)
				return nil
			}

		case <-ctx.Done():
			fmt.Println("获取锁超时")
			return fmt.Errorf("获取锁超时")
		}
	}
	return
}

func keepAliveWorker(ctx context.Context, lock *RedisLock) {
	waiter := time.Tick(time.Duration(lock.expireTime/3) * time.Second)
	for {
		select {
		case <-waiter: //执行续租任务
			res := lock.rdb.Eval(keepAlive, []string{lock.Key}, lock.id, lock.expireTime)
			fmt.Printf("the 续期id为%s", lock.id)
			if res.Err() != nil {
				panic(res.Err())
			}
			if res.Val().(int64) == 0 {
				fmt.Println("锁已经过期了不需要在续了")
				return
			} else {
				fmt.Println("续期成功")
			}
		case <-ctx.Done():
			return //不需要续租了
		}
	}

}
func (lock *RedisLock) UnLock() error {
	res := lock.rdb.Eval(unlockLua, []string{lock.Key}, lock.id)

	if res.Err() != nil {
		fmt.Println(res.Err())
		return res.Err()
	}
	val := res.Val().(int64)
	fmt.Printf("the unlockval is%d %s\n", val, lock.id)
	if val == 1 && lock.cancel != nil {
		lock.cancel()
		fmt.Printf("释放锁成功%s\n", lock.id)
		lock.cancel = nil
		return nil
	} else {
		fmt.Printf("还没解锁完%s", lock.id)
		return fmt.Errorf("还没有解锁完")
	}

}

func main() {
	lock := NewRedisLock("lock", 10)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		lock.Lock(1)
		time.Sleep(time.Second * 20)
		lock.UnLock()

	}()
	time.Sleep(time.Second * 1)
	go func() {
		defer wg.Done()
		lock.Lock(2)
		fmt.Println("locked?")
		//lock.UnLock()
	}()

	wg.Wait()
	lock.Lock(5)

}

本次编写的redis分布式锁考虑了这个续期的问题也就是万一处理业务时间太长我们需要加个钟,所以我们就可以加锁成功之后马上创建一个协程定时的去查看锁的是否过期。这个也叫做看门狗。
在这里插入图片描述
如果此时客户端加锁成功后就宕机了那么看门狗线程就不存在也就无法为这个锁续期了锁到期就自动释放了。

RedLock

我们之前说的之前的锁都是一台机器实列当中如果这台机器挂了那么完蛋了无法提供加锁和解锁服务。
Redis发展到现在几种常见的部署架构有

  • 单机模式
  • 主从模式
  • 哨兵sentinel模式
  • 集群模式

我们使用Redis时一般会采用主从集群+哨兵的模式部署哨兵的作用就是监测redis节点的运行状态。普通的主从模式当master崩溃时需要手动切换让slave成为master使用主从+哨兵结合的好处在于当master异常宕机时哨兵可以实现故障自动切换把slave提升为新的master继续提供服务以此保证可用性。那么当主从发生切换时分布式锁依旧安全吗
在这里插入图片描述
想像这样的场景

1.客户端1在master上执行SET命令加锁成功
2.此时master异常宕机SET命令还未同步到slave上主从复制是异步的
3.哨兵将slave提升为新的master但这个锁在新的master上丢失了导致客户端2来加锁成功了两个客户端共同操作共享资源。

可见当引入Redis副本后分布式锁还是可能受到影响。即使Redis通过sentinel保证高可用如果这个master节点由于某些原因发生了主从切换那么就会出现锁丢失的情况。

为了避免Redis实例故障而导致的锁无法工作的问题Redis的开发者 Antirez提出了分布式锁算法Redlock。Redlock算法的基本思路是让客户端和多个独立的Redis实例依次请求加锁如果客户端能够和半数以上的实例成功地完成加锁操作那么我们就认为客户端成功地获得分布式锁了否则加锁失败。这样一来即使有单个Redis实例发生故障因为锁变量在其它实例上也有保存所以客户端仍然可以正常地进行锁操作锁变量并不会丢失。

来具体看下Redlock算法的执行步骤。Redlock算法的实现要求Redis采用集群部署模式无哨兵节点需要有N个独立的Redis实例官方推荐至少5个实例。接下来我们可以分成3步来完成加锁操作。
在这里插入图片描述

  • 第一步是客户端获取当前时间。
  • 第二步是客户端按顺序依次向N个Redis实例执行加锁操作。
  • 这里的加锁操作和在单实例上执行的加锁操作一样使用SET命令带上NX、EX/PX选项以及带上客户端的唯一标识。当然如果某个Redis实例发生故障了为了保证在这种情况下Redlock算法能够继续运行我们需要给加锁操作设置一个超时时间。如果客户端在和一个Redis实例请求加锁时一直到超时都没有成功那么此时客户端会和下一个Redis实例继续请求加锁。加锁操作的超时时间需要远远地小于锁的有效时间一般也就是设置为几十毫秒。
  • 第三步是一旦客户端完成了和所有Redis实例的加锁操作客户端就要计算整个加锁过程的总耗时。客户端只有在满足两个条件时才能认为是加锁成功条件一是客户端从超过半数大于等于 N/2+1的Redis实例上成功获取到了锁条件二是客户端获取锁的总耗时没有超过锁的有效时间。
  • 为什么大多数实例加锁成功才能算成功呢多个Redis实例一起来用其实就组成了一个分布式系统。在分布式系统中总会出现异常节点所以在谈论分布式系统时需要考虑异常节点达到多少个也依旧不影响整个系统的正确运行。这是一个分布式系统的容错问题这个问题的结论是如果只存在故障节点只要大多数节点正常那么整个系统依旧可以提供正确服务。
  • 在满足了这两个条件后我们需要重新计算这把锁的有效时间计算的结果是锁的最初有效时间减去客户端为获取锁的总耗时。如果锁的有效时间已经来不及完成共享数据的操作了我们可以释放锁以免出现还没完成共享资源操作锁就过期了的情况。
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: redis