顾乔芝士网

持续更新的前后端开发技术栈

JAVA面试宝典-分布式篇(二十二)分布式幂等性如何设计?

#头条创作挑战赛#

在高并发场景的架构里,幂等性是必须得保证的。比如说支付功能,用户发起支付,如果后台没有做幂等校验,刚好用户手抖多点了几下,于是后台就可能多次收到同一个订单请求,不做幂等很容易就让用户重复支付了,这样用户是肯定不能忍的。

解决方案

1,查询和删除不在幂等讨论范围,查询肯定没有幂等的说,删除:第一次删除成功后,后面来删除直接返回0,也是返回成功。

2,建唯一索引:唯一索引或唯一组合索引来防止新增数据存在脏数据(当表存在唯一索引,并发现新增异常时,再查询一次就可以了,数据应该已经存在了,返回结果即可)。

3,token机制:由于重复点击或者网络重发,或者nginx重发等情况会导致数据被重复提交。前端在数据提交前要向后端服务的申请token,token放到 Redis 或 JVM 内存,token有效时间。提交后后台校验token,同时删除token,生成新的token返回。redis要用删除操作来判断token,删除成功代表token校验通过,如果用select+delete来校验token,存在并发问题,不建议使用。

4,悲观锁

悲观锁使用时一般伴随事务一起使用,数据锁定时间可能会很长,根据实际情况选用(另外还要考虑id是否为主键,如果id不是主键或者不是 InnoDB 存储引擎,那么就会出现锁全表)。

select id ,name from table_# where id='##' for update;

5,乐观锁,给数据库表增加一个version字段,可以通过这个字段来判断是否已经被修改了

update table_xxx set name=#name#,version=version+1 where version=#version#

6,分布式锁,比如 Redis、Zookeeper 的分布式锁。单号为key,然后给Key设置有效期(防止支付失败后,锁一直不释放),来一个请求使用订单号生成一把锁,业务代码执行完成后再释放锁。

7,保底方案,先查询是否存在此单,不存在进行支付,存在就直接返回支付结果。

最优方案:分布式锁


分布式锁方案对比。 基于Zookeeper 当使用Zookeeper实现分布式锁时,在加锁的共同父节点下创建一个新的临时有需节点。创建完成后会获取加锁父节点下所有子节点,判断自己是否为最小的一个。如果是则获得锁,进行执行加锁代码,执行完毕后删除当前临时节点。

如果判断自己不是最小的一个节点时,则获取比自己小的最近的那个节点,并对其设置被删除监听。当之前节点不存在时当前节点获得锁,执行加锁逻辑代码,当执行完毕后,当前节点自行删除。

实现核心代码:

package com.xiaohui.bean;
 
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Component;
 
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
 
public class ZkLock implements Lock {
 
    //zk 客户端对象
    private ZooKeeper zk;
    //zk的目录结构 根节点
    private String root ="/locks";
    //锁名称
    private String lockName;
    //当前线程创建的序列node
    private ThreadLocal<String> nodeId = new ThreadLocal<String>();
    //用来同步等待zkClient连接到了服务端
    private CountDownLatch connectedSignal = new CountDownLatch(1);
    //超时时间
    private final static int sessionTimeout = 3000;
 
