在高并发场景的架构里,幂等性是必须得保证的。比如说支付功能,用户发起支付,如果后台没有做幂等校验,刚好用户手抖多点了几下,于是后台就可能多次收到同一个订单请求,不做幂等很容易就让用户重复支付了,这样用户是肯定不能忍的。
解决方案
1,查询和删除不在幂等讨论范围,查询肯定没有幂等的说,删除:第一次删除成功后,后面来删除直接返回0,也是返回成功。
2,建唯一索引:唯一索引或唯一组合索引来防止新增数据存在脏数据(当表存在唯一索引,并发现新增异常时,再查询一次就可以了,数据应该已经存在了,返回结果即可)。
3,token机制:由于重复点击或者网络重发,或者nginx重发等情况会导致数据被重复提交。前端在数据提交前要向后端服务的申请token,token放到 Redis 或 JVM 内存,token有效时间。提交后后台校验token,同时删除token,生成新的token返回。redis要用删除操作来判断token,删除成功代表token校验通过,如果用select+delete来校验token,存在并发问题,不建议使用。
4,悲观锁
悲观锁使用时一般伴随事务一起使用,数据锁定时间可能会很长,根据实际情况选用(另外还要考虑id是否为主键,如果id不是主键或者不是 InnoDB 存储引擎,那么就会出现锁全表)。
select id ,name from table_# where id='##' for update;
5,乐观锁,给数据库表增加一个version字段,可以通过这个字段来判断是否已经被修改了
update table_xxx set name=#name#,version=version+1 where version=#version#
6,分布式锁,比如 Redis、Zookeeper 的分布式锁。单号为key,然后给Key设置有效期(防止支付失败后,锁一直不释放),来一个请求使用订单号生成一把锁,业务代码执行完成后再释放锁。
7,保底方案,先查询是否存在此单,不存在进行支付,存在就直接返回支付结果。
最优方案:分布式锁
分布式锁方案对比。 基于Zookeeper 当使用Zookeeper实现分布式锁时,在加锁的共同父节点下创建一个新的临时有需节点。创建完成后会获取加锁父节点下所有子节点,判断自己是否为最小的一个。如果是则获得锁,进行执行加锁代码,执行完毕后删除当前临时节点。
如果判断自己不是最小的一个节点时,则获取比自己小的最近的那个节点,并对其设置被删除监听。当之前节点不存在时当前节点获得锁,执行加锁逻辑代码,当执行完毕后,当前节点自行删除。
实现核心代码:
package com.xiaohui.bean;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class ZkLock implements Lock {
//zk 客户端对象
private ZooKeeper zk;
//zk的目录结构 根节点
private String root ="/locks";
//锁名称
private String lockName;
//当前线程创建的序列node
private ThreadLocal<String> nodeId = new ThreadLocal<String>();
//用来同步等待zkClient连接到了服务端
private CountDownLatch connectedSignal = new CountDownLatch(1);
//超时时间
private final static int sessionTimeout = 3000;
private final static byte[] data = new byte[0];
public ZkLock(String config, String lockName) {
this.lockName = lockName;
try{
//创建连接
zk = new ZooKeeper(config, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState() == Event.KeeperState.SyncConnected){
//将count值减1
connectedSignal.countDown();
}
}
});
//[等待zk客户端链接上服务器后再继续执行]调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
connectedSignal.await();
Stat stat = zk.exists(root,false);
if(null == stat){
//创建根节点
zk.create(root,data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//持久的节点
}
}catch (Exception e){
e.printStackTrace();
throw new RuntimeException();
}
}
@Override
public void lock() {
try{
//创建临时子节点[当前节点路径]
String curNodePath = zk.create(root+"/"+lockName,
data,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println(Thread.currentThread().getName()+" "+curNodePath+ "--- created.");
//取出所有子节点
List<String> children = zk.getChildren(root, false);
TreeSet<String> set = new TreeSet<String>();
for (String child : children) {
System.out.println(curNodePath+"----------------------------child:"+child);
set.add(root+"/"+child);
}
String smallNode = set.first();
if(curNodePath.equals(smallNode)){
//如果是最小的节点,则表示获取到锁
System.out.println(Thread.currentThread().getName()+" "+root+"/"+lockName+ "--- 获得锁.");
this.nodeId.set(curNodePath);
return;
}
//============此处如果有延时,上一个节点 在此刻被删除,自己最小缺无法实现监听==============
String preNode = set.lower(curNodePath);
CountDownLatch latch = new CountDownLatch(1);
Stat stat = zk.exists(preNode,new LockWatcher(latch));//注册监听
// 判断比自己小一个数的节点是否存在,如果不存在则无需等待解锁,同时注册监听
if(stat != null){
System.out.println(Thread.currentThread().getName()+" "+curNodePath+ "等待"+preNode +" 解锁");
latch.await();//此处等待。。。
nodeId.set(curNodePath);
latch = null;
}else{
System.out.println(Thread.currentThread().getName()+" "+curNodePath+"上一个节点不存在?我直接获取的锁");
nodeId.set(curNodePath);
latch = null;
}
}catch (Exception e){
e.printStackTrace();
throw new RuntimeException(e);
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
try{
System.out.println(Thread.currentThread().getName()+" 解锁:"+nodeId.get());
if(null != nodeId){
zk.delete(nodeId.get(),-1);
}
nodeId.remove();
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public Condition newCondition() {
return null;
}
//添加wacther 监听临时顺序节点的删除
class LockWatcher implements Watcher{
private CountDownLatch latch = null;
public LockWatcher(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeDeleted){
latch.countDown();
}
}
}
}
同时下发10个请求,测试代码:
package com.xiaohui.web;
import com.xiaohui.bean.Stock;
import com.xiaohui.bean.ZkLock;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class WebController {
ZkLock zkLock = new ZkLock("172.18.230.184:2181","stock_zk1");
@GetMapping("/startReduce")
public String startReduce(){
new Thread(new Runnable() {
@Override
public void run() {
zkLock.lock();
boolean b = Stock.reduceStock();
System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
zkLock.unlock();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
zkLock.lock();
boolean b = Stock.reduceStock();
System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
zkLock.unlock();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
zkLock.lock();
boolean b = Stock.reduceStock();
System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
zkLock.unlock();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
zkLock.lock();
boolean b = Stock.reduceStock();
System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
zkLock.unlock();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
zkLock.lock();
boolean b = Stock.reduceStock();
System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
zkLock.unlock();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
zkLock.lock();
boolean b = Stock.reduceStock();
System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
zkLock.unlock();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
zkLock.lock();
boolean b = Stock.reduceStock();
System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
zkLock.unlock();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
zkLock.lock();
boolean b = Stock.reduceStock();
System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
zkLock.unlock();
}
}).start();
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
System.out.println("Stock.count = " +Stock.count);
return "set ok!";
}
}
运行结果:
Thread-15 /locks/stock_zk10000000032--- created.
Thread-16 /locks/stock_zk10000000033--- created.
Thread-11 /locks/stock_zk10000000034--- created.
Thread-17 /locks/stock_zk10000000035--- created.
Thread-13 /locks/stock_zk10000000037--- created.
Thread-10 /locks/stock_zk10000000038--- created.
Thread-12 /locks/stock_zk10000000036--- created.
Thread-14 /locks/stock_zk10000000039--- created.
Thread-15 /locks/stock_zk1--- 获得锁.
Thread-10 /locks/stock_zk10000000038等待/locks/stock_zk10000000037 解锁
Thread-17 /locks/stock_zk10000000035等待/locks/stock_zk10000000034 解锁
Thread-11 /locks/stock_zk10000000034等待/locks/stock_zk10000000033 解锁
Thread-16 /locks/stock_zk10000000033等待/locks/stock_zk10000000032 解锁
Thread-12 /locks/stock_zk10000000036等待/locks/stock_zk10000000035 解锁
Thread-13 /locks/stock_zk10000000037等待/locks/stock_zk10000000036 解锁
Thread-14 /locks/stock_zk10000000039等待/locks/stock_zk10000000038 解锁
Thread-15下单:成功
Thread-15 解锁:/locks/stock_zk10000000032
Thread-16下单:失败
Thread-16 解锁:/locks/stock_zk10000000033
Thread-11下单:失败
Thread-11 解锁:/locks/stock_zk10000000034
Thread-17下单:失败
Thread-17 解锁:/locks/stock_zk10000000035
Thread-12下单:失败
Thread-12 解锁:/locks/stock_zk10000000036
Thread-13下单:失败
Thread-13 解锁:/locks/stock_zk10000000037
Thread-10下单:失败
Thread-10 解锁:/locks/stock_zk10000000038
Thread-14下单:失败
Thread-14 解锁:/locks/stock_zk10000000039
Stock.count = 0
实现细节可能会因应用场景和需求而有所不同。另外,Zookeeper本身具有以下优缺点:
优点:1. Zookeeper的机制可以保证分布式锁的实现。 2. Zookeeper可以利用其自身的机制保证原子性。 3. Zookeeper可以用于实现消息的一致性。 缺点:1. Zookeeper的机制可以导致系统的性能下降。 2. Zookeeper可能会因为机制的问题出现脑裂的问题。 3. Zookeeper需要自己实现网络I/O,并且实现比较复杂。
同时,使用此方案还需要单独维护一套Zookepper集群,维护成本高
2.基于Redis的分布式锁
说到Redis分布式锁大部分人都会想到:
2.1 setnx+expire
【setnx和expire分2步可能出现死锁,误删除持有锁,不支持阻塞等待、重入】
2.2或者SET lock_name value EX seconds NX
【在分布式系统中,为了避免单点故障,提高可靠性,redis都会采用主从架构,当主节点挂了后,从节点会作为主继续提供服务。该种方案能够满足大多数的业务场景,但是对于要求强一致性的场景如交易,该种方案还是有漏洞的,原因如下:
redis主从架构采用的是异步复制,当master节点拿到了锁,但是锁还未同步到slave节点,此时master节点挂了,发生故障转移,slave节点被选举为master节点,丢失了锁。这样其他线程就能够获取到该锁,显然是有问题的。基于redis实现的分布式锁只是满足了AP《A(Availability):可用性,应该能够在正常时间内对请求进行响应;P(Partition-tolerance):分区容忍性,在分布式环境中,多个节点组成的网络应该是互相连通的,当由于网络故障等原因造成网络分区,要求仍然能够对外提供服务》,并没有满足C《(Consistency):一致性,在同一时间点,所有节点的数据都是完全一致的)》】
2.3这里着重讲基于RedLock的redission【传送门】
RedLock是什么?
RedLock是基于redis实现的分布式锁,它能够保证以下特性:
互斥性:在任何时候,只能有一个客户端能够持有锁;
避免死锁:当客户端拿到锁后,即使发生了网络分区或者客户端宕机,也不会发生死锁;(利用key的存活时间)
容错性:只要多数节点的redis实例正常运行,就能够对外提供服务,加锁或者释放锁;
而非redLock是无法满足互斥性的,上面已经阐述过了原因。
RedLock算法
假设有N个redis的master节点,这些节点是相互独立的(不需要主从或者其他协调的系统)。N推荐为奇数~
客户端在获取锁时,需要做以下操作:
获取当前时间戳,以微妙为单为。
使用相同的lockName和lockValue,尝试从N个节点获取锁。(在获取锁时,要求等待获取锁的时间远小于锁的释放时间,如锁的lease_time为10s,那么wait_time应该为5-50毫秒;避免因为redis实例挂掉,客户端需要等待更长的时间才能返回,即需要让客户端能够fast_fail;如果一个redis实例不可用,那么需要继续从下个redis实例获取锁)
当从N个节点获取锁结束后,如果客户端能够从多数节点(N/2 + 1)中成功获取锁,且获取锁的时间小于失效时间,那么可认为,客户端成功获得了锁。(获取锁的时间=当前时间戳 - 步骤1的时间戳)
客户端成功获得锁后,那么锁的实际有效时间 = 设置锁的有效时间 - 获取锁的时间。
客户端获取锁失败后,N个节点的redis实例都会释放锁,即使未能加锁成功。
为什么N推荐为奇数呢?
原因1:本着最大容错的情况下,占用服务资源最少的原则,2N+1和2N+2的容灾能力是一样的,所以采用2N+1;比如,5台服务器允许2台宕机,容错性为2,6台服务器也只能允许2台宕机,容错性也是2,因为要求超过半数节点存活才OK。
原因2:假设有6个redis节点,client1和client2同时向redis实例获取同一个锁资源,那么可能发生的结果是——client1获得了3把锁,client2获得了3把锁,由于都没有超过半数,那么client1和client2获取锁都失败,对于奇数节点是不会存在这个问题。
失败时重试
当客户端无法获取到锁时,应该随机延时后进行重试,防止多个客户端在同一时间抢夺同一资源的锁(会导致脑裂,最终都不能获取到锁)。客户端获得超过半数节点的锁花费的时间越短,那么脑裂的概率就越低。所以,理想的情况下,客户端最好能够同时(并发)向所有redis发出set命令。
当客户端从多数节点获取锁失败时,应该尽快释放已经成功获取的锁,这样其他客户端不需要等待锁过期后再获取。(如果存在网络分区,客户端已经无法和redis进行通信,那么此时只能等待锁过期后自动释放)
释放锁
向所有redis实例发送释放锁命令即可,不需要关心redis实例有没有成功上锁。
redisson在加锁的时候,key=lockName, value=uuid + threadID,采用set结构存储,并包含了上锁的次数(支持可重入);解锁的时候通过hexists判断key和value是否存在,存在则解锁;这里不会出现误解锁
性能、崩溃恢复和redis同步
如何提升分布式锁的性能?以每分钟执行多少次acquire/release操作作为性能指标,一方面通过增加redis实例可用降低响应延迟,另一方面,使用非阻塞模型,一次发送所有的命令,然后异步读取响应结果,这里假设客户端和redis之间的RTT差不多。
如果redis没用使用备份,redis重启后,那么会丢失锁,导致多个客户端都能获取到锁。通过AOF持久化可以缓解这个问题。redis key过期是unix时间戳,即便是redis重启,那么时间依然是前进的。但是,如果是断电呢?redis在启动后,可能就会丢失这个key(在写入或者还未写入磁盘时断电了,取决于fsync的配置),如果采用fsync=always,那么会极大影响性能。如何解决这个问题呢?可以让redis节点重启后,在一个TTL时间段内,对客户端不可用即可。
Redisson
redisson是在redis基础上实现的一套开源解决方案,不仅提供了一系列的分布式的java常用对象,还提供了许多分布式服务,宗旨是促进使用者对redis的关注分离,更多的关注业务逻辑的处理上。
redisson也对redlock做了一套实现,详细如下:
public static void main() {
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://xxxx1:xxx1")
.setPassword("xxxx1")
.setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer()
.setAddress("redis://xxxx2:xxx2")
.setPassword("xxxx2")
.setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);
Config config3 = new Config();
config3.useSingleServer().
setAddress("redis://xxxx3:xxx3")
.setPassword("xxxx3")
.setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);
String lockName = "redlock-test";
RLock lock1 = redissonClient1.getLock(lockName);
RLock lock2 = redissonClient2.getLock(lockName);
RLock lock3 = redissonClient3.getLock(lockName);
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
try {
/**
* 尝试获取锁
* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
* leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
*/
isLock = redLock.tryLock(500, 30000, TimeUnit.MILLISECONDS);
if (isLock) {
// 获取锁成功,处理业务
Thread.sleep(30000);
}
} catch (Exception e) {
} finally {
// 无论如何, 最后都要解锁
redLock.unlock();
}
}
tryLock()源码:redisson对redlock的实现方式基本和上面文字描述的类似,有一点区别在于,redisson在获取锁成功后,会对key的失效时间重新赋值。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = unit.toMillis(waitTime)*2;
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}
if (remainTime != -1) {
remainTime -= (System.currentTimeMillis() - time);
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}
Reddsion还有基于lua脚本的实现这里也简单讲下:
总体流程简单来说:
1、线程A和线程B两个线程同时争抢锁。线程A很幸运,最先抢到了锁。线程B在获取锁失败后,并未放弃希望,而是主动订阅了解锁消息,然后再尝试获取锁,顺便看看没有抢到的这把锁还有多久就过期,线程B就按需阻塞等锁释放。
2、线程A拿着锁干完了活,自觉释放了持有的锁,于此同时广播了解锁消息,通知其他抢锁的线程再来枪;
3、解锁消息的监听者LockPubSub收到消息后,释放自己持有的信号量;线程B就瞬间从阻塞中被唤醒了,接着再抢锁,这次终于抢到锁了!后面再按部就班,干完活,解锁
网上有一张总体过程时序图不错【传送门】:
加锁与解锁都是通过lua脚本实现的:
-- 若锁不存在:则直接广播解锁消息,并返回1
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
-- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 若锁存在,且唯一标识匹配:则先将锁重入计数减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
-- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间
return redis.call('pttl', KEYS[1]);
加锁源码:
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 获取锁能容忍的最大等待时长
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
// 【核心点1】尝试获取锁,若返回值为null,则表示已获取到锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// 还可以容忍的等待时长=获取锁能容忍的最大等待时长 - 执行完上述操作流逝的时间
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
// 【核心点2】订阅解锁消息,见org.redisson.pubsub.LockPubSub#onMessage
/**
* 4.订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
* 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争
* 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
* 当 this.await返回true,进入循环尝试获取锁
*/
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
//await 方法内部是用CountDownLatch来实现阻塞,获取subscribe异步执行的结果(应用了Netty 的 Future)
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (subscribeFuture.isSuccess()) {
unsubscribe(subscribeFuture, threadId);
}
}
});
}
acquireFailed(threadId);
return false;
}
// 订阅成功
try {
// 还可以容忍的等待时长=获取锁能容忍的最大等待时长 - 执行完上述操作流逝的时间
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
// 超出可容忍的等待时长,直接返回获取锁失败
acquireFailed(threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
// 尝试获取锁;如果锁被其他线程占用,就返回锁剩余过期时间【同上】
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
// 【核心点3】根据锁TTL,调整阻塞等待时长;
// 注意:这里实现非常巧妙,1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让当前线程阻塞一段时间,避免了在while循环中频繁请求获取锁;
//2、该Semaphore的release方法,会在订阅解锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;当其他线程释放了占用的锁,会广播解锁消息,监听器接收解锁消息,并释放信号量,最终会唤醒阻塞在这里的线程。
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
// 取消解锁消息的订阅
unsubscribe(subscribeFuture, threadId);
}
}
获取锁tryAcquire的实现,就是执行Lua脚本:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
// tryAcquireAsync异步执行Lua脚本,get方法同步获取返回结果
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
// 见org.redisson.RedissonLock#tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
// 实质是异步执行加锁Lua脚本
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
//先判断这个异步操作有没有执行成功,如果没有成功,直接返回,如果执行成功了,就会同步获取结果
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
//如果ttlRemaining为null,则会执行一个定时调度的方法scheduleExpirationRenewal
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
// 见org.redisson.RedissonLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
解锁:
@Override
public void unlock() {
// 执行解锁Lua脚本,这里传入线程id,是为了保证加锁和解锁是同一个线程,避免误解锁其他线程占有的锁
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
cancelExpirationRenewal();
}
}
// 见org.redisson.RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
在解锁Lua脚本中,操作了两个key:一个是锁名my_lock_name,一个是解锁消息发布订阅频道redisson_lock__channel:{my_first_lock_name},按照上面slot计算方式,两个key都会按照内容my_first_lock_name来计算,故能保证落到同一个slot.