前言
读写锁的好处就是能帮助客户读到的数据一定是最新的,写锁是排他锁,而读锁是一个共享锁,如果写锁一直存在,那么读取数据就要一直等待,直到写入数据完成才能看到,保证了数据的一致性
一、为什么使用Lua
Lua脚本是高并发、高性能的必备脚本语言, 大部分的开源框架(如:redission)中的分布式锁组件,都是用纯lua脚本实现的。
那么,为什么要使用Lua语言来实现分布式锁呢?我们从一个案例看起:
所以,只有确保判断锁和删除锁是一步操作时,才能避免上面的问题,才能确保原子性。
其实很简单,首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。虽然看似做了两件事,但是却只有一个完整的原子操作。
第一行代码,我们写了一个简单的 Lua 脚本代码; 第二行代码,我们将Lua代码传到 edis.eval()方法里,并使参数 KEYS[1] 赋值为 lockKey,ARGV[1] 赋值为 requestId,eval() 方法是将Lua代码交给 Redis 服务端执行。
二、执行流程
加锁和删除锁的操作,使用纯 Lua 进行封装,保障其执行时候的原子性。
基于纯Lua脚本实现分布式锁的执行流程,大致如下:
三、代码详解
lualock.lua
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | -- KEYS = [LOCK_KEY, LOCK_INTENT] -- ARGV = [LOCK_ID, TTL] local t = redis.call( 'TYPE' , KEYS[ 1 ])[ "ok" ] if t == "string" then return redis.call( 'PTTL' , KEYS[ 1 ]) end if redis.call( "EXISTS" , KEYS[ 2 ]) == 1 then return redis.call( 'PTTL' , KEYS[ 2 ]) end redis.call( 'SADD' , KEYS[ 1 ], ARGV[ 1 ]) redis.call( 'PEXPIRE' , KEYS[ 1 ], ARGV[ 2 ]) return nil |
-
-- KEYS = [LOCK_KEY, LOCK_INTENT]
和-- ARGV = [LOCK_ID, TTL, ENABLE_LOCK_INTENT]
-
if not redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX") then
行首使用了redis.call
函数调用,将 LOCK_KEY 和 LOCK_ID 存储到 Redis 中,并设置过期时间为 TTL。如果设置失败,则进入条件内部。 - 在条件内部,判断 ENABLE_LOCK_INTENT 的值。如果为 1,则执行
redis.call("SET", KEYS[2], 1, "PX", ARGV[2])
,将 LOCK_INTENT 键设置为 1,并设置与 LOCK_KEY 相同的过期时间。这是为了表示锁被占用的意图。 - 返回
redis.call("PTTL", KEYS[1])
,即 LOCK_KEY 的剩余过期时间,以毫秒为单位。这是为了告知调用方锁已被占用,返回锁的剩余过期时间。 - 若上述条件都不满足,则执行
redis.call("DEL", KEYS[2])
,删除 LOCK_INTENT 键。 - 返回
nil
,表示锁已成功获取。
它首先尝试通过 SET 命令将 LOCK_KEY 存储到 Redis 中,如果设置失败,则表示锁已被其他进程占用,返回锁的剩余过期时间。如果设置成功,则删除 LOCK_INTENT 键,表示锁已成功获取
luarefresh.lua
1 2 3 4 5 6 7 8 9 10 | -- KEYS = [LOCK_KEY] -- ARGV = [LOCK_ID, TTL] local t = redis.call( 'TYPE' , KEYS[ 1 ])[ "ok" ] if (t == "string" and redis.call( 'GET' , KEYS[ 1 ]) ~= ARGV[ 1 ]) or (t == "set" and redis.call( 'SISMEMBER' , KEYS[ 1 ], ARGV[ 1 ]) == 0 ) or (t == "none" ) then return 0 end return redis.call( 'PEXPIRE' , KEYS[ 1 ], ARGV[ 2 ]) |
- 延长锁的时间
luarlock.lua
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | -- KEYS = [LOCK_KEY, LOCK_INTENT] -- ARGV = [LOCK_ID, TTL] local t = redis.call( 'TYPE' , KEYS[ 1 ])[ "ok" ] if t == "string" then return redis.call( 'PTTL' , KEYS[ 1 ]) end if redis.call( "EXISTS" , KEYS[ 2 ]) == 1 then return redis.call( 'PTTL' , KEYS[ 2 ]) end redis.call( 'SADD' , KEYS[ 1 ], ARGV[ 1 ]) redis.call( 'PEXPIRE' , KEYS[ 1 ], ARGV[ 2 ]) return nil |
-
local t = redis.call('TYPE', KEYS[1])["ok"]
通过TYPE
命令获取键的类型,并将结果存储在变量t
中。 -
使用条件逻辑判断锁的状态:
- 如果
t
是字符串,则返回PTTL
命令的结果,即锁的剩余过期时间。 - 如果
LOCK_INTENT
键存在,则返回PTTL
命令的结果,即锁占用意图的剩余过期时间。
- 如果
-
由于以上条件都不满足,即锁未被占用,将锁 ID (
ARGV[1]
) 添加到LOCK_KEY
集合中。 -
使用
PEXPIRE
命令设置LOCK_KEY
的过期时间为ARGV[2]
(以毫秒为单位)。 -
返回
nil
,表示锁已成功获取。
luaunlock.lua
1 2 3 4 5 6 7 8 9 10 11 12 | -- KEYS = [LOCK_KEY] -- ARGV = [LOCK_ID] local t = redis.call( 'TYPE' , KEYS[ 1 ])[ "ok" ] if t == "string" and redis.call( 'GET' , KEYS[ 1 ]) == ARGV[ 1 ] then return redis.call( 'DEL' , KEYS[ 1 ]) elseif t == "set" and redis.call( 'SISMEMBER' , KEYS[ 1 ], ARGV[ 1 ]) == 1 then redis.call( 'SREM' , KEYS[ 1 ], ARGV[ 1 ]) if redis.call( 'SCARD' , KEYS[ 1 ]) == 0 then return redis.call( 'DEL' , KEYS[ 1 ]) end end return 1 |
- 检查指定键的类型,如果是字符串并且键的值等于给定的
ARGV
值,则删除该键。 - 如果指定键的类型是集合,并且集合中包含给定的
ARGV
值,则将该值从集合中移除。随后,如果集合中不再包含任何元素,则删除该键。
写优先还是读优先?
写锁会阻塞读锁,所以是写优先
写锁是如何阻塞写锁的?
如果当前的写锁已经被占用,其他写锁的获取请求会被阻塞,因为在释放锁的逻辑中,会先判断锁的类型,如果是写锁,则会判断当前锁的值是否符合预期,从而判断能否删除该锁。
读锁与读锁之间互斥吗?
对于读锁而言,多个读锁之间是可以并发持有的,因此读锁之间默认是不会互斥的,可以同时执行读操作。
写锁会有被饿死的情况吗?
写优先锁可以保证写线程不会饿死,但是如果一直有写线程获取写锁,读线程也会被「饿死」。
既然不管优先读锁还是写锁,对方可能会出现饿死问题,那么我们就不偏袒任何一方,搞个「公平读写锁」。
公平读写锁比较简单的一种方式是:用队列把获取锁的线程排队,不管是写线程还是读线程都按照先进先出的原则加锁即可,这样读线程仍然可以并发,也不会出现「饥饿」的现象。
抽象lock类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | import ( "context" "errors" "time" "github.com/redis/go-redis/v9" ) var _ context.Context = (*Lock)( nil ) // Lock represents a lock with context. type Lock struct { redis redis.Scripter id string ttl time.Duration key string log LogFunc ctx context.Context cancel context.CancelFunc } // ID returns the id value set by the lock. func (l *Lock) ID() string { return l.id } // Key returns the key value set by the lock. func (l *Lock) Key() string { return l.key } func (l *Lock) Deadline() (deadline time.Time, ok bool ) { return l.ctx.Deadline() } func (l *Lock) Done() |
-
ID()
:返回锁的ID。 -
Key()
:返回锁的键名。 -
Deadline()
:返回锁的截止时间和标志,如果没有设置则返回零值。 -
Done()
:返回一个通道,在锁的上下文被取消或者锁过期后会被关闭。 -
Err()
:返回锁的错误状态。 -
Value(key any) any
:返回一个键关联的值,用于传递上下文相关的数据。 -
Unlock()
:解锁操作,会取消锁的上下文,并调用Redis的脚本解锁操作。 -
refreshTTL(left time.Time)
:刷新锁的过期时间,定期更新Redis中锁的过期时间,直到锁的上下文被取消、锁过期或无法继续刷新为止。 -
leftTTL()
:返回锁的剩余过期时间。 -
updateTTL()
:更新刷新锁的间隔时间。每次减少一半
为什么需要为什么l.ttl / 2
这是为了实现锁的自动续约。通过定期刷新锁的过期时间,可以确保锁在使用过程中不会过期而被意外释放。
这种做法可以在以下情况下带来一些好处:
- 减少锁的续约操作对Redis的压力:由于续约操作是相对较昂贵的,通过将过期时间缩短为原来的一半,可以降低续约的频率,从而减少对Redis的请求,减少了网络和计算资源的消耗。
- 避免长时间持有锁带来的问题:如果某个持有锁的进程/线程发生故障或延迟,导致无法及时释放锁,那么其他进程可能会长时间等待获取该锁,造成资源浪费。通过定期刷新锁的过期时间,可以在锁即将过期之前及时释放锁,降低该问题的风险。
Options
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 | package redismutex import ( "context" "log" "os" "sync" "time" ) const ( lenBytesID = 16 refreshTimeout = time.Millisecond * 500 defaultKeyTTL = time.Second * 4 ) var ( globalMx sync.RWMutex globalLog = func () LogFunc { l := log. New (os.Stderr, "redismutex: " , log.LstdFlags) return func (format string , v ...any) { l.Printf(format, v...) } }() ) // LogFunc type is an adapter to allow the use of ordinary functions as LogFunc. type LogFunc func (format string , v ...any) // NopLog logger does nothing var NopLog = LogFunc( func ( string , ...any) {}) // SetLog sets the logger. func SetLog(l LogFunc) { globalMx.Lock() defer globalMx.Unlock() if l != nil { globalLog = l } } // MutexOption is the option for the mutex. type MutexOption func (*mutexOptions) type mutexOptions struct { name string ttl time.Duration lockIntent bool log LogFunc } // WithTTL sets the TTL of the mutex. func WithTTL(ttl time.Duration) MutexOption { return func (o *mutexOptions) { if ttl >= time.Second* 2 { o.ttl = ttl } } } // WithLockIntent sets the lock intent. func WithLockIntent() MutexOption { return func (o *mutexOptions) { o.lockIntent = true } } // LockOption is the option for the lock. type LockOption func (*lockOptions) type lockOptions struct { ctx context.Context key string lockIntentKey string enableLockIntent int ttl time.Duration log LogFunc } func newLockOptions(m mutexOptions, opt ...LockOption) lockOptions { opts := lockOptions{ ctx: context.Background(), key: m.name, enableLockIntent: boolToInt(m.lockIntent), ttl: m.ttl, log: m.log, } for _, o := range opt { o(&opts) } opts.lockIntentKey = lockIntentKey(opts.key) return opts } // WithKey sets the key of the lock. func WithKey(key string ) LockOption { return func (o *lockOptions) { if key != "" { o.key += ":" + key } } } // WithContext sets the context of the lock. func WithContext(ctx context.Context) LockOption { return func (o *lockOptions) { if ctx != nil { o.ctx = ctx } } } func boolToInt(b bool ) int { if b { return 1 } return 0 } func lockIntentKey(key string ) string { return key + ":lock-intent" } |
-
SetLog(l LogFunc)
:设置日志记录器。 -
WithTTL(ttl time.Duration)
:设置互斥锁的生存时间(TTL)选项。 -
WithLockIntent()
:设置锁意图选项。 -
newLockOptions(m mutexOptions, opt ...LockOption)
:创建锁的选项。 -
WithKey(key string)
:设置锁的键选项。 -
WithContext(ctx context.Context)
:设置锁的上下文选项。 -
lockIntentKey(key string)
:为给定的锁键生成锁意图键。
可以通过设置选项来控制互斥锁的行为和属性,如生存时间、锁意图、上下文等。还提供了一些实用函数和类型,用于管理互斥锁和生成选项
redismutex
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 | // Package redismutex provides a distributed rw mutex. package redismutex import ( "context" "crypto/rand" "embed" "encoding/hex" "errors" "sync" "time" "github.com/redis/go-redis/v9" ) var ErrLock = errors. New ( "redismutex: lock not obtained" ) var ( //go:embed lua lua embed.FS scriptRLock *redis.Script scriptLock *redis.Script scriptRefresh *redis.Script scriptUnlock *redis.Script ) func init() { scriptRLock = redis.NewScript(mustReadFile( "rlock.lua" )) scriptLock = redis.NewScript(mustReadFile( "lock.lua" )) scriptRefresh = redis.NewScript(mustReadFile( "refresh.lua" )) scriptUnlock = redis.NewScript(mustReadFile( "unlock.lua" )) } // A RWMutex is a distributed mutual exclusion lock. type RWMutex struct { redis redis.Scripter opts mutexOptions id struct { sync.Mutex buf [] byte } } // NewMutex creates a new distributed mutex. func NewMutex(rc redis.Scripter, name string , opt ...MutexOption) *RWMutex { globalMx.RLock() defer globalMx.RUnlock() opts := mutexOptions{ name: name, ttl: defaultKeyTTL, log: globalLog, } for _, o := range opt { o(&opts) } rw := &RWMutex{ redis: rc, opts: opts, } rw.id.buf = make ([] byte , lenBytesID) return rw } // TryRLock tries to lock for reading and reports whether it succeeded. func (m *RWMutex) TryRLock(opt ...LockOption) (*Lock, bool ) { opts := newLockOptions(m.opts, opt...) ctx, _, err := m.rlock(opts) if err != nil { if !errors.Is(err, ErrLock) { m.opts.log( "[ERROR] try-read-lock key %q: %v" , opts.key, err) } return nil , false } return ctx, true } // RLock locks for reading. func (m *RWMutex) RLock(opt ...LockOption) (*Lock, bool ) { opts := newLockOptions(m.opts, opt...) ctx, ttl, err := m.rlock(opts) if err == nil { return ctx, true } if !errors.Is(err, ErrLock) { m.opts.log( "[ERROR] read-lock key %q: %v" , opts.key, err) return nil , false } for { select { case |
- 通过
NewMutex
函数创建一个新的分布式互斥锁。该函数接受 Redis 客户端、锁的名称和一系列选项作为参数,返回一个 RWMutex 结构体实例。 - 通过
RLock
和Lock
方法来获取读锁和写锁。如果无法立即获取锁,则会阻塞等待,直到获取成功或者上下文取消。 - 通过
TryRLock
和TryLock
方法来尝试获取读锁和写锁,如果无法立即获取锁则立即返回失败,不会阻塞。 - 该包实现了一个
Lock
结构体,包含了锁相关的信息和操作方法,比如刷新锁的过期时间。 - 使用
redis.Script
来执行 Lua 脚本,通过 Redis 客户端执行相应的 Redis 命令。 - 使用了
crypto/rand
包来生成随机的锁标识符。 - 最终的
mustReadFile
函数用于读取嵌入的 Lua 脚本文件。
测试用例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 | package redismutex import ( "context" "errors" "log" "strings" "testing" "time" "github.com/redis/go-redis/v9" ) func init() { SetLog( func (format string , a ...any) { if strings.HasPrefix(format, "[ERROR]" ) { log.Fatalf(format, a...) } }) } func TestMutex(t *testing.T) { t.Parallel() const lockKey = "mutex" rc := redis.NewClient(redisOpts()) prep(t, rc, lockKey) mx := NewMutex(rc, lockKey) lock, ok := mx.Lock() if exp, got := true , ok; exp != got { t.Fatalf( "exp %v, got %v" , exp, got) } defer lock.Unlock() assertTTL(t, rc, lockKey, defaultKeyTTL) // try again _, ok = mx.TryLock() if exp, got := false , ok; exp != got { t.Fatalf( "exp %v, got %v" , exp, got) } _, ok = mx.TryRLock() if exp, got := false , ok; exp != got { t.Fatalf( "exp %v, got %v" , exp, got) } // manually unlock lock.Unlock() // lock again lock, ok = mx.Lock() if exp, got := true , ok; exp != got { t.Fatalf( "exp %v, got %v" , exp, got) } defer lock.Unlock() } func TestRWMutex(t *testing.T) { t.Parallel() const lockKey = "rw_mutex" rc := redis.NewClient(redisOpts()) prep(t, rc, lockKey) mx := NewMutex(rc, lockKey) lock, ok := mx.RLock() if exp, got := true , ok; exp != got { t.Fatalf( "exp %v, got %v" , exp, got) } defer lock.Unlock() assertTTL(t, rc, lockKey, defaultKeyTTL) // try again _, ok = mx.TryLock() if exp, got := false , ok; exp != got { t.Fatalf( "exp %v, got %v" , exp, got) } // try rlock rlock, ok := mx.TryRLock() if exp, got := true , ok; exp != got { t.Fatalf( "exp %v, got %v" , exp, got) } rlock.Unlock() // manually unlock lock.Unlock() // lock again lock, ok = mx.Lock() if exp, got := true , ok; exp != got { t.Fatalf( "exp %v, got %v" , exp, got) } defer lock.Unlock() } func TestRWMutex_LockIntent(t *testing.T) { t.Parallel() const lockKey = "lock_intent_mutex" rc := redis.NewClient(redisOpts()) prep(t, rc, lockKey) mx := NewMutex(rc, lockKey, WithLockIntent()) lock, ok := mx.RLock() if exp, got := true , ok; exp != got { t.Fatalf( "exp %v, got %v" , exp, got) } defer lock.Unlock() // mark lock intent _, _, err := mx.lock(newLockOptions(mx.opts)) if exp, got := ErrLock, err; !errors.Is(got, exp) { t.Fatalf( "exp %v, got %v" , exp, got) } // try rlock _, ok = mx.TryRLock() if exp, got := false , ok; exp != got { t.Fatalf( "exp %v, got %v" , exp, got) } // manually unlock lock.Unlock() // lock write lock, ok = mx.Lock() if exp, got := true , ok; exp != got { t.Fatalf( "exp %v, got %v" , exp, got) } lock.Unlock() // remove lock intent // lock again lock, ok = mx.RLock() if exp, got := true , ok; exp != got { t.Fatalf( "exp %v, got %v" , exp, got) } defer lock.Unlock() } func TestRWMutex_ID(t *testing.T) { t.Parallel() rw := &RWMutex{} rw.id.buf = make ([] byte , lenBytesID) id, _ := rw.randomID() if exp, got := 32 , len (id); exp != got { t.Fatalf( "exp %v, got %v" , exp, got) } } func prep(t *testing.T, rc *redis.Client, key string ) { t.Cleanup( func () { for _, v := range [] string {key, lockIntentKey(key)} { if err := rc.Del(context.Background(), v).Err(); err != nil { t.Fatal(err) } } if err := rc. Close (); err != nil { t.Fatal(err) } }) } func assertTTL(t *testing.T, rc *redis.Client, key string , exp time.Duration) { t.Helper() got, err := rc.TTL(context.Background(), key).Result() if exp, got := (any)( nil ), err; exp != got { t.Fatalf( "exp %v, got %v" , exp, got) } delta := got - exp if delta time.Second { t.Fatalf( "exp ~%v, got %v" , exp, got) } } func redisOpts() *redis.Options { return &redis.Options{ Network: "tcp" , Addr: "0.0.0.0:6379" , DB: 9 , } } |
以上就是利用Redis lua实现高效读写锁的代码实例的详细内容,更多关于Redis lua读写锁的资料请关注IT俱乐部其它相关文章!