    private final static byte[] data = new byte[0];
     
 
    public ZkLock(String config, String lockName) {
        this.lockName = lockName;
        try{
            //创建连接
            zk = new ZooKeeper(config, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(event.getState() == Event.KeeperState.SyncConnected){
                        //将count值减1
                        connectedSignal.countDown();
                    }
                }
            });
            //[等待zk客户端链接上服务器后再继续执行]调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
            connectedSignal.await();
            Stat stat = zk.exists(root,false);
            if(null == stat){
                //创建根节点
                zk.create(root,data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//持久的节点
            }
        }catch (Exception e){
            e.printStackTrace();
            throw new RuntimeException();
        }
    }
  
    @Override
    public void lock() {
        try{
            //创建临时子节点[当前节点路径]
            String curNodePath = zk.create(root+"/"+lockName,
                    data,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT_SEQUENTIAL);
            System.out.println(Thread.currentThread().getName()+" "+curNodePath+ "--- created.");
            //取出所有子节点
            List<String> children = zk.getChildren(root, false);
            TreeSet<String> set = new TreeSet<String>();
            for (String child : children) {
                System.out.println(curNodePath+"----------------------------child:"+child);
 
                set.add(root+"/"+child);
            }
 
            String smallNode = set.first();
            if(curNodePath.equals(smallNode)){
                //如果是最小的节点,则表示获取到锁
                System.out.println(Thread.currentThread().getName()+" "+root+"/"+lockName+ "--- 获得锁.");
                this.nodeId.set(curNodePath);
                return;
            }
            //============此处如果有延时,上一个节点 在此刻被删除,自己最小缺无法实现监听==============
            String preNode = set.lower(curNodePath);
            CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists(preNode,new LockWatcher(latch));//注册监听
            // 判断比自己小一个数的节点是否存在,如果不存在则无需等待解锁,同时注册监听
            if(stat != null){
                System.out.println(Thread.currentThread().getName()+" "+curNodePath+ "等待"+preNode +" 解锁");
                latch.await();//此处等待。。。
                nodeId.set(curNodePath);
                latch = null;
            }else{
                System.out.println(Thread.currentThread().getName()+" "+curNodePath+"上一个节点不存在?我直接获取的锁");
                nodeId.set(curNodePath);
                latch = null;
            }
        }catch (Exception e){
            e.printStackTrace();
            throw  new RuntimeException(e);
        }
    }
 
    @Override
    public void lockInterruptibly() throws InterruptedException { 
    }
 
    @Override
    public boolean tryLock() {
        return false;
    }
 
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }
 
    @Override
    public void unlock() {
        try{
            System.out.println(Thread.currentThread().getName()+" 解锁:"+nodeId.get());
            if(null != nodeId){
                zk.delete(nodeId.get(),-1);
            }
            nodeId.remove();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
 
    @Override
    public Condition newCondition() {
        return null;
    }
  
    //添加wacther 监听临时顺序节点的删除
    class LockWatcher implements Watcher{
        private CountDownLatch latch = null; 
        public LockWatcher(CountDownLatch latch) {
            this.latch = latch;
        }
 
        @Override
        public void process(WatchedEvent event) {
            if(event.getType() == Event.EventType.NodeDeleted){
                latch.countDown();
            }
        }
    } 
}

同时下发10个请求,测试代码:

package com.xiaohui.web;
 
import com.xiaohui.bean.Stock;
import com.xiaohui.bean.ZkLock;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
  
@RestController
public class WebController {
 
    ZkLock zkLock = new ZkLock("172.18.230.184:2181","stock_zk1");
 
    @GetMapping("/startReduce")
    public String startReduce(){
         new Thread(new Runnable() {
            @Override
            public void run() {
                zkLock.lock();
                boolean b = Stock.reduceStock();
                System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
                zkLock.unlock();
            }
        }).start();
         new Thread(new Runnable() {
            @Override
            public void run() {
                zkLock.lock();
                boolean b = Stock.reduceStock();
                System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
                zkLock.unlock();
            }
        }).start();
         new Thread(new Runnable() {
            @Override
            public void run() {
                zkLock.lock();
                boolean b = Stock.reduceStock();
                System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
                zkLock.unlock();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                zkLock.lock();
                boolean b = Stock.reduceStock();
                System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
                zkLock.unlock();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                zkLock.lock();
                boolean b = Stock.reduceStock();
                System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
                zkLock.unlock();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                zkLock.lock();
                boolean b = Stock.reduceStock();
                System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
                zkLock.unlock();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                zkLock.lock();
                boolean b = Stock.reduceStock();
                System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
                zkLock.unlock();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                zkLock.lock();
                boolean b = Stock.reduceStock();
                System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));
                zkLock.unlock();
            }
        }).start(); 
        try {
            Thread.sleep(1000);
        }catch (Exception e){
            e.printStackTrace();
        }
        System.out.println("Stock.count = " +Stock.count);
        return  "set ok!";
    } 
}

