分布式锁的实现方式有很多,本篇文章讲述一下使用Redis实现分布式锁。网上有很多使用Redis实现分布式锁的代码,但是这些代码或多或少都有问题。这篇文章会写一个实现,同时标明一些注意点。
场景
为了便于阐述,这里假设一个游戏场景,用户A有开山斧一把,价值500元宝,用户B有800元宝,想买A的开山斧,这些数据都存在Redis中。需要编写代码成功的实现该笔交易。
问题
Redis实现分布式锁,需要考虑如下问题:
- 持有锁的进程因为操作时间过长而导致锁被自动释放,但进程本身并不知晓这一点,甚至还可能会错误地释放掉了其他进程持有的锁。
- 一个持有锁并打算执行长时间操作的进程已经崩溃,但其他想要获取锁的进程不知道哪个进程持有着锁,也无法检测出持有锁的进程已经崩溃,只能白白地浪费时间等待锁被释放。
- 在一个进程持有的锁过期之后,其他多个进程同时尝试去获取锁,并且都获得了锁。
三个特性
实现一个低保障的分布式锁,需要具备三个特性
- 安全属性(Safety property): 独享(相互排斥)。在任意一个时刻,只有一个客户端持有锁。
- 活性A(Liveness property A): 无死锁。即便持有锁的客户端崩溃(crashed)或者网络被分裂(gets partitioned),锁仍然可以被获取。
- 活性B(Liveness property B): 容错。 只要大部分Redis节点都活着,客户端就可以获取和释放锁.
命令
使用Redis实现分布式锁,一般使用SETNX或者SET命令,SETNX不能同时设置过期时间,如果使用的版本大于等于2.6.12,可以使用SET命令,可以使用这个命令原子性的实现SETNX和EXPIRE的功能,下面是两个命令的简介
SETNX
命令格式:SETNX key value
时间复杂度:O(1)
说明:将key
设置值为value
,如果key
不存在,这种情况下等同SET命令。 当key
存在时,什么也不做。SETNX
是”SET if Not eXists”的简写。
返回值
-
1
如果key被设置了 -
如果key没有被设置
SET
命令格式:SET key value [EX seconds] [PX milliseconds] [NX|XX]
时间复杂度:O(1)
说明:将键key
设定为指定的“字符串”值。如果 key 已经保存了一个值,那么这个操作会直接覆盖原来的值,并且忽略原始类型。当set
命令执行成功之后,之前设置的过期时间都将失效。
选项
从2.6.12版本开始,redis为SET
命令增加了一系列选项:
-
EX
seconds – 设置键key的过期时间,单位时秒 -
PX
milliseconds – 设置键key的过期时间,单位是毫秒 -
NX
– 只有键key不存在的时候才会设置key的值 -
XX
– 只有键key存在的时候才会设置key的值
实现
此处使用SETNX实现,毕竟有的公司Redis版本可能较低,使用SETNX可以实现,SET更加没有问题。
代码如下:
<?php
function uuid($prefix = '')
{
$chars = md5(uniqid(mt_rand(), true));
$uuid = substr($chars, , 8) . '-';
$uuid .= substr($chars, 8, 4) . '-';
$uuid .= substr($chars, 12, 4) . '-';
$uuid .= substr($chars, 16, 4) . '-';
$uuid .= substr($chars, 20, 12);
$ret = $prefix . $uuid;
return strtoupper($ret);
}
function acquireLock($redis,$lockName, $acquireTime = 10, $lockTime = 10)
{
$lockKey = 'lock:' + $lockName;
$identifier = uuid('identify');
$end = time() + $acquireTime;
while (time() < $end) {
if ($redis->setnx($lockKey, $identifier)) {
$redis->expire($lockKey, $lockTime);
return $identifier;
} elseif ($redis->ttl($lockKey) == -1) {
$redis->expire($lockKey, $lockTime);
}
usleep(1000);
}
return false;
}
function process(){
$redis = new Redis();
$lockName = 'market';
//1.获取锁
$locked = acquireLock($redis,$lockName);
if($locked === false){
return false;
}
//2.进行交易
//判断A和B是否满足交易条件
//使用管道,对A和B进行操作
//3.释放锁
$releaseRes = releaseLock($redis,$lockName,$locked);
if($releaseRes === false){
return false;
}
}
function releaseLock($redis,$lockName,$identifier){
$lockKey = 'lock:' + $lockName;
$redis->watch($lockKey);
if($redis->get($lockKey) === $identifier){
$redis->multi();
$redis->del($lockKey);
$redis->exec();
return true;
}
$redis->unwatch();
return false;
}
说明:
- 获取锁:
- 创建的$identifier,这个值用于删除key的时候,判断是否为当前客户端获取的锁,以免删除其它客户端的锁
- while循环用于在一段时间内不停的获取锁
- 如果能够获取锁便获取,同时设置超时时间,防止线程运行时崩溃,锁永远无法释放
- 如果没能成功获取到锁,检查当前锁的过期时间,如果未设置过期时间,进行设置,防止其他线程获得锁后立即崩溃,没有设置过期时间
2. 处理业务
- 需要先判断交易双方是否都满足条件,因为锁定的是整个市场,所以一旦获得锁,交易双方的状态都不会再进行改变
- 使用管道能保证整个交易能像事务一样被处理,而且性能会比用redis的事务更好
3. 释放锁
- 使用watch监控锁,一旦key被变更,删除key的事务不会被执行
- 需要判断key的值是否和本线程记录的$identifier一样,只有一致才能进行删除
- 使用事务来做删除key的操作,使用事务的原因是防止中途该锁被别的线程获取
- 如果失败,记得unwatch
4. 其他问题
- 可重入问题:可重入指的是,线程可以再次获取到锁。实现方法比较简单,只需要在acquireLock的时候,传入identifier,判断当前锁的identifier和传入的是否一致,如果一致则可以进行操作
- 线程未执行完毕,锁的超时时间已过,其他线程获取到锁:解决该问题的一个方案是,当获取到锁后,在超时时间经过一半的时候检查锁是否存在或者被修改,如果没有变化且线程正常运行,则延长超时时间
思考
如果基于Redis单实例,假设这个单实例总是可用,这种方法已经足够安全。
但有两种特殊情况大家需要关注一下:
主从结构中存在明显的竞态:
- 客户端A从master获取到锁
- 在master将锁同步到slave之前,master宕掉了。
- slave节点被晋级为master节点
- 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。安全失效!
在Redis的分布式环境中,有N个Redis master
这种情况可以使用Redlock算法
总结
本文讲述了怎样用Redis实现分布式锁,并写了具体实现和相关的分析。要用Redis实现分布式锁,有很多细节需要思考,大家可以根据自己的业务形态设计符合自己要求的锁,在复杂度和安全性上做好折中。
资料
- https://www.jianshu.com/p/bb8c6c3113dd
- http://redis.cn/topics/distlock.html
- http://redis.cn/commands/set.html
后
大家如果喜欢我的文章,可以关注我的公众号(程序员麻辣烫)
http://weixin.qq.com/r/jCkOFgjEHFVjrUx193ye (二维码自动识别)
往期文章回顾: