etcd distributed lock

Etcd有以下机制,用他们来实现分布式锁

  • Lease 机制:即租约机制(TTL,Time To Live),Etcd 可以为存储的 Key-Value 对设置租约,当租约到期,Key-Value 将失效删除;同时也支持续约续期(KeepAlive)
  • Revision 机制:每个 key 带有一个 Revision 属性值,Etcd 每进行一次事务对应的全局 Revision 值都会加一,因此每个 Key 对应的 Revision 属性值都是全局唯一的。通过比较 Revision 的大小就可以知道进行写操作的顺序。在实现分布式锁时,多个程序同时抢锁,根据 Revision 值大小依次获得锁,可以避免惊群效应,实现公平锁
  • Prefix 机制:即前缀机制(或目录机制)。可以根据前缀(目录)获取该目录下所有的 Key 及对应的属性(包括 Key、Value 以及 Revision 等)
  • Watch 机制:即监听机制,Watch 机制支持 Watch 某个固定的 Key,也支持 Watch 一个目录前缀(前缀机制),当被 Watch 的 Key 或目录发生变化,客户端将收到通知

Etcd分布式锁的步骤

  • 假设分布式锁的 Name 为 /root/lockname,用来控制某个共享资源,concurrency 会自动将其转换为目录形式:/root/lockname/
  • 客户端 A 连接 Etcd,创建一个租约 Leaseid_A,并设置 TTL(以业务逻辑来定 TTL 的时间), 以 /root/lockname 为前缀创建全局唯一的 Key,该 Key 的组织形式为 /root/lockname/{leaseid_A},客户端 A 将此 Key 绑定租约写入 Etcd,同时调用 TXN 事务查询写入的情况和具有相同前缀 /root/lockname/ 的 Revision 的排序情况
  • 客户端 A 判断自己是否获得锁,以前缀 /root/lockname/ 读取 keyValue 列表(keyValue 中带有 Key 对应的 Revision),判断自己 Key 的 Revision 是否为当前列表中最小的,如果是则认为获得锁;否则阻塞监听列表中前一个 Revision 比自己小的 Key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁
  • 执行业务逻辑,操作共享资源
  • 释放分布式锁,现网的程序逻辑需要实现在正常和异常条件下的释放锁的策略,如捕获 SIGTERM 后执行 Unlock,或者异常退出时,有完善的监控和及时删除 Etcd 中的 Key 的异步机制,避免出现死锁现象
  • 当客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务进行续约续期。如果持有锁期间客户端崩溃,心跳停止,Key 将因租约到期而被删除,从而锁释放,避免死锁

拷贝过来的网图

image.png

简单实现

结构

.
├── README.md
├── elock
│   ├── errors.go
│   └── lock.go
├── go.mod
├── go.sum
└── main.go
复制代码

自定义Error

errors.go

package elock

type Err string

func (e Err) Error() string {
	return string(e)
}

const (
	ErrAcquire Err = "fail to acquire"
	ErrRelease Err = "fail to release"
)
复制代码

elock逻辑

elock

package elock

import (
	"context"
	"errors"
	"fmt"
	"log"

	"go.etcd.io/etcd/api/v3/mvccpb"
	v3 "go.etcd.io/etcd/client/v3"
)

type Mutex struct {
	client  *v3.Client
	pfx     string
	myKey   string
	myRev   int64
	ttl     int64
	leaseId v3.LeaseID
}

func NewMutex(client *v3.Client, pfx string, ttl int64) *Mutex {
	return &Mutex{
		client: client,
		pfx:    pfx + "/",
		myKey:  "",
		myRev:  -1,
		ttl:    ttl,
	}
}

func (m *Mutex) grantLease(ctx context.Context) error {
	if m.leaseId == v3.NoLease {
		resp, err := m.client.Grant(ctx, m.ttl)
		if err != nil {
			return nil
		}
		m.leaseId = resp.ID
		return nil
	}

	resp, err := m.client.TimeToLive(ctx, m.leaseId)
	if err != nil {
		return nil
	}

	// if the lease is no longer alive, we should grant a
	// new lease
	if resp.TTL <= 0 {
		resp, err := m.client.Grant(ctx, m.ttl)
		if err != nil {
			return nil
		}
		m.leaseId = resp.ID

	}
	return nil
}

func (m *Mutex) AcquireOnce(ctx context.Context) error {
	m.grantLease(ctx)
	// m.myKey eg : /etcdlock/3f357a41d952f12c
	m.myKey = fmt.Sprintf("%s%x", m.pfx, m.leaseId)

	// 比较当前m.myKey的CreateRevision是否为0,0代表目前不存在该key,执行put操作
	// 非0表示key已经被创建,需要执行get操作
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
	// put the kv to etcd, and attach the kv with the lease
	put := v3.OpPut(m.myKey, "", v3.WithLease(m.leaseId))
	// 获取key是否已设置成锁
	get := v3.OpGet(m.myKey)
	// 获取当前锁真正的持有者
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
	// cmp条件成立,则执行then,否则执行else
	txnResp, err := m.client.
		Txn(ctx).
		If(cmp).
		Then(put /*resp.Responses[0]*/, getOwner /*resp.Responses[1]*/).
		Else(get, getOwner).
		Commit()

	if err != nil {
		return err
	}
	m.myRev = txnResp.Header.Revision
	// the compare failed that mean the key is alraedy exised in etcd,So get
	// the create revision from the get result
	if txnResp.Succeeded != true {
		m.myRev = txnResp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}
	// if these is no owner or owner's revision equal current revision that means
	// get lock success
	ownerKey := txnResp.Responses[1].GetResponseRange().Kvs
	log.Printf("key: %s id: %d acquire once", m.myKey, m.myRev)
	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		return nil
	}
	return ErrAcquire

}

func (m *Mutex) Acquire(ctx context.Context) error {
	err := m.AcquireOnce(ctx)
	if err != nil && !errors.Is(err, ErrAcquire) {
		return err
	}
	err = m.waitRelease(ctx, m.client, m.pfx, m.myRev-1)

	if err != nil {
		return err
	}
	log.Printf("key: %s id: %d lock  success", m.myKey, m.myRev)
	return nil
}

func (m *Mutex) waitRelease(ctx context.Context, client *v3.Client, prefix string, rev int64) error {
	getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(rev))
	for {
		resp, err := m.client.Get(ctx, m.pfx, v3.WithLastCreate()...)
		resp, err = m.client.Get(ctx, m.pfx, getOpts...)
		if err != nil {
			return err
		}
		if len(resp.Kvs) == 0 {
			return nil
		}
		lastKey := string(resp.Kvs[0].Key)
		// lastKey is the key's rev less than current rev, resp.Header.Revision means in the newest snapshot to query
		if err = waitDelete(ctx, m.client, lastKey, resp.Header.Revision); err != nil {
			return err
		}
	}
}

func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wr v3.WatchResponse
	wch := client.Watch(cctx, key, v3.WithRev(rev))
	for wr = range wch {
		for _, ev := range wr.Events {
			if ev.Type == mvccpb.DELETE {
				log.Printf("wait key: %s on rev: %d deleted", key, rev)
				return nil
			}
		}
	}
	if err := wr.Err(); err != nil {
		return err
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	return fmt.Errorf("lost watcher waiting for delete")
}

func (m *Mutex) Release(ctx context.Context) error {
	_, err := m.client.Revoke(ctx, m.leaseId)
	if err != nil {
		return err
	}
	_, err = m.client.Delete(ctx, m.myKey)
	if err != nil {
		return err
	}
	log.Printf("%s unlock success\n", m.myKey)
	return nil
}
复制代码

使用

main.go

package main

import (
	"context"
	"example/elock"
	"fmt"
	"log"
	"sync"
	"time"

	v3 "go.etcd.io/etcd/client/v3"
)

func main() {
	var wg sync.WaitGroup
	client, err := v3.New(v3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lock := elock.NewMutex(client, "/etcdlock", 10)
			err := lock.Acquire(context.Background())
			if err != nil {
				fmt.Println(err)
				return
			}
			time.Sleep(3 * time.Second)
			err = lock.Release(context.Background())
			if err != nil {
				fmt.Println(err)
				return
			}

		}()
	}
	wg.Wait()
}
复制代码

output

~/workspace/distribute-lock/etcd  master !2  go run main.go 
2021/06/26 13:20:43 key: /etcdlock/3f357a41d952f22c id: 137 acquire once
2021/06/26 13:20:43 key: /etcdlock/3f357a41d952f22a id: 138 acquire once
2021/06/26 13:20:43 key: /etcdlock/3f357a41d952f22e id: 139 acquire once
2021/06/26 13:20:43 key: /etcdlock/3f357a41d952f22c id: 137 lock  success
2021/06/26 13:20:46 wait key: /etcdlock/3f357a41d952f22c on rev: 139 deleted
2021/06/26 13:20:46 /etcdlock/3f357a41d952f22c unlock success
2021/06/26 13:20:46 key: /etcdlock/3f357a41d952f22a id: 138 lock  success
2021/06/26 13:20:49 wait key: /etcdlock/3f357a41d952f22a on rev: 139 deleted
2021/06/26 13:20:49 /etcdlock/3f357a41d952f22a unlock success
2021/06/26 13:20:49 key: /etcdlock/3f357a41d952f22e id: 139 lock  success
2021/06/26 13:20:52 /etcdlock/3f357a41d952f22e unlock success
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享