运行结果:

Thread-15 /locks/stock_zk10000000032--- created.
Thread-16 /locks/stock_zk10000000033--- created.
Thread-11 /locks/stock_zk10000000034--- created.
Thread-17 /locks/stock_zk10000000035--- created.
Thread-13 /locks/stock_zk10000000037--- created.
Thread-10 /locks/stock_zk10000000038--- created.
Thread-12 /locks/stock_zk10000000036--- created.
Thread-14 /locks/stock_zk10000000039--- created.
Thread-15 /locks/stock_zk1--- 获得锁.
Thread-10 /locks/stock_zk10000000038等待/locks/stock_zk10000000037 解锁
Thread-17 /locks/stock_zk10000000035等待/locks/stock_zk10000000034 解锁
Thread-11 /locks/stock_zk10000000034等待/locks/stock_zk10000000033 解锁
Thread-16 /locks/stock_zk10000000033等待/locks/stock_zk10000000032 解锁
Thread-12 /locks/stock_zk10000000036等待/locks/stock_zk10000000035 解锁
Thread-13 /locks/stock_zk10000000037等待/locks/stock_zk10000000036 解锁
Thread-14 /locks/stock_zk10000000039等待/locks/stock_zk10000000038 解锁
Thread-15下单:成功
Thread-15 解锁:/locks/stock_zk10000000032
Thread-16下单:失败
Thread-16 解锁:/locks/stock_zk10000000033
Thread-11下单:失败
Thread-11 解锁:/locks/stock_zk10000000034
Thread-17下单:失败
Thread-17 解锁:/locks/stock_zk10000000035
Thread-12下单:失败
Thread-12 解锁:/locks/stock_zk10000000036
Thread-13下单:失败
Thread-13 解锁:/locks/stock_zk10000000037
Thread-10下单:失败
Thread-10 解锁:/locks/stock_zk10000000038
Thread-14下单:失败
Thread-14 解锁:/locks/stock_zk10000000039
Stock.count = 0

实现细节可能会因应用场景和需求而有所不同。另外,Zookeeper本身具有以下优缺点:

优点:1. Zookeeper的机制可以保证分布式锁的实现。 2. Zookeeper可以利用其自身的机制保证原子性。 3. Zookeeper可以用于实现消息的一致性。 缺点:1. Zookeeper的机制可以导致系统的性能下降。 2. Zookeeper可能会因为机制的问题出现脑裂的问题。 3. Zookeeper需要自己实现网络I/O,并且实现比较复杂。

同时使用此方案还需要单独维护一套Zookepper集群维护成本高

2.基于Redis的分布式锁

说到Redis分布式锁大部分人都会想到:

2.1 setnx+expire

【setnx和expire分2步可能出现死锁,误删除持有锁,不支持阻塞等待、重入】

2.2或者SET lock_name value EX seconds NX

【在分布式系统中,为了避免单点故障,提高可靠性,redis都会采用主从架构,当主节点挂了后,从节点会作为主继续提供服务。该种方案能够满足大多数的业务场景,但是对于要求强一致性的场景如交易,该种方案还是有漏洞的,原因如下:

redis主从架构采用的是异步复制,当master节点拿到了锁,但是锁还未同步到slave节点,此时master节点挂了,发生故障转移,slave节点被选举为master节点,丢失了锁。这样其他线程就能够获取到该锁,显然是有问题的。基于redis实现的分布式锁只是满足了AP《A(Availability):可用性,应该能够在正常时间内对请求进行响应;P(Partition-tolerance):分区容忍性,在分布式环境中,多个节点组成的网络应该是互相连通的,当由于网络故障等原因造成网络分区,要求仍然能够对外提供服务》,并没有满足C《(Consistency):一致性,在同一时间点,所有节点的数据都是完全一致的)》】

2.3这里着重讲基于RedLock的redission【传送门

RedLock是什么?

