来自:juejin.im/post/6891571079702118407
Redis 实现
SETNX key value
命令,意为 set if not exists
(如果不存在该 key,才去 set 值),就比如说是张三去上厕所,看厕所门锁着,他就不进去了,厕所门开着他才去。SETEX key seconds value
命令,为指定 key 设置过期时间,单位为秒。SET key value ex seconds nx
,加锁的同时设置过期时间。//基于jedis和lua脚本来实现
privatestaticfinal String LOCK_SUCCESS = "OK";
privatestaticfinal Long RELEASE_SUCCESS = 1L;
privatestaticfinal String SET_IF_NOT_EXIST = "NX";
privatestaticfinal String SET_WITH_EXPIRE_TIME = "PX";
@Override
public String acquire() {
try {
// 获取锁的超时时间,超过这个时间则放弃获取锁
long end = System.currentTimeMillis() + acquireTimeout;
// 随机生成一个 value
String requireToken = UUID.randomUUID().toString();
while (System.currentTimeMillis() < end) {
String result = jedis
.set(lockKey, requireToken, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return requireToken;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} catch (Exception e) {
log.error("acquire lock due to error", e);
}
returnnull;
}
@Override
public boolean release(String identify) {
if (identify == null) {
returnfalse;
}
//通过lua脚本进行比对删除操作,保证原子性
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = new Object();
try {
result = jedis.eval(script, Collections.singletonList(lockKey),
Collections.singletonList(identify));
if (RELEASE_SUCCESS.equals(result)) {
log.info("release lock success, requestToken:{}", identify);
returntrue;
}
} catch (Exception e) {
log.error("release lock due to error", e);
} finally {
if (jedis != null) {
jedis.close();
}
}
log.info("release lock failed, requestToken:{}, result:{}", identify, result);
returnfalse;
}
思考:加锁和释放锁的原子性可以用 lua 脚本来保证,那锁的自动续期改如何实现呢?
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.4</version>
</dependency>
private void test() {
//分布式锁名 锁的粒度越细,性能越好
RLock lock = redissonClient.getLock("test_lock");
lock.lock();
try {
//具体业务......
} finally {
lock.unlock();
}
}
// 常见的使用方法
lock.lock();
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
小结:虽然 lock() 有自动续锁机制,但是开发中还是推荐使用 lock(time,timeUnit)
,因为它省掉了整个续期带来的性能损,可以设置过期时间长一点,搭配unlock()
。
public void test() {
RLock lock = redissonClient.getLock("test_lock");
lock.lock(30, TimeUnit.SECONDS);
try {
//.......具体业务
} finally {
//手动释放锁
lock.unlock();
}
}
基于 Zookeeper 来实现分布式锁
create [-s] [-e] path [data]
命令,-s
为创建有序节点,-e
创建临时节点。ls [-w] path
为查看节点命令,-w
为添加一个 watch(监视器),/
为查看根节点所有节点,可以看到我们刚才所创建的节点,同时如果是跟着指定节点名字的话为查看指定节点下的子节点。当个线程进来时会去父节点上创建一个临时的顺序节点。 第二个线程进来发现锁已经被持有了,就会为当前持有锁的节点注册一个 watcher 监听器。 第三个线程进来发现锁已经被持有了,因为是顺序节点的缘故,就会为上一个节点去创建一个 watcher 监听器。 当个线程释放锁后,删除节点,由它的下一个节点去占有锁。
public class ZooKeeperDistributedLock implements Watcher {
private ZooKeeper zk;
private String locksRoot = "/locks";
private String productId;
private String waitNode;
private String lockNode;
private CountDownLatch latch;
private CountDownLatch connectedLatch = new CountDownLatch(1);
private int sessionTimeout = 30000;
public ZooKeeperDistributedLock(String productId) {
this.productId = productId;
try {
String address = "192.168.189.131:2181,192.168.189.132:2181";
zk = new ZooKeeper(address, sessionTimeout, this);
connectedLatch.await();
} catch (IOException e) {
throw new LockException(e);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
connectedLatch.countDown();
return;
}
if (this.latch != null) {
this.latch.countDown();
}
}
public void acquireDistributedLock() {
try {
if (this.tryLock()) {
return;
} else {
waitForLock(waitNode, sessionTimeout);
}
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}
//获取锁
public boolean tryLock() {
try {
// 传入进去的locksRoot + “/” + productId
// 假设productId代表了一个商品id,比如说1
// locksRoot = locks
// /locks/10000000000,/locks/10000000001,/locks/10000000002
lockNode = zk.create(locksRoot + "/" + productId, new byte[], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 看看刚创建的节点是不是小的节点
// locks:10000000000,10000000001,10000000002
List<String> locks = zk.getChildren(locksRoot, false);
Collections.sort(locks);
if(lockNode.equals(locksRoot+"/"+ locks.get())){
//如果是小的节点,则表示取得锁
return true;
}
//如果不是小的节点,找到比自己小1的节点
int previousLockIndex = -1;
for(int i = ; i < locks.size(); i++) {
if(lockNode.equals(locksRoot + “/” + locks.get(i))) {
previousLockIndex = i - 1;
break;
}
}
this.waitNode = locks.get(previousLockIndex);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
return false;
}
private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException {
Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
if (stat != null) {
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
return true;
}
//释放锁
public void unlock() {
try {
System.out.println("unlock " + lockNode);
zk.delete(lockNode, -1);
lockNode = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
//异常
public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e) {
super(e);
}
public LockException(Exception e) {
super(e);
}
}
}
总结
实现方式的不同,Redis 实现为去插入一条占位数据,而 ZK 实现为去注册一个临时节点。
遇到宕机情况时,Redis 需要等到过期时间到了后自动释放锁,而 ZK 因为是临时节点,在宕机时候已经是删除了节点去释放锁。
Redis 在没抢占到锁的情况下一般会去自旋获取锁,比较浪费性能,而 ZK 是通过注册监听器的方式获取锁,性能而言优于 Redis。