Redis分布式锁

前言

分布式锁是控制分布式系统或不同系统之间共同访问共享资源的一种锁实现,如果不同的系统或同一个系统的不同主机之间共享了某个资源时,往往需要互斥来防止彼此干扰保证一致性。目前微服务盛行,分布式锁几乎成了每家公司的标配,使用redis来实现分布式锁是比较普遍的方式

分布式锁特征

先看一个redis分布式锁的代码片段

// redis key是字段串前缀+主键id
// 例如:update_account_"account_id"
private <T> T lockAndThrowException(RedisLock redisLock, Supplier<T, EntityManager> supplier) throws RequiredLockFailException {
    // 获取锁
    if (redisLock.tryLock()) {
      try {
        // 执行锁住的方法
        return supplier.get(em);
      } finally {
        // 发送方法执行完成事件,会根据此事件解锁
        publisher.publishEvent(new LockOperationFinishEvent(redisLock));
      }
    }
    // 获取锁失败则抛出异常
    throw new RequiredLockFailException();
}


public boolean tryLock() {
    // 获取锁,设置锁过期时间
    if (this.setNX(lockKey, getLockExpiredTime())) {
      locked = true;
      return false;
    }
    return false;
}


private boolean setNX(final String key, final String value) {
    return redisTemplate.opsForValue().setIfAbsent(key, value);
}
复制代码

image.png

这个分布式锁有什么问题呢?

  1. 互斥性:同一时刻锁只能被一个线程持有,如下图所示

    • 客户端1获取锁,设置锁超时时间,却没有在超时时间内执行完业务
    • 此时客户端2获取到锁,在客户端2执行自己的业务代码时,客户端1执行完业务代码,误删客户端2设置的锁
  2. 可重入性:一个线程在持有锁的情况可以对其再次请求加锁,防止锁在线程执行完临界区操作之前释放。很明显,上面的代码并不支持可重入性

  3. 超时释放:某一个实例获取锁成功,在执行业务代码的时候实例挂掉了,锁一直存在在redis中,没有释放

上面3点是这个redis分布式锁存在的问题,同时也是分布式锁需要具备的特征,当然,redis分布式锁的应用场景一般是高并发,所以优秀的分布式锁应该也具备高性能和高可用的特征

Redisson

Redisson是目前广泛使用的分布式锁框架,良好地解决了以上的问题,先来看一段加锁的代码

public boolean lock(String lockName, long expireSeconds) {
    RLock rLock = redissonManager.getRedisson().getLock(lockName);
    boolean getLock = false;
    try {
      getLock = rLock.tryLock(0, expireSeconds, TimeUnit.SECONDS);
      if (getLock) {
        LOGGER.info("获取Redisson分布式锁[成功],lockName={}", lockName);
      } else {
        LOGGER.info("获取Redisson分布式锁[失败],lockName={}", lockName);
      }
    } catch (InterruptedException e) {
      LOGGER.error("获取Redisson分布式锁[异常],lockName=" + lockName, e);
      e.printStackTrace();
      return false;
    }
    return getLock;
}
复制代码
加锁源码
// 默认的加锁时间
private long lockWatchdogTimeout = 30 * 1000;
复制代码
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        // 加锁方有设置加锁时间的话,就不会有下面的看门狗机制
  	if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
  	// 加锁方没有设置加锁时间的话,会先加锁
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
                        // 异步设置线程,定时续命
  			ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
}
复制代码
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        // 具体加锁的代码是一串lua脚本
        // lua脚本有三个判断
        // 1.之前未加过锁,加锁,设置过期时间,注意:加锁的命令是hset
        // 2.之前已经加过锁的,在之前加锁的基础上,value + 1,再刷新一下超时时间
        // 3.非本线程的锁,获取到该锁的过期时间
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +                        
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
复制代码
private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                // 续命的代码也是一串lua脚本
                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
        // 这里是间隔时间,每隔10秒续一次命
        // internalLockLeaseTime取的是上面的加锁时间,30s  
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
}
复制代码

这个是加完锁之后在redis中的值,数据类型是hash,field是uuid+threadId,value是锁的重入数

image.png

加完锁后不要释放,可以清楚地看到每隔10s续一次命,又变回30s

image.png

解锁源码
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        // 解锁同样是一段lua代码
        // 有几个判断
        // 1.key不存在,返回1,发送释放锁消息
        // 2.key存在,field不存在,说明自己不是锁持有者,返回nil
        // 3.key存在,field存在,并且value-1之后还是大于0,刷新过期时间,返回0
        // 4.key存在,field存在,并且value-1之后小于等于0,删除key,发送释放锁消息
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}
复制代码

Redlock

虽然Redisson解决了可重入、互斥、超时释放的问题,个人认为已经很完美了,但是还是存在一个问题

描述这个问题之前需要一个前置知识,redis主从节点之间的数据同步是异步的

假如程序刚在redis主节点加了一把锁,这把锁还没有同步到redis从节点,redis主节点就挂掉了,那么这把锁就相当于没加了,多个线程可以获取到修改资源的权利,违反了锁的互斥性

Redis 的作者 antirez 提供了 RedLock 的算法来解决这个问题:详细可参考www.infoq.cn/article/dva…

个人认为这个算法太重了,且这个场景较为边缘,如果不是必要,不建议使用这种方式

参考资料

www.infoq.cn/article/dva…

github.com/TaXueWWL/re…

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享