RedLock是基于redis实现的分布式锁,它能够保证以下特性:

互斥性:在任何时候,只能有一个客户端能够持有锁;

避免死锁:当客户端拿到锁后,即使发生了网络分区或者客户端宕机,也不会发生死锁;(利用key的存活时间)

容错性:只要多数节点的redis实例正常运行,就能够对外提供服务,加锁或者释放锁;

而非redLock是无法满足互斥性的,上面已经阐述过了原因。

RedLock算法

假设有N个redis的master节点,这些节点是相互独立的(不需要主从或者其他协调的系统)。N推荐为奇数~

客户端在获取锁时,需要做以下操作:

获取当前时间戳,以微妙为单为。

使用相同的lockName和lockValue,尝试从N个节点获取锁。(在获取锁时,要求等待获取锁的时间远小于锁的释放时间,如锁的lease_time为10s,那么wait_time应该为5-50毫秒;避免因为redis实例挂掉,客户端需要等待更长的时间才能返回,即需要让客户端能够fast_fail;如果一个redis实例不可用,那么需要继续从下个redis实例获取锁)

当从N个节点获取锁结束后,如果客户端能够从多数节点(N/2 + 1)中成功获取锁,且获取锁的时间小于失效时间,那么可认为,客户端成功获得了锁。(获取锁的时间=当前时间戳 - 步骤1的时间戳)

客户端成功获得锁后,那么锁的实际有效时间 = 设置锁的有效时间 - 获取锁的时间。

客户端获取锁失败后,N个节点的redis实例都会释放锁,即使未能加锁成功。

为什么N推荐为奇数呢?

原因1:本着最大容错的情况下,占用服务资源最少的原则,2N+1和2N+2的容灾能力是一样的,所以采用2N+1;比如,5台服务器允许2台宕机,容错性为2,6台服务器也只能允许2台宕机,容错性也是2,因为要求超过半数节点存活才OK。

原因2:假设有6个redis节点,client1和client2同时向redis实例获取同一个锁资源,那么可能发生的结果是——client1获得了3把锁,client2获得了3把锁,由于都没有超过半数,那么client1和client2获取锁都失败,对于奇数节点是不会存在这个问题。

失败时重试

当客户端无法获取到锁时,应该随机延时后进行重试,防止多个客户端在同一时间抢夺同一资源的锁(会导致脑裂,最终都不能获取到锁)。客户端获得超过半数节点的锁花费的时间越短,那么脑裂的概率就越低。所以,理想的情况下,客户端最好能够同时(并发)向所有redis发出set命令。

当客户端从多数节点获取锁失败时,应该尽快释放已经成功获取的锁,这样其他客户端不需要等待锁过期后再获取。(如果存在网络分区,客户端已经无法和redis进行通信,那么此时只能等待锁过期后自动释放)

释放锁

向所有redis实例发送释放锁命令即可,不需要关心redis实例有没有成功上锁。

redisson在加锁的时候,key=lockName, value=uuid + threadID,采用set结构存储,并包含了上锁的次数(支持可重入);解锁的时候通过hexists判断key和value是否存在,存在则解锁;这里不会出现误解锁

性能、崩溃恢复和redis同步

如何提升分布式锁的性能?以每分钟执行多少次acquire/release操作作为性能指标,一方面通过增加redis实例可用降低响应延迟,另一方面,使用非阻塞模型,一次发送所有的命令,然后异步读取响应结果,这里假设客户端和redis之间的RTT差不多。

如果redis没用使用备份,redis重启后,那么会丢失锁,导致多个客户端都能获取到锁。通过AOF持久化可以缓解这个问题。redis key过期是unix时间戳,即便是redis重启,那么时间依然是前进的。但是,如果是断电呢?redis在启动后,可能就会丢失这个key(在写入或者还未写入磁盘时断电了,取决于fsync的配置),如果采用fsync=always,那么会极大影响性能。如何解决这个问题呢?可以让redis节点重启后,在一个TTL时间段内,对客户端不可用即可。

Redisson

