写在前面
随着分布式系统的快速发展和广泛应用,如何处理共享资源的互斥访问场景呢?即分布式锁,常见采用有开源的 MySQL,Redis,ZooKeeper,Etcd 等三方组件来实现。今天主要介绍Golang基于Redis实现分布式互斥锁。
分布式锁特性
那么分布式锁需要处理哪些问题场景呢,或者说特性有哪些? 这是redis.io的原文:
安全和活力保证:
- 安全特性:互斥。在任何给定时刻,只有一个客户端可以持有一把锁。
- 活性属性 A:无死锁。最终总是有可能获得锁,即使锁定资源的客户端崩溃或分区。
- 活性属性 B:容错性。只要大多数 Redis 节点正常运行,客户端就可以获取和释放锁
结合实际场景,翻译一下:
- 互斥性。锁的基础特性,只能被第一个持有者持有;
- 防死锁。在持有方发生异常时,设置锁的超时释放机制,规避死锁风险
- 高可用、高性能。
go-redsyn
Redsync 为 Go 提供了基于 Redis 的分布式互斥锁实现 安装:
$ go get github.com/go-redsync/redsync/v4
例子:
package main
import (
goredislib "github.com/go-redis/redis/v8"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)
func main() {
//使用go-redis(或redigo)创建一个池,该池是redisync will
//在与Redis通信时使用这也可以是任何一个池子
//实现' redis. properties '池的接口。
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
//redis集群
//client := goredislib.NewClusterClient(&goredislib.ClusterOptions{ //Addr: []string{"localhost:6379"}, })
pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
//创建redisync实例,用于获得互斥锁。
rs := redsync.New(pool)
//为所有需要互斥对象的实例使用相同的名称来获取一个新的互斥对象
//相同的锁。
mutexname := "my-global-mutex"
mutex := rs.NewMutex(mutexname)
//获取互斥锁在这个成功之后,没有其他人
//可以获得相同的锁(相同的互斥锁名),直到我们解锁它。
if err := mutex.Lock(); err != nil {
panic(err)
}
//做你需要锁的工作。
//释放锁,以便其他进程或线程可以获得锁。
if ok, err := mutex.Unlock(); !ok || err != nil {
panic("unlock failed")
}
}
实现原理
加锁
在New一个互斥锁mutex后,通过mutex.Lock()加锁,查看源码实现:
func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
reply, err := conn.SetNX(m.name, value, m.expiry)
if err != nil {
return false, err
}
return reply, nil
}
可见,是通过Redis命令Setnx操作来实现。
Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。设置成功,返回 1 。 设置失败,返回 0 。相对于Set,Setnx具有原子性。
释放锁
func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
status, err := conn.Eval(deleteScript, m.name, value)
if err != nil {
return false, err
}
return status != int64(0), nil
}
var deleteScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`)
可见,Redis使用Lua脚本命令来Delete Key操作。
这里需要关注到Del的时候,需要传入value,value是在加锁时设置的随机唯一值。在删除前进行校验value是否一致,防止误删。
锁超时-防死锁
如果持有锁的系统宕机、异常了导致没法release锁,那么其他方就无法活动该锁,导致死锁。
// NewMutex returns a new distributed mutex with given name.
func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
m := &Mutex{
name: name,
expiry: 8 * time.Second,
tries: 32,
delayFunc: func(tries int) time.Duration {
return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
},
genValueFunc: genValue,
driftFactor: 0.01,
timeoutFactor: 0.05,
quorum: len(r.pools)/2 + 1,
pools: r.pools,
}
for _, o := range options {
o.Apply(m)
}
return m
}
可见,Mutex.expiy 设置了超时时间,默认为8秒,可以通过Option修改超时时间
// WithExpiry can be used to set the expiry of a mutex to the given value.
func WithExpiry(expiry time.Duration) Option {
return OptionFunc(func(m *Mutex) {
m.expiry = expiry
})
}
Example(设置超时时间、重复加锁):
//设置锁超时3秒
mutex := rs.NewMutex(mutexname,redsync.WithExpiry(time.Second*3)
if err := mutex.Lock(); err != nil {
panic(err)
}
//重复加锁
if err := mutex.Lock(); err != nil {
panic(err)
}
if err := mutex.Lock(); err != nil {
panic(err)
}
OutPut:
panic: lock already taken, locked nodes: [0]
Example(设置超时时间、超时释放锁):
//设置锁超时10秒
mutex := rs.NewMutex(mutexname,redsync.WithExpiry(time.Second*10)
if err := mutex.Lock(); err != nil {
panic(err)
}
//11秒后锁超时,释放
time.Sleep(11 * time.Second)
//重复加锁但已释放、无异常
if err := mutex.Lock(); err != nil {
panic(err)
}
if err := mutex.Lock(); err != nil {
panic(err)
}
OutPut:
总结
本文主要介绍分布式锁的基础概念、特性互斥性、防死锁、高可用高性能。以及golang基于redis,使用go-redsync库实现分布式锁。通过例子简单介绍go-redsync的入门使用,以及通过源码阅读来刨析简介实现原理: 1.利用Redis Setnx原子性实现互斥性 ,2.利用Redis执行lua脚本、检查value值避免误删操作, 3.通过设置ExpiryOption设置超时时间,实现锁超时、避免死锁。