JAVA面试宝典-分布式篇(二十二)分布式幂等性如何设计?

JAVA面试宝典-分布式篇(二十二)分布式幂等性如何设计?头条创作挑战赛 在高并发场景的架构里 幂等性是必须得保证的 比如说支付功能 用户发起支付 如果后台没有做幂等校验 刚好用户手抖多点了几下 于是后台就可能多次收到同一个订单请求 不做幂等很容易就让用户重复支付了 这样用户是肯定不能忍的

欢迎大家来到IT世界,在知识的湖畔探索吧!

#头条创作挑战赛#

在高并发场景的架构里,幂等性是必须得保证的。比如说支付功能,用户发起支付,如果后台没有做幂等校验,刚好用户手抖多点了几下,于是后台就可能多次收到同一个订单请求,不做幂等很容易就让用户重复支付了,这样用户是肯定不能忍的。

解决方案

1,查询和删除不在幂等讨论范围,查询肯定没有幂等的说,删除:第一次删除成功后,后面来删除直接返回0,也是返回成功。

2,建唯一索引:唯一索引或唯一组合索引来防止新增数据存在脏数据(当表存在唯一索引,并发现新增异常时,再查询一次就可以了,数据应该已经存在了,返回结果即可)。

3,token机制:由于重复点击或者网络重发,或者nginx重发等情况会导致数据被重复提交。前端在数据提交前要向后端服务的申请token,token放到 Redis 或 JVM 内存,token有效时间。提交后后台校验token,同时删除token,生成新的token返回。redis要用删除操作来判断token,删除成功代表token校验通过,如果用select+delete来校验token,存在并发问题,不建议使用。

4,悲观锁

悲观锁使用时一般伴随事务一起使用,数据锁定时间可能会很长,根据实际情况选用(另外还要考虑id是否为主键,如果id不是主键或者不是 InnoDB 存储引擎,那么就会出现锁全表)。

select id ,name from table_# where id='' for update;

欢迎大家来到IT世界,在知识的湖畔探索吧!

5,乐观锁,给数据库表增加一个version字段,可以通过这个字段来判断是否已经被修改了

欢迎大家来到IT世界,在知识的湖畔探索吧!update table_xxx set name=#name#,version=version+1 where version=#version#

6,分布式锁,比如 Redis、Zookeeper 的分布式锁。单号为key,然后给Key设置有效期(防止支付失败后,锁一直不释放),来一个请求使用订单号生成一把锁,业务代码执行完成后再释放锁。

7,保底方案,先查询是否存在此单,不存在进行支付,存在就直接返回支付结果。

最优方案:分布式锁


分布式锁方案对比。 基于Zookeeper 当使用Zookeeper实现分布式锁时,在加锁的共同父节点下创建一个新的临时有需节点。创建完成后会获取加锁父节点下所有子节点,判断自己是否为最小的一个。如果是则获得锁,进行执行加锁代码,执行完毕后删除当前临时节点。

如果判断自己不是最小的一个节点时,则获取比自己小的最近的那个节点,并对其设置被删除监听。当之前节点不存在时当前节点获得锁,执行加锁逻辑代码,当执行完毕后,当前节点自行删除。

JAVA面试宝典-分布式篇(二十二)分布式幂等性如何设计?



欢迎大家来到IT世界,在知识的湖畔探索吧!

实现核心代码:

package com.xiaohui.bean; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.springframework.stereotype.Component; import java.util.List; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class ZkLock implements Lock { //zk 客户端对象 private ZooKeeper zk; //zk的目录结构 根节点 private String root ="/locks"; //锁名称 private String lockName; //当前线程创建的序列node private ThreadLocal<String> nodeId = new ThreadLocal<String>(); //用来同步等待zkClient连接到了服务端 private CountDownLatch connectedSignal = new CountDownLatch(1); //超时时间 private final static int sessionTimeout = 3000; private final static byte[] data = new byte[0]; public ZkLock(String config, String lockName) { this.lockName = lockName; try{ //创建连接 zk = new ZooKeeper(config, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { if(event.getState() == Event.KeeperState.SyncConnected){ //将count值减1 connectedSignal.countDown(); } } }); //[等待zk客户端链接上服务器后再继续执行]调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行 connectedSignal.await(); Stat stat = zk.exists(root,false); if(null == stat){ //创建根节点 zk.create(root,data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//持久的节点 } }catch (Exception e){ e.printStackTrace(); throw new RuntimeException(); } } @Override public void lock() { try{ //创建临时子节点[当前节点路径] String curNodePath = zk.create(root+"/"+lockName, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.println(Thread.currentThread().getName()+" "+curNodePath+ "--- created."); //取出所有子节点 List<String> children = zk.getChildren(root, false); TreeSet<String> set = new TreeSet<String>(); for (String child : children) { System.out.println(curNodePath+"----------------------------child:"+child); set.add(root+"/"+child); } String smallNode = set.first(); if(curNodePath.equals(smallNode)){ //如果是最小的节点,则表示获取到锁 System.out.println(Thread.currentThread().getName()+" "+root+"/"+lockName+ "--- 获得锁."); this.nodeId.set(curNodePath); return; } //============此处如果有延时,上一个节点 在此刻被删除,自己最小缺无法实现监听============== String preNode = set.lower(curNodePath); CountDownLatch latch = new CountDownLatch(1); Stat stat = zk.exists(preNode,new LockWatcher(latch));//注册监听 // 判断比自己小一个数的节点是否存在,如果不存在则无需等待解锁,同时注册监听 if(stat != null){ System.out.println(Thread.currentThread().getName()+" "+curNodePath+ "等待"+preNode +" 解锁"); latch.await();//此处等待。。。 nodeId.set(curNodePath); latch = null; }else{ System.out.println(Thread.currentThread().getName()+" "+curNodePath+"上一个节点不存在?我直接获取的锁"); nodeId.set(curNodePath); latch = null; } }catch (Exception e){ e.printStackTrace(); throw new RuntimeException(e); } } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { try{ System.out.println(Thread.currentThread().getName()+" 解锁:"+nodeId.get()); if(null != nodeId){ zk.delete(nodeId.get(),-1); } nodeId.remove(); }catch (Exception e){ e.printStackTrace(); } } @Override public Condition newCondition() { return null; } //添加wacther 监听临时顺序节点的删除 class LockWatcher implements Watcher{ private CountDownLatch latch = null; public LockWatcher(CountDownLatch latch) { this.latch = latch; } @Override public void process(WatchedEvent event) { if(event.getType() == Event.EventType.NodeDeleted){ latch.countDown(); } } } }

同时下发10个请求,测试代码:

欢迎大家来到IT世界,在知识的湖畔探索吧!package com.xiaohui.web; import com.xiaohui.bean.Stock; import com.xiaohui.bean.ZkLock; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class WebController { ZkLock zkLock = new ZkLock("172.18.230.184:2181","stock_zk1"); @GetMapping("/startReduce") public String startReduce(){ new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { zkLock.lock(); boolean b = Stock.reduceStock(); System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败")); zkLock.unlock(); } }).start(); try { Thread.sleep(1000); }catch (Exception e){ e.printStackTrace(); } System.out.println("Stock.count = " +Stock.count); return "set ok!"; } }

运行结果:

Thread-15 /locks/stock_zk--- created. Thread-16 /locks/stock_zk--- created. Thread-11 /locks/stock_zk--- created. Thread-17 /locks/stock_zk--- created. Thread-13 /locks/stock_zk--- created. Thread-10 /locks/stock_zk--- created. Thread-12 /locks/stock_zk--- created. Thread-14 /locks/stock_zk--- created. Thread-15 /locks/stock_zk1--- 获得锁. Thread-10 /locks/stock_zk等待/locks/stock_zk 解锁 Thread-17 /locks/stock_zk等待/locks/stock_zk 解锁 Thread-11 /locks/stock_zk等待/locks/stock_zk 解锁 Thread-16 /locks/stock_zk等待/locks/stock_zk 解锁 Thread-12 /locks/stock_zk等待/locks/stock_zk 解锁 Thread-13 /locks/stock_zk等待/locks/stock_zk 解锁 Thread-14 /locks/stock_zk等待/locks/stock_zk 解锁 Thread-15下单:成功 Thread-15 解锁:/locks/stock_zk Thread-16下单:失败 Thread-16 解锁:/locks/stock_zk Thread-11下单:失败 Thread-11 解锁:/locks/stock_zk Thread-17下单:失败 Thread-17 解锁:/locks/stock_zk Thread-12下单:失败 Thread-12 解锁:/locks/stock_zk Thread-13下单:失败 Thread-13 解锁:/locks/stock_zk Thread-10下单:失败 Thread-10 解锁:/locks/stock_zk Thread-14下单:失败 Thread-14 解锁:/locks/stock_zk Stock.count = 0

实现细节可能会因应用场景和需求而有所不同。另外,Zookeeper本身具有以下优缺点:

优点:1. Zookeeper的机制可以保证分布式锁的实现。 2. Zookeeper可以利用其自身的机制保证原子性。 3. Zookeeper可以用于实现消息的一致性。 缺点:1. Zookeeper的机制可以导致系统的性能下降。 2. Zookeeper可能会因为机制的问题出现脑裂的问题。 3. Zookeeper需要自己实现网络I/O,并且实现比较复杂。

同时使用此方案还需要单独维护一套Zookepper集群维护成本高

2.基于Redis的分布式锁

说到Redis分布式锁大部分人都会想到:

2.1 setnx+expire

【setnx和expire分2步可能出现死锁,误删除持有锁,不支持阻塞等待、重入】

2.2或者SET lock_name value EX seconds NX

【在分布式系统中,为了避免单点故障,提高可靠性,redis都会采用主从架构,当主节点挂了后,从节点会作为主继续提供服务。该种方案能够满足大多数的业务场景,但是对于要求强一致性的场景如交易,该种方案还是有漏洞的,原因如下:

redis主从架构采用的是异步复制,当master节点拿到了锁,但是锁还未同步到slave节点,此时master节点挂了,发生故障转移,slave节点被选举为master节点,丢失了锁。这样其他线程就能够获取到该锁,显然是有问题的。基于redis实现的分布式锁只是满足了AP《A(Availability):可用性,应该能够在正常时间内对请求进行响应;P(Partition-tolerance):分区容忍性,在分布式环境中,多个节点组成的网络应该是互相连通的,当由于网络故障等原因造成网络分区,要求仍然能够对外提供服务》,并没有满足C《(Consistency):一致性,在同一时间点,所有节点的数据都是完全一致的)》】

2.3这里着重讲基于RedLock的redission【传送门

RedLock是什么?

RedLock是基于redis实现的分布式锁,它能够保证以下特性:

互斥性:在任何时候,只能有一个客户端能够持有锁;

避免死锁:当客户端拿到锁后,即使发生了网络分区或者客户端宕机,也不会发生死锁;(利用key的存活时间)

容错性:只要多数节点的redis实例正常运行,就能够对外提供服务,加锁或者释放锁;

而非redLock是无法满足互斥性的,上面已经阐述过了原因。

RedLock算法

假设有N个redis的master节点,这些节点是相互独立的(不需要主从或者其他协调的系统)。N推荐为奇数~

客户端在获取锁时,需要做以下操作:

获取当前时间戳,以微妙为单为。

使用相同的lockName和lockValue,尝试从N个节点获取锁。(在获取锁时,要求等待获取锁的时间远小于锁的释放时间,如锁的lease_time为10s,那么wait_time应该为5-50毫秒;避免因为redis实例挂掉,客户端需要等待更长的时间才能返回,即需要让客户端能够fast_fail;如果一个redis实例不可用,那么需要继续从下个redis实例获取锁)

当从N个节点获取锁结束后,如果客户端能够从多数节点(N/2 + 1)中成功获取锁,且获取锁的时间小于失效时间,那么可认为,客户端成功获得了锁。(获取锁的时间=当前时间戳 – 步骤1的时间戳)

客户端成功获得锁后,那么锁的实际有效时间 = 设置锁的有效时间 – 获取锁的时间。

客户端获取锁失败后,N个节点的redis实例都会释放锁,即使未能加锁成功。

为什么N推荐为奇数呢?

原因1:本着最大容错的情况下,占用服务资源最少的原则,2N+1和2N+2的容灾能力是一样的,所以采用2N+1;比如,5台服务器允许2台宕机,容错性为2,6台服务器也只能允许2台宕机,容错性也是2,因为要求超过半数节点存活才OK。

原因2:假设有6个redis节点,client1和client2同时向redis实例获取同一个锁资源,那么可能发生的结果是——client1获得了3把锁,client2获得了3把锁,由于都没有超过半数,那么client1和client2获取锁都失败,对于奇数节点是不会存在这个问题。

失败时重试

当客户端无法获取到锁时,应该随机延时后进行重试,防止多个客户端在同一时间抢夺同一资源的锁(会导致脑裂,最终都不能获取到锁)。客户端获得超过半数节点的锁花费的时间越短,那么脑裂的概率就越低。所以,理想的情况下,客户端最好能够同时(并发)向所有redis发出set命令。

当客户端从多数节点获取锁失败时,应该尽快释放已经成功获取的锁,这样其他客户端不需要等待锁过期后再获取。(如果存在网络分区,客户端已经无法和redis进行通信,那么此时只能等待锁过期后自动释放)

释放锁

向所有redis实例发送释放锁命令即可,不需要关心redis实例有没有成功上锁。

redisson在加锁的时候,key=lockName, value=uuid + threadID,采用set结构存储,并包含了上锁的次数(支持可重入);解锁的时候通过hexists判断key和value是否存在,存在则解锁;这里不会出现误解锁

性能、崩溃恢复和redis同步

如何提升分布式锁的性能?以每分钟执行多少次acquire/release操作作为性能指标,一方面通过增加redis实例可用降低响应延迟,另一方面,使用非阻塞模型,一次发送所有的命令,然后异步读取响应结果,这里假设客户端和redis之间的RTT差不多。

如果redis没用使用备份,redis重启后,那么会丢失锁,导致多个客户端都能获取到锁。通过AOF持久化可以缓解这个问题。redis key过期是unix时间戳,即便是redis重启,那么时间依然是前进的。但是,如果是断电呢?redis在启动后,可能就会丢失这个key(在写入或者还未写入磁盘时断电了,取决于fsync的配置),如果采用fsync=always,那么会极大影响性能。如何解决这个问题呢?可以让redis节点重启后,在一个TTL时间段内,对客户端不可用即可。

Redisson

redisson是在redis基础上实现的一套开源解决方案,不仅提供了一系列的分布式的java常用对象,还提供了许多分布式服务,宗旨是促进使用者对redis的关注分离,更多的关注业务逻辑的处理上。

redisson也对redlock做了一套实现,详细如下:

欢迎大家来到IT世界,在知识的湖畔探索吧!public static void main() { Config config1 = new Config(); config1.useSingleServer().setAddress("redis://xxxx1:xxx1") .setPassword("xxxx1") .setDatabase(0); RedissonClient redissonClient1 = Redisson.create(config1); Config config2 = new Config(); config2.useSingleServer() .setAddress("redis://xxxx2:xxx2") .setPassword("xxxx2") .setDatabase(0); RedissonClient redissonClient2 = Redisson.create(config2); Config config3 = new Config(); config3.useSingleServer(). setAddress("redis://xxxx3:xxx3") .setPassword("xxxx3") .setDatabase(0); RedissonClient redissonClient3 = Redisson.create(config3); String lockName = "redlock-test"; RLock lock1 = redissonClient1.getLock(lockName); RLock lock2 = redissonClient2.getLock(lockName); RLock lock3 = redissonClient3.getLock(lockName); RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3); boolean isLock; try { / * 尝试获取锁 * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败 * leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完) */ isLock = redLock.tryLock(500, 30000, TimeUnit.MILLISECONDS); if (isLock) { // 获取锁成功,处理业务 Thread.sleep(30000); } } catch (Exception e) { } finally { // 无论如何, 最后都要解锁 redLock.unlock(); } }

tryLock()源码:redisson对redlock的实现方式基本和上面文字描述的类似,有一点区别在于,redisson在获取锁成功后,会对key的失效时间重新赋值。

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long newLeaseTime = -1; if (leaseTime != -1) { newLeaseTime = unit.toMillis(waitTime)*2; } long time = System.currentTimeMillis(); long remainTime = -1; if (waitTime != -1) { remainTime = unit.toMillis(waitTime); } long lockWaitTime = calcLockWaitTime(remainTime); int failedLocksLimit = failedLocksLimit(); List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size()); for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) { RLock lock = iterator.next(); boolean lockAcquired; try { if (waitTime == -1 && leaseTime == -1) { lockAcquired = lock.tryLock(); } else { long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (RedisResponseTimeoutException e) { unlockInner(Arrays.asList(lock)); lockAcquired = false; } catch (Exception e) { lockAcquired = false; } if (lockAcquired) { acquiredLocks.add(lock); } else { if (locks.size() - acquiredLocks.size() == failedLocksLimit()) { break; } if (failedLocksLimit == 0) { unlockInner(acquiredLocks); if (waitTime == -1 && leaseTime == -1) { return false; } failedLocksLimit = failedLocksLimit(); acquiredLocks.clear(); // reset iterator while (iterator.hasPrevious()) { iterator.previous(); } } else { failedLocksLimit--; } } if (remainTime != -1) { remainTime -= (System.currentTimeMillis() - time); time = System.currentTimeMillis(); if (remainTime <= 0) { unlockInner(acquiredLocks); return false; } } } if (leaseTime != -1) { List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size()); for (RLock rLock : acquiredLocks) { RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); futures.add(future); } for (RFuture<Boolean> rFuture : futures) { rFuture.syncUninterruptibly(); } } return true; }

Reddsion还有基于lua脚本的实现这里也简单讲下:

总体流程简单来说:

1、线程A和线程B两个线程同时争抢锁。线程A很幸运,最先抢到了锁。线程B在获取锁失败后,并未放弃希望,而是主动订阅了解锁消息,然后再尝试获取锁,顺便看看没有抢到的这把锁还有多久就过期,线程B就按需阻塞等锁释放。

2、线程A拿着锁干完了活,自觉释放了持有的锁,于此同时广播了解锁消息,通知其他抢锁的线程再来枪;

3、解锁消息的监听者LockPubSub收到消息后,释放自己持有的信号量;线程B就瞬间从阻塞中被唤醒了,接着再抢锁,这次终于抢到锁了!后面再按部就班,干完活,解锁

网上有一张总体过程时序图不错【传送门】:

JAVA面试宝典-分布式篇(二十二)分布式幂等性如何设计?

加锁与解锁都是通过lua脚本实现的:

欢迎大家来到IT世界,在知识的湖畔探索吧!-- 若锁不存在:则直接广播解锁消息,并返回1 if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end; -- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁 if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end; -- 若锁存在,且唯一标识匹配:则先将锁重入计数减1 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then -- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期 redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else -- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程 redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;
JAVA面试宝典-分布式篇(二十二)分布式幂等性如何设计?

-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间 if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; -- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; -- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间 return redis.call('pttl', KEYS[1]);
JAVA面试宝典-分布式篇(二十二)分布式幂等性如何设计?

加锁源码:

欢迎大家来到IT世界,在知识的湖畔探索吧!@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // 获取锁能容忍的最大等待时长 long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); // 【核心点1】尝试获取锁,若返回值为null,则表示已获取到锁 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } // 还可以容忍的等待时长=获取锁能容忍的最大等待时长 - 执行完上述操作流逝的时间 time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); // 【核心点2】订阅解锁消息,见org.redisson.pubsub.LockPubSub#onMessage / * 4.订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题: * 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争 * 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败 * 当 this.await返回true,进入循环尝试获取锁 */ final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); //await 方法内部是用CountDownLatch来实现阻塞,获取subscribe异步执行的结果(应用了Netty 的 Future) if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() { @Override public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } acquireFailed(threadId); return false; } // 订阅成功 try { // 还可以容忍的等待时长=获取锁能容忍的最大等待时长 - 执行完上述操作流逝的时间 time -= (System.currentTimeMillis() - current); if (time <= 0) { // 超出可容忍的等待时长,直接返回获取锁失败 acquireFailed(threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); // 尝试获取锁;如果锁被其他线程占用,就返回锁剩余过期时间【同上】 ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); // 【核心点3】根据锁TTL,调整阻塞等待时长; // 注意:这里实现非常巧妙,1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让当前线程阻塞一段时间,避免了在while循环中频繁请求获取锁; //2、该Semaphore的release方法,会在订阅解锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;当其他线程释放了占用的锁,会广播解锁消息,监听器接收解锁消息,并释放信号量,最终会唤醒阻塞在这里的线程。 if (ttl >= 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } } } finally { // 取消解锁消息的订阅 unsubscribe(subscribeFuture, threadId); } }

获取锁tryAcquire的实现,就是执行Lua脚本:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { // tryAcquireAsync异步执行Lua脚本,get方法同步获取返回结果 return get(tryAcquireAsync(leaseTime, unit, threadId)); } // 见org.redisson.RedissonLock#tryAcquireAsync private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { // 实质是异步执行加锁Lua脚本 return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { //先判断这个异步操作有没有执行成功,如果没有成功,直接返回,如果执行成功了,就会同步获取结果 if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired //如果ttlRemaining为null,则会执行一个定时调度的方法scheduleExpirationRenewal if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } // 见org.redisson.RedissonLock#tryLockInnerAsync <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }

解锁:

欢迎大家来到IT世界,在知识的湖畔探索吧!@Override public void unlock() { // 执行解锁Lua脚本,这里传入线程id,是为了保证加锁和解锁是同一个线程,避免误解锁其他线程占有的锁 Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } if (opStatus) { cancelExpirationRenewal(); } } // 见org.redisson.RedissonLock#unlockInnerAsync protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }

在解锁Lua脚本中,操作了两个key:一个是锁名my_lock_name,一个是解锁消息发布订阅频道redisson_lock__channel:{my_first_lock_name},按照上面slot计算方式,两个key都会按照内容my_first_lock_name来计算,故能保证落到同一个slot.

JAVA面试宝典-分布式篇(二十二)分布式幂等性如何设计?

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/126835.html

(0)
上一篇 38分钟前
下一篇 2025年 4月 13日 上午7:05

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们YX

mu99908888

在线咨询: 微信交谈

邮件:itzsgw@126.com

工作时间:时刻准备着!

关注微信