redisson是在redis基础上实现的一套开源解决方案,不仅提供了一系列的分布式的java常用对象,还提供了许多分布式服务,宗旨是促进使用者对redis的关注分离,更多的关注业务逻辑的处理上。

redisson也对redlock做了一套实现,详细如下:

public static void main() {

        Config config1 = new Config();
        config1.useSingleServer().setAddress("redis://xxxx1:xxx1")
                .setPassword("xxxx1")
                .setDatabase(0);
        RedissonClient redissonClient1 = Redisson.create(config1);
        Config config2 = new Config();
        config2.useSingleServer()
                .setAddress("redis://xxxx2:xxx2")
                .setPassword("xxxx2")
                .setDatabase(0);

        RedissonClient redissonClient2 = Redisson.create(config2);
        Config config3 = new Config();
        config3.useSingleServer().
                setAddress("redis://xxxx3:xxx3")
                .setPassword("xxxx3")
                .setDatabase(0);

        RedissonClient redissonClient3 = Redisson.create(config3);

        String lockName = "redlock-test";
        RLock lock1 = redissonClient1.getLock(lockName);
        RLock lock2 = redissonClient2.getLock(lockName);
        RLock lock3 = redissonClient3.getLock(lockName);

        RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
        boolean isLock;
        try {
            /**
             * 尝试获取锁
             * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
             * leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
             */
            isLock = redLock.tryLock(500, 30000, TimeUnit.MILLISECONDS);

            if (isLock) {
                // 获取锁成功,处理业务
                Thread.sleep(30000);
            }
        } catch (Exception e) {

        } finally {
            // 无论如何, 最后都要解锁
            redLock.unlock();

        }
    }

tryLock()源码:redisson对redlock的实现方式基本和上面文字描述的类似,有一点区别在于,redisson在获取锁成功后,会对key的失效时间重新赋值。

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long newLeaseTime = -1;
        if (leaseTime != -1) {
            newLeaseTime = unit.toMillis(waitTime)*2;
        }        
        long time = System.currentTimeMillis();
        long remainTime = -1;
        if (waitTime != -1) {
            remainTime = unit.toMillis(waitTime);
        }
        long lockWaitTime = calcLockWaitTime(remainTime);        
        int failedLocksLimit = failedLocksLimit();
        List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
        for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
            RLock lock = iterator.next();
            boolean lockAcquired;
            try {
                if (waitTime == -1 && leaseTime == -1) {
                    lockAcquired = lock.tryLock();
                } else {
                    long awaitTime = Math.min(lockWaitTime, remainTime);
                    lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
                }
            } catch (RedisResponseTimeoutException e) {
                unlockInner(Arrays.asList(lock));
                lockAcquired = false;
            } catch (Exception e) {
                lockAcquired = false;
            }            
            if (lockAcquired) {
                acquiredLocks.add(lock);
            } else {
                if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                    break;
                }
                if (failedLocksLimit == 0) {
                    unlockInner(acquiredLocks);
                    if (waitTime == -1 && leaseTime == -1) {
                        return false;
                    }
                    failedLocksLimit = failedLocksLimit();
                    acquiredLocks.clear();
                    // reset iterator
                    while (iterator.hasPrevious()) {
                        iterator.previous();
                    }
                } else {
                    failedLocksLimit--;
                }
            }            
            if (remainTime != -1) {
                remainTime -= (System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                if (remainTime <= 0) {
                    unlockInner(acquiredLocks);
                    return false;
                }
            }
        }
        if (leaseTime != -1) {
            List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
            for (RLock rLock : acquiredLocks) {
                RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
                futures.add(future);
            }            
            for (RFuture<Boolean> rFuture : futures) {
                rFuture.syncUninterruptibly();
            }
        }        
        return true;
    }

Reddsion还有基于lua脚本的实现这里也简单讲下:

总体流程简单来说:

1、线程A和线程B两个线程同时争抢锁。线程A很幸运,最先抢到了锁。线程B在获取锁失败后,并未放弃希望,而是主动订阅了解锁消息,然后再尝试获取锁,顺便看看没有抢到的这把锁还有多久就过期,线程B就按需阻塞等锁释放。

2、线程A拿着锁干完了活,自觉释放了持有的锁,于此同时广播了解锁消息,通知其他抢锁的线程再来枪;

3、解锁消息的监听者LockPubSub收到消息后,释放自己持有的信号量;线程B就瞬间从阻塞中被唤醒了,接着再抢锁,这次终于抢到锁了!后面再按部就班,干完活,解锁

网上有一张总体过程时序图不错【传送门】:

加锁与解锁都是通过lua脚本实现的:

-- 若锁不存在:则直接广播解锁消息,并返回1
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1; 
end;
 
-- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end; 
 
-- 若锁存在,且唯一标识匹配:则先将锁重入计数减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
    -- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期
    redis.call('pexpire', KEYS[1], ARGV[2]); 
    return 0; 
else 
    -- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程
    redis.call('del', KEYS[1]); 
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1;
end;
 
return nil;
-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
 
-- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
 
-- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间
return redis.call('pttl', KEYS[1]);

加锁源码:

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        // 获取锁能容忍的最大等待时长
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId(); 
        // 【核心点1】尝试获取锁,若返回值为null,则表示已获取到锁
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        } 
        // 还可以容忍的等待时长=获取锁能容忍的最大等待时长 - 执行完上述操作流逝的时间
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        } 
        current = System.currentTimeMillis();
        // 【核心点2】订阅解锁消息,见org.redisson.pubsub.LockPubSub#onMessage
        /**
     * 4.订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
     * 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争
     * 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
     * 当 this.await返回true,进入循环尝试获取锁
     */
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        //await 方法内部是用CountDownLatch来实现阻塞,获取subscribe异步执行的结果(应用了Netty 的 Future)
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }
         // 订阅成功
        try {
            // 还可以容忍的等待时长=获取锁能容忍的最大等待时长 - 执行完上述操作流逝的时间
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                // 超出可容忍的等待时长,直接返回获取锁失败
                acquireFailed(threadId);
                return false;
            }
            while (true) {
                long currentTime = System.currentTimeMillis();
                // 尝试获取锁;如果锁被其他线程占用,就返回锁剩余过期时间【同上】
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                } 
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                } 
                // waiting for message
                currentTime = System.currentTimeMillis(); 
                // 【核心点3】根据锁TTL,调整阻塞等待时长;
                // 注意:这里实现非常巧妙,1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让当前线程阻塞一段时间,避免了在while循环中频繁请求获取锁;
               //2、该Semaphore的release方法,会在订阅解锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;当其他线程释放了占用的锁,会广播解锁消息,监听器接收解锁消息,并释放信号量,最终会唤醒阻塞在这里的线程。
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                } 
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            // 取消解锁消息的订阅
            unsubscribe(subscribeFuture, threadId);
        }
    }

获取锁tryAcquire的实现,就是执行Lua脚本:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    // tryAcquireAsync异步执行Lua脚本,get方法同步获取返回结果
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}
 
//  见org.redisson.RedissonLock#tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        // 实质是异步执行加锁Lua脚本
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            //先判断这个异步操作有没有执行成功,如果没有成功,直接返回,如果执行成功了,就会同步获取结果
            if (!future.isSuccess()) {
                return;
            } 
            Long ttlRemaining = future.getNow();
            // lock acquired
            //如果ttlRemaining为null,则会执行一个定时调度的方法scheduleExpirationRenewal
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
} 
// 见org.redisson.RedissonLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

解锁:

@Override
    public void unlock() {
        // 执行解锁Lua脚本,这里传入线程id,是为了保证加锁和解锁是同一个线程,避免误解锁其他线程占有的锁
        Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }
 
// 见org.redisson.RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
 
}

在解锁Lua脚本中,操作了两个key:一个是锁名my_lock_name,一个是解锁消息发布订阅频道redisson_lock__channel:{my_first_lock_name},按照上面slot计算方式,两个key都会按照内容my_first_lock_name来计算,故能保证落到同一个slot.

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言