Etcd有以下机制,用他们来实现分布式锁
- Lease 机制:即租约机制(TTL,Time To Live),Etcd 可以为存储的 Key-Value 对设置租约,当租约到期,Key-Value 将失效删除;同时也支持续约续期(KeepAlive)
- Revision 机制:每个 key 带有一个 Revision 属性值,Etcd 每进行一次事务对应的全局 Revision 值都会加一,因此每个 Key 对应的 Revision 属性值都是全局唯一的。通过比较 Revision 的大小就可以知道进行写操作的顺序。在实现分布式锁时,多个程序同时抢锁,根据 Revision 值大小依次获得锁,可以避免惊群效应,实现公平锁
- Prefix 机制:即前缀机制(或目录机制)。可以根据前缀(目录)获取该目录下所有的 Key 及对应的属性(包括 Key、Value 以及 Revision 等)
- Watch 机制:即监听机制,Watch 机制支持 Watch 某个固定的 Key,也支持 Watch 一个目录前缀(前缀机制),当被 Watch 的 Key 或目录发生变化,客户端将收到通知
Etcd分布式锁的步骤
- 假设分布式锁的 Name 为 /root/lockname,用来控制某个共享资源,concurrency 会自动将其转换为目录形式:/root/lockname/
- 客户端 A 连接 Etcd,创建一个租约 Leaseid_A,并设置 TTL(以业务逻辑来定 TTL 的时间), 以 /root/lockname 为前缀创建全局唯一的 Key,该 Key 的组织形式为 /root/lockname/{leaseid_A},客户端 A 将此 Key 绑定租约写入 Etcd,同时调用 TXN 事务查询写入的情况和具有相同前缀 /root/lockname/ 的 Revision 的排序情况
- 客户端 A 判断自己是否获得锁,以前缀 /root/lockname/ 读取 keyValue 列表(keyValue 中带有 Key 对应的 Revision),判断自己 Key 的 Revision 是否为当前列表中最小的,如果是则认为获得锁;否则阻塞监听列表中前一个 Revision 比自己小的 Key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁
- 执行业务逻辑,操作共享资源
- 释放分布式锁,现网的程序逻辑需要实现在正常和异常条件下的释放锁的策略,如捕获 SIGTERM 后执行 Unlock,或者异常退出时,有完善的监控和及时删除 Etcd 中的 Key 的异步机制,避免出现死锁现象
- 当客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务进行续约续期。如果持有锁期间客户端崩溃,心跳停止,Key 将因租约到期而被删除,从而锁释放,避免死锁
拷贝过来的网图
简单实现
结构
.
├── README.md
├── elock
│ ├── errors.go
│ └── lock.go
├── go.mod
├── go.sum
└── main.go
复制代码
自定义Error
errors.go
package elock
type Err string
func (e Err) Error() string {
return string(e)
}
const (
ErrAcquire Err = "fail to acquire"
ErrRelease Err = "fail to release"
)
复制代码
elock逻辑
elock
package elock
import (
"context"
"errors"
"fmt"
"log"
"go.etcd.io/etcd/api/v3/mvccpb"
v3 "go.etcd.io/etcd/client/v3"
)
type Mutex struct {
client *v3.Client
pfx string
myKey string
myRev int64
ttl int64
leaseId v3.LeaseID
}
func NewMutex(client *v3.Client, pfx string, ttl int64) *Mutex {
return &Mutex{
client: client,
pfx: pfx + "/",
myKey: "",
myRev: -1,
ttl: ttl,
}
}
func (m *Mutex) grantLease(ctx context.Context) error {
if m.leaseId == v3.NoLease {
resp, err := m.client.Grant(ctx, m.ttl)
if err != nil {
return nil
}
m.leaseId = resp.ID
return nil
}
resp, err := m.client.TimeToLive(ctx, m.leaseId)
if err != nil {
return nil
}
// if the lease is no longer alive, we should grant a
// new lease
if resp.TTL <= 0 {
resp, err := m.client.Grant(ctx, m.ttl)
if err != nil {
return nil
}
m.leaseId = resp.ID
}
return nil
}
func (m *Mutex) AcquireOnce(ctx context.Context) error {
m.grantLease(ctx)
// m.myKey eg : /etcdlock/3f357a41d952f12c
m.myKey = fmt.Sprintf("%s%x", m.pfx, m.leaseId)
// 比较当前m.myKey的CreateRevision是否为0,0代表目前不存在该key,执行put操作
// 非0表示key已经被创建,需要执行get操作
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put the kv to etcd, and attach the kv with the lease
put := v3.OpPut(m.myKey, "", v3.WithLease(m.leaseId))
// 获取key是否已设置成锁
get := v3.OpGet(m.myKey)
// 获取当前锁真正的持有者
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
// cmp条件成立,则执行then,否则执行else
txnResp, err := m.client.
Txn(ctx).
If(cmp).
Then(put /*resp.Responses[0]*/, getOwner /*resp.Responses[1]*/).
Else(get, getOwner).
Commit()
if err != nil {
return err
}
m.myRev = txnResp.Header.Revision
// the compare failed that mean the key is alraedy exised in etcd,So get
// the create revision from the get result
if txnResp.Succeeded != true {
m.myRev = txnResp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if these is no owner or owner's revision equal current revision that means
// get lock success
ownerKey := txnResp.Responses[1].GetResponseRange().Kvs
log.Printf("key: %s id: %d acquire once", m.myKey, m.myRev)
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
return nil
}
return ErrAcquire
}
func (m *Mutex) Acquire(ctx context.Context) error {
err := m.AcquireOnce(ctx)
if err != nil && !errors.Is(err, ErrAcquire) {
return err
}
err = m.waitRelease(ctx, m.client, m.pfx, m.myRev-1)
if err != nil {
return err
}
log.Printf("key: %s id: %d lock success", m.myKey, m.myRev)
return nil
}
func (m *Mutex) waitRelease(ctx context.Context, client *v3.Client, prefix string, rev int64) error {
getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(rev))
for {
resp, err := m.client.Get(ctx, m.pfx, v3.WithLastCreate()...)
resp, err = m.client.Get(ctx, m.pfx, getOpts...)
if err != nil {
return err
}
if len(resp.Kvs) == 0 {
return nil
}
lastKey := string(resp.Kvs[0].Key)
// lastKey is the key's rev less than current rev, resp.Header.Revision means in the newest snapshot to query
if err = waitDelete(ctx, m.client, lastKey, resp.Header.Revision); err != nil {
return err
}
}
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
cctx, cancel := context.WithCancel(ctx)
defer cancel()
var wr v3.WatchResponse
wch := client.Watch(cctx, key, v3.WithRev(rev))
for wr = range wch {
for _, ev := range wr.Events {
if ev.Type == mvccpb.DELETE {
log.Printf("wait key: %s on rev: %d deleted", key, rev)
return nil
}
}
}
if err := wr.Err(); err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
return fmt.Errorf("lost watcher waiting for delete")
}
func (m *Mutex) Release(ctx context.Context) error {
_, err := m.client.Revoke(ctx, m.leaseId)
if err != nil {
return err
}
_, err = m.client.Delete(ctx, m.myKey)
if err != nil {
return err
}
log.Printf("%s unlock success\n", m.myKey)
return nil
}
复制代码
使用
main.go
package main
import (
"context"
"example/elock"
"fmt"
"log"
"sync"
"time"
v3 "go.etcd.io/etcd/client/v3"
)
func main() {
var wg sync.WaitGroup
client, err := v3.New(v3.Config{Endpoints: []string{"127.0.0.1:2379"}})
if err != nil {
log.Fatal(err)
}
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
lock := elock.NewMutex(client, "/etcdlock", 10)
err := lock.Acquire(context.Background())
if err != nil {
fmt.Println(err)
return
}
time.Sleep(3 * time.Second)
err = lock.Release(context.Background())
if err != nil {
fmt.Println(err)
return
}
}()
}
wg.Wait()
}
复制代码
output
~/workspace/distribute-lock/etcd master !2 go run main.go
2021/06/26 13:20:43 key: /etcdlock/3f357a41d952f22c id: 137 acquire once
2021/06/26 13:20:43 key: /etcdlock/3f357a41d952f22a id: 138 acquire once
2021/06/26 13:20:43 key: /etcdlock/3f357a41d952f22e id: 139 acquire once
2021/06/26 13:20:43 key: /etcdlock/3f357a41d952f22c id: 137 lock success
2021/06/26 13:20:46 wait key: /etcdlock/3f357a41d952f22c on rev: 139 deleted
2021/06/26 13:20:46 /etcdlock/3f357a41d952f22c unlock success
2021/06/26 13:20:46 key: /etcdlock/3f357a41d952f22a id: 138 lock success
2021/06/26 13:20:49 wait key: /etcdlock/3f357a41d952f22a on rev: 139 deleted
2021/06/26 13:20:49 /etcdlock/3f357a41d952f22a unlock success
2021/06/26 13:20:49 key: /etcdlock/3f357a41d952f22e id: 139 lock success
2021/06/26 13:20:52 /etcdlock/3f357a41d952f22e unlock success
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END