本文正在参加「Python主题月」,详情查看 活动链接
加锁操作非原子性
import threading, time, redis
from redis import StrictRedis
# 方式一:加锁操作非原子性
# 失败案例
class RedisLock(object):
def __init__(self, redis_conn):
self.redis_conn = redis_conn
def get_lock_key(self, key):
lock_key = 'lock_%s' % key
return lock_key
def get_lock(self, key):
"""
:param key: 分布式锁key
:return: True 表示获取到了锁
get_lock会循环获取锁,只有得到锁的人才能退出循环
获取锁的原理:所有需要进行a操作的线程都要先获取redis中一个规定的key,若一个线程获取key为空,则获取成功,并设置key=1,进行a操作,其他线程都会获取失败以至再次获取到key为空
获取到key的线程进行操作a结束后,需要释放锁,就要删除key=1,以便下一个线程获取锁
但是本方式存在漏洞:第一个线程获取到key之后,还没来得及放入key=1,就被第二个线程获取到了
"""
lock_key = self.get_lock_key(key)
while True:
value = self.redis_conn.get(lock_key)
if not value:
self.redis_conn.set(lock_key, 1)
return True
time.sleep(0.01)
def del_lock(self, key):
lock_key = self.get_lock_key(key)
return self.redis_conn.delete(lock_key)
def increase_data(redis_conn, lock, key):
lock_value = lock.get_lock(key) # 获取锁
value = redis_conn.get(key) # 获取数据
time.sleep(0.1)
if value:
value = int(value) + 1
else:
value = 0
redis_conn.set(key, value)
thread_name = threading.current_thread().name
print(thread_name, value)
lock.del_lock(key) # 释放锁
##主程序
if __name__ == "__main__":
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=8)
redis = StrictRedis(connection_pool=pool)
lock = RedisLock(redis)
key = 'test_key'
thread_count = 10
redis.delete(key)
for i in range(thread_count):
# 启动10个线程调用increase_data
# 传入的lock相当于是一个锁工具,所有人都可以通过锁工具去redis中获取锁,但是锁的数量唯一
thread = threading.Thread(target=increase_data, args=(redis, lock, key))
thread.start()
"""
实际输出结果
Thread-1 0
Thread-8 1
Thread-6 Thread-7 1
1
Thread-9 1
Thread-2Thread-4 2
2Thread-3 2
Thread-10 2
Thread-5 2
"""
复制代码
使用setnx原子性加锁
import threading, time, redis
from redis import StrictRedis
class RedisLock(object):
def __init__(self, redis_conn):
self.redis_conn = redis_conn
def get_lock_key(self, key):
lock_key = 'lock_%s' % key
return lock_key
def get_lock(self, key):
"""
:param key:
:return:
鉴于上面版本是由于命令不是原子性操作造成两个或多个线程同时获得锁的问题,这个版本改成使用 redis 的 setnx 命令来进行锁的查询和设置操作
setnx 即 set if not exists,顾名思义就是当key不存在的时候才设置 value,并返回 1,如果 key 已经存在,则不进行任何操作,返回 0。
和之前get+set的区别就是 setnx只需要一步操作即可完成获取锁和设置锁的两步操作
这种原子性是成功的关键
操作是成功了,但是还是有问题,若某个线程在a操作时抛出异常,那么其他所有等待获取锁的线程将陷入死循环,造成死锁
"""
lock_key = self.get_lock_key(key)
while True:
value = self.redis_conn.setnx(lock_key, 1)
if value:
return True
time.sleep(0.01)
def del_lock(self, key):
lock_key = self.get_lock_key(key)
return self.redis_conn.delete(lock_key)
def increase_data(redis_conn, lock, key):
lock_value = lock.get_lock(key) # 获取锁
value = redis_conn.get(key) # 获取数据
time.sleep(0.1)
if value:
value = int(value) + 1
else:
value = 0
redis_conn.set(key, value)
thread_name = threading.current_thread().name
print(thread_name, value)
# 模拟死锁
# if thread_name == "Thread-2":
# print("thread-2 crash ....")
# import sys
# sys.exit(1)
lock.del_lock(key) # 释放锁
##主程序
if __name__ == "__main__":
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=8)
redis = StrictRedis(connection_pool=pool)
lock = RedisLock(redis)
key = 'test_key'
thread_count = 10
redis.delete(key)
for i in range(thread_count):
thread = threading.Thread(target=increase_data, args=(redis, lock, key))
thread.start()
复制代码
解决死锁
import threading, time, redis
from redis import StrictRedis
class RedisLock(object):
def __init__(self, redis_conn):
self.redis_conn = redis_conn
def get_lock_key(self, key):
lock_key = 'lock_%s' % key
return lock_key
def get_lock(self, key, timeout=1):
"""
:param key:
:param timeout:
:return:
根据redis的ttl机制
扩展资料:redis的是两种失效机制
1 被动失效:当客户端主动获取时,判断是否失效(超时)
2 主动失效:redis中有一个定时任务,每秒执行10次,随机从设定有失效机制的key中获取20个,删除过期的key,判断下是否超过1/4的key已经失效了,如果没有执行步骤第一步再次获取20个。
在锁之前先设定超时实现,避免死锁现象
解决了死锁现象但是锁机制还是存在问题:线程1退出死锁后,线程2立刻获取到了锁,但是线程1还有一步删除锁的操作,就会吧线程2刚放进去的锁删掉,以至于数据混乱
这里很容易想到flask源码中关于上下文request ctx对象唯一性的处理,通过线程id作为标识,也可以加上进程pid,机器ip,或者时间戳,就可以绝对确立唯一性了
然后在删除锁的操作中设置设置锁和删除锁必须在同一个线程中执行
"""
lock_key = self.get_lock_key(key)
while True:
value = self.redis_conn.set(lock_key, 1, nx=True, ex=timeout)
if value:
break
time.sleep(0.1)
def del_lock(self, key):
lock_key = self.get_lock_key(key)
return self.redis_conn.delete(lock_key)
def increase_data(redis_conn, lock, key):
lock_value = lock.get_lock(key) # 获取锁
value = redis_conn.get(key) # 获取数据
time.sleep(2.5) # 模拟实际情况下进行的某些耗时操作, 且执行时间大于锁过期的时间
time.sleep(0.1)
if value:
value = int(value) + 1
else:
value = 0
redis_conn.set(key, value)
thread_name = threading.current_thread().name
print(thread_name, value)
# 模拟死锁
if thread_name == "Thread-2":
print("thread-2 crash ....")
import sys
sys.exit(1)
lock.del_lock(key) # 释放锁
##主程序
if __name__ == "__main__":
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=8)
redis = StrictRedis(connection_pool=pool)
lock = RedisLock(redis)
key = 'test_key'
thread_count = 10
redis.delete(key)
for i in range(thread_count):
thread = threading.Thread(target=increase_data, args=(redis, lock, key))
thread.start()
复制代码
通过线程唯一性解决误删锁
import os
import socket
import threading, time, redis
from redis import StrictRedis
class RedisLock(object):
def __init__(self, redis_conn):
self.redis_conn = redis_conn
# 获取机器ip
self.ip = socket.gethostbyname(socket.gethostname())
# 获取进程pid
self.pid = os.getpid()
def get_lock_key(self, key):
lock_key = 'lock_%s' % key
return lock_key
def gen_unique_value(self):
thread_name = threading.current_thread().name
time_now = time.time()
# 机器ip 线程pid 进程名 时间戳, 确立唯一性
unique_value = "{0}-{1}-{2}-{3}".format(self.ip, self.pid, thread_name, time_now)
return unique_value
def get_lock(self, key, timeout=4):
"""
:param key:
:param timeout:
:return:
获取锁后会返回唯一标识(gen_unique_value),删除锁的时候通过判断唯一标识是否相等进行删除
这里执行并没有成功,甚至相比之前误差更大
原因是我们虽然解决了a线程只能删a线程放入的锁,但是当a线程判断通过后,正要进行删除key操作的时候,a线程的key过期了,此时b线程抢到了锁,并写入,然后a线程就删除了b线程写入的锁
简单来说就是 删除操作并非原子性,中途有缝可以插 虽然这种情况可以通过给锁设置合理的超时时间来避免,比如设置一个超时时间大于线程执行时间的锁
以上的方案中,我们是假设 redis 服务端是单集群且高可用的,忽视了以下的问题:
如果某一时刻 redis master 节点发生了故障,集群中的某个 slave 节点变成 master 节点,
这时候就可能出现原 master 节点上的锁没有及时同步到 slave 节点,导致其他线程同时获得锁。
对于这个问题,可以参考 redis 官方推出的 redlock 算法,但是比较遗憾的是,该算法也没有很好地解决锁过期的问题。
"""
lock_key = self.get_lock_key(key)
unique_value = self.gen_unique_value()
print("unique value %s" % unique_value)
while True:
value = self.redis_conn.set(lock_key, 1, nx=True, ex=timeout)
if value:
return unique_value
else:
thread_name = threading.current_thread().name
time.sleep(0.1)
def del_lock(self, key, value):
"""
:param key: 锁
:param value: 进程信息
:return:
"""
lock_key = self.get_lock_key(key)
old_lock_value = self.redis_conn.get(lock_key)
if old_lock_value == value:
return self.redis_conn.delete(lock_key)
def increase_data(redis_conn, lock, key):
lock_value = lock.get_lock(key) #获取锁
value = redis_conn.get(key) #获取数据
time.sleep(2.5) #模拟实际情况下进行的某些耗时操作, 且执行时间大于锁过期的时间
if value:
value = int(value) + 1
else:
value = 0
redis_conn.set(key, value)
thread_name = threading.current_thread().name
print(thread_name, value)
if thread_name == "Thread-2":
print("thread-2 crash ....")
import sys
sys.exit(1)
lock.del_lock(key, lock_value) #释放锁
##主程序
if __name__ == "__main__":
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=8)
redis = StrictRedis(connection_pool=pool)
lock = RedisLock(redis)
key = 'test_key'
thread_count = 10
redis.delete(key)
for i in range(thread_count):
thread = threading.Thread(target=increase_data, args=(redis, lock, key))
thread.start()
"""
unique value 10.0.0.13-52504-Thread-1-1621907713.510258
unique value 10.0.0.13-52504-Thread-2-1621907713.510459
unique value 10.0.0.13-52504-Thread-3-1621907713.510732
unique value 10.0.0.13-52504-Thread-4-1621907713.511075
unique value 10.0.0.13-52504-Thread-5-1621907713.511213
unique value 10.0.0.13-52504-Thread-6-1621907713.511355
unique value 10.0.0.13-52504-Thread-7-1621907713.511531
unique value 10.0.0.13-52504-Thread-8-1621907713.5118861
unique value 10.0.0.13-52504-Thread-9-1621907713.512355
unique value 10.0.0.13-52504-Thread-10-1621907713.512751
Thread-1 0
Thread-6 0
Thread-3 0
Thread-4 1
Thread-9 1
Thread-7 1
Thread-8 2
Thread-10 2
Thread-2 2
thread-2 crash ....
Thread-5 3
"""
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END