基于 Redis 的分布式锁对大家来说并不陌生,可是你的分布式锁有失败的时候吗?在失败的时候可曾怀疑过你在用的分布式锁真的靠谱吗?以下是结合自己的踩坑经验总结的一些经验之谈。
你真的需要分布式锁吗?
提高效率。比如多个节点计算同一批任务,如果某个任务已经有节点在计算了,那其他节点就不用重复计算了,以免浪费计算资源。不过重复计算也没事,不会造成其他更大的损失。也就是允许偶尔的失败。
保证正确性。这种情况对锁的要求就很高了,如果重复计算,会对正确性造成影响。这种不允许失败。
从一个简单的分布式锁实现说起
简单的实现
加锁和解锁的锁必须是同一个,常见的解决方案是给每个锁一个钥匙( ID),加锁时生成,解锁时判断。
不能让一个资源加锁。常见的解决方案是给一个锁的过期时间。当然了还有其他方案,后面再说。
public static boolean tryLock(String key, String uniqueId, int seconds) {
return "OK".equals(jedis.set(key, uniqueId, "NX", "EX", seconds));
}
https://redis.io/commands/set
public static boolean releaseLock(String key, String uniqueId) {
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('del', KEYS[1]) else return 0 end";
return jedis.eval(
luaScript,
Collections.singletonList(key),
Collections.singletonList(uniqueId)
).equals(1L);
}
靠谱吗?
单点问题。上面的实现只要一个 Master 节点就能搞定,这里的单点指的是单 Master,就算是个集群,如果加锁成功后,锁从 Master 复制到 Slave 的时候挂了,也是会出现同一资源被多个 Client 加锁的。
执行时间超过了锁的过期时间。上面写到为了不出现一直上锁的情况,加了一个兜底的过期时间,时间到了锁自动释放,但是,如果在这期间任务并没有做完怎么办?由于 GC 或者网络延迟导致的任务时间变长,很难保证任务一定能在锁的过期时间内完成。
Redlock 算法
分布式锁的坑
高并发场景下的问题
节点宕机
假设有 5 个 Redis 的节点:A、B、C、D、E,没有做持久化。
Client1 从 A、B、C 这3 个节点获取锁成功,那么 client1 获取锁成功。
节点 C 挂了。
Client2 从 C、D、E 获取锁成功,client2 也获取锁成功,那么在同一时刻 Client1 和 Client2 同时获取锁,Redlock 被玩坏了。
任务执行时间超过锁的 TTL
Client1 获取到锁。
Client1 开始任务,然后发生了 STW 的 GC,时间超过了锁的过期时间。
Client2 获取到锁,开始了任务。
Client1 的 GC 结束,继续任务,这个时候 Client1 和 Client2 都认为自己获取了锁,都会处理任务,从而发生错误。
public class RedissonLock extends RedissonExpirable implements RLock {
...
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
...
}
public class RedissonLock extends RedissonExpirable implements RLock {
...
public static final long LOCK_EXPIRATION_INTERVAL_SECONDS = 30;
protected long internalLockLeaseTime = TimeUnit.SECONDS.toMillis(LOCK_EXPIRATION_INTERVAL_SECONDS);
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= ) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
...
}
系统时钟漂移
系统的时钟和 NTP 服务器不同步。这个目前没有特别好的解决方案,只能相信运维同学了。
-
clock realtime 被人为修改。在实现分布式锁时,不要使用 clock realtime。
不过很可惜,Redis 使用的就是这个时间,我看了下 Redis 5.0 源码,使用的还是 clock realtime。
Antirez 说过改成 clock monotonic 的,不过大佬还没有改。也就是说,人为修改 Redis 服务器的时间,就能让 Redis 出问题了。
总结
END