在开发分布式系统中,分布式锁的应用是极其广泛的,以至于现在的校招都会问分布式锁的实现方案。大部分人都是通过网络上的碎片化知识拼凑出了分布式锁的相关方案,对其缘由、具体代码落地却没有深究,本文将从0开始带领读者一步一步地实现分布式锁,并阅读关键源码。
缘起
在谈分布式锁之前,首先需要了解本地锁。本地锁,顾名思义,这个锁对象只存在于本地应用中,比如我们可以使用诸如synchronized
或Lock
的实现类完成本地代码的加锁,保证高并发情况下的线程安全。
如下图展示了一个本地锁的使用场景,在扣减库存时首先需要判断剩余库存,若大于0才能进行扣减否则直接返回。若不加锁,那么在并发场景下可能会出现多扣(负数库存)或者少扣(极端下多次扣减却只减了1)的情况.在单体应用下我们可以使用本地锁实现线程安全,使得多个扣减操作顺序执行。
随着访问量的增加,部署单个应用已经无法满足我们的需求,此时我们可以部署多个应用,使用负载均衡算法分摊请求压力。
如果请求都在一个节点上本地锁依旧可以使用,不过那是不可能的,通常所有请求会被均匀的分发到每个节点(取决于负载均衡算法),而本地锁只能保证本地请求的顺序执行,无法保证所有请求的顺序执行,依旧会出现线程安全问题。
初见
如果我们能锁住所有相关请求呢?这不就保证了扣减库存时的安全了吗?恭喜你,你已经见到了分布式锁。
Key 钥匙
与其说实现分布式锁,不如说实现开锁的"钥匙",并且保证这把钥匙只能被一人得到。这把钥匙的具体形式我们可以随便定义,它可以是把钥匙、也可以是个口令、也可以是个密码,所以钥匙的实现方式可以有很多种,通常我们使用一个字符串表示钥匙。同时,为了加快钥匙的获取我们使用Redis存放这把"钥匙"。
首先需要声明的是,Redis中的对象我们是可以随意GET的,所以我们需要反其道而行之,如果我们能将对象SET进Redis中则表示获取到了锁,反之未获取到,释放锁则只需要删除对象即可。
分布式锁阶段一
上图所示的获取锁部分分为了两步,首先查询对象,然后设置对象。这不是一个原子性操作,可能会出现多个进程获取到锁的情况:
SETNX
幸运的是,Redis为我们提供了一个原子性命令——SETNX
。如果对象不存在,则设置Key。在这种情况下,它等于SET。但当Redis总已经存在Key时,不执行任何操作。
> 127.0.0.1@6379 connected!
> SETNX name catwinghu
1
> SETNX name catwinghu
0
基于SETNX
的分布式锁流程如下图所示:
Code
public class OrderService {
@Autowired
private RedisTemplate redisTemplate;
private final String KEY = "ORDER:LOCK";
/**
* 扣减库存
*/
public void deducting() throws InterruptedException {
// 1、占分布式锁
/**
* setIfAbsent: 如果不存在对应KEY才会SET,底层执行的是SETNX
* execute(connection -> connection.setNX(rawKey, rawValue), true)
*/
Boolean lock = redisTemplate.opsForValue().setIfAbsent(KEY, "EVERYTHING");
if(lock){
// SET 成功,表示获取到了锁
// TODO:业务逻辑代码
// 2、业务处理完成,释放锁
redisTemplate.delete(KEY);
}else{
// 3、加锁失败,重试获取锁--自旋
// 休眠一下再获取
Thread.sleep(1000);
deducting();
}
}
}
分布式锁阶段二
阶段一的分布式锁可能会导致死锁问题:
也许你会这样做:在获取到锁后设置过期时间。
Boolean lock = redisTemplate.opsForValue().setIfAbsent(KEY, "EVERYTHING");
if(lock){
// SET 成功,表示获取到了锁
// 设置过期时间,30s
redisTemplate.expire(KEY,30, TimeUnit.SECONDS);
// TODO:业务逻辑代码
// 业务处理完成,释放锁
redisTemplate.delete(KEY);
}
很可惜,阶段二的分布式锁依旧可能发生死锁:
分布式锁阶段三
不难发现,阶段二发生的问题是因为获取锁和设置过期时间不是原子性的,中间是可能被打断的。
回顾SET
虽然Redis提供了SETEX
命令,但是其无法实现类似SETNX
不能重复设置的功能。查看SET
命令会发现它支持添加额外选项:
- EX seconds:设置指定的过期时间,以秒为单位。
- PX 毫秒:设置指定的过期时间,以毫秒为单位。
- EXAT timestamp-seconds:设置密钥到期的指定 Unix 时间,以秒为单位。
- PXAT timestamp-milliseconds:设置指定的 Unix 时间,密钥将到期,以毫秒为单位。
- NX:仅在不存在的情况下设置密钥。
- XX:仅设置已存在的密钥。
- KEEPTTL:保留与密钥关联的生存时间。
- GET:返回存储在 key 中的旧字符串,如果 key 不存在,则返回 nil。如果 key 中存储的值不是字符串,则返回错误并中止SET。
SET
命令选项可以替换SETNX
、SETEX
、PSETEX
、GETSET
,因此在未来的 Redis 版本中,这些命令可能会被弃用并最终被删除。
基于SETNX+EX
的分布式锁流程如下图所示:
Code
// 1、占分布式锁,同时设置过期时间
Boolean lock = redisTemplate.opsForValue().setIfAbsent(KEY,"EVERYTHING",30,TimeUnit.SECONDS);
if(lock){
// SET 成功,表示获取到了锁
// 设置过期时间,30s
redisTemplate.expire(KEY,30, TimeUnit.SECONDS);
// TODO:业务逻辑代码
// 业务处理完成,释放锁
redisTemplate.delete(KEY);
}else{
// 加锁失败,重试获取锁--自旋
// 休眠一下再获取
Thread.sleep(1000);
deducting();
}
分布式锁阶段四
误杀
阶段三的分布式锁在锁自动过期后可能会导致误杀:
如何做到释放锁时分辨是否是自己的锁呢?我们可以利用VALUE,将自己的一个标记(TOKEN)设置进去,删除时判断是不是我获得的锁。
防止误杀的分布式锁流程图如下所示:
Code
public class OrderService {
@Autowired
private RedisTemplate redisTemplate;
private final String KEY = "ORDER:LOCK";
/**
* 扣减库存
*/
public void deducting() throws InterruptedException {
// 生成TOKEN
String TOKEN = UUID.randomUUID().toString();
// 1、占分布式锁,同时设置过期时间
Boolean lock = redisTemplate.opsForValue().setIfAbsent(KEY,TOKEN,30,TimeUnit.SECONDS);
if(lock){
// SET 成功,表示获取到了锁
// 设置过期时间,30s
redisTemplate.expire(KEY,30, TimeUnit.SECONDS);
// TODO:业务逻辑代码
// 业务处理完成,释放锁
// 首先获取锁的值
String value = ((String) redisTemplate.opsForValue().get(KEY));
// 判断是否和 TOKEN 相等
if(TOKEN.equals(value)){
// 相等表明是我的锁,可以删除
redisTemplate.delete(KEY);
}
// 不相等,不是我的锁,什么也不干
}else{
// 加锁失败,重试获取锁--自旋
// 休眠一下再获取
Thread.sleep(1000);
deducting();
}
}
}
分布式锁阶段五
到现在我们的分布式锁已经经历了四个版本的迭代,可是,很遗憾,阶段四的实现依旧存在误杀的问题。
为保证三步操作的原子性,我们需要使用LUA脚本完成。
if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end
Code
public class OrderService {
@Autowired
private RedisTemplate redisTemplate;
private final String KEY = "ORDER:LOCK";
/**
* 扣减库存
*/
public void deducting() throws InterruptedException {
// 生成TOKEN
String TOKEN = UUID.randomUUID().toString();
// 1、占分布式锁,同时设置过期时间
Boolean lock = redisTemplate.opsForValue().setIfAbsent(KEY,TOKEN,30,TimeUnit.SECONDS);
if(lock){
try {
// SET 成功,表示获取到了锁
// TODO:业务逻辑代码
}finally {
// 业务处理完成,释放锁
// LUA脚本解锁
String lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
DefaultRedisScript<Integer> script = new DefaultRedisScript<>(lua,Integer.class);
// 删除锁
redisTemplate.execute(script, Arrays.asList(KEY), TOKEN);
}
}else{
// 加锁失败,重试获取锁--自旋
// 休眠一下再获取
Thread.sleep(1000);
deducting();
}
}
}
Redisson
Redis提出了一种称为 Redlock 的算法,并且不同语言对其有各种各样的实现方案,Java方面就是大名鼎鼎的Redisson。
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet
, Set
, Multimap
, SortedSet
, Map
, List
, Queue
, BlockingQueue
, Deque
, BlockingDeque
, Semaphore
, Lock
, AtomicLong
, CountDownLatch
, Publish / Subscribe
, Bloom filter
, Remote service
, Spring cache
, Executor service
, Live Object service
, Scheduler service
) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
快速开始
引入依赖
<!--Redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.1</version>
</dependency>
配置客户端
Redisson程序化的配置方法是通过构建Config对象实例来实现的。
@Configuration
public class RedissonConfig {
@Bean(destroyMethod = "shutdown")
RedissonClient redisson() throws IOException {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
}
小试牛刀
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private RedissonClient redisson;
private final String KEY = "ORDER:LOCK";
/**
* 扣减库存
*/
@GetMapping("/deduct")
public String deducting() throws InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
// 获取锁
RLock lock = redisson.getLock(KEY);
// 加锁
lock.lock();
try {
// 加锁成功
// TODO:业务逻辑代码
Thread.sleep(10000);
} finally {
lock.unlock();
watch.stop();
}
return "Deeuct Success,Cost Time:"+watch.getTotalTimeSeconds();
}
}
我们可以看一下锁在Redis是什么样子的呢?
可以发现这个锁带过期时间(默认30s),value是一个随机字符串(85表示线程ID)。和我们上面的设计差不多。
自动续期——看门狗机制
我们自己设计的分布式锁中如果遇到超长执行时间的业务,锁会被自动删除,从而被其他进程获取,如果分布式锁的过期时间能自动续期就好了。Redisson为我们实现了这一功能。如下图展示了Redisson的一次锁续期操作(仅在lock()未指定过期时间时有效):
源码分析
- 如果我们传递了锁的超时时间,底层直接执行 LUA 脚本,到期删除即可
如果我们未指定锁的超时时间,就使用看门狗的默认时间
- 在一个定时任务中更新过期时间
- 定时任务间隔的时间是看门狗时间的1/3
Zookeeper版分布式锁
Zookeeper的分布式锁基于临时顺序节点:
- 客户端调用
create()
方法创建名为"locknode/child-lock-"的节点。需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL
。 - 客户端调用
getChildren("locknode")
方法来获取所有已经创建的子节点。 - 客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。
- 如果创建的节点不是所有节点中序号最小的,那么则监视比自己创建节点的序列号小的最大的节点,进入等待。直到下次监视的子节点变更的时候,再进行子节点的获取,判断是否获取锁。
- 释放锁时删除自己创建的那个子节点即可。
Zookeeper创建分布式锁的流程图如下所示:
Code
public class ZKlock implements AutoCloseable, Watcher {
private ZooKeeper zooKeeper;
private String znode;
public ZKlock() throws IOException {
this.zooKeeper = new ZooKeeper("192.168.233.139:2181",30*1000,this);
}
public boolean getLock(String bizCode){
String path = "/"+bizCode;
try {
Stat stat = zooKeeper.exists(path, false);
if (stat == null) {
// 创建根节点
zooKeeper.create(path,bizCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
// 创建临时有序节点,如/order/order_+NO
znode = zooKeeper.create(path + path + "_", bizCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取根目录下所有临时节点
List<String> children = zooKeeper.getChildren(path, false);
// 按从小到大排序
Collections.sort(children);
// 获得最小的节点
String first = children.get(0);
if(znode.endsWith(first)){
// 当前node就是最小节点,获取锁成功
return true;
}
// 不是最小的节点,则监听前一个节点
String preNode = first;
// 前一个节点的索引
int preNodeIdx = -1;
for (String node : children) {
if(znode.endsWith(node)){
Stat watchStat = zooKeeper.exists(path + "/" + preNode, true);
if(watchStat != null){
// 监听上一个节点成功
break;
}
int temp = preNodeIdx;
Stat watchStat2 = null;
while(temp>=0){
watchStat2 = zooKeeper.exists(path + "/" + children.get(temp), true);
if(watchStat2 != null){
// 监听上一个节点成功
break;
}
// 继续向前找有效节点进行监听
temp--;
}
if(watchStat2 == null){
// 无上一个有效节点
return true;
}
break;
}else{
preNode = node;
preNodeIdx ++;
}
}
synchronized (this){
wait();
}
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public void close() throws Exception {
zooKeeper.delete(znode,-1);
}
@Override
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeDeleted){
synchronized (this){
notify();
}
}
}
}
总结
Redis和ZK版本的对比
- redis分布式锁,需要自己不断去尝试获取锁,比较消耗性能
- zk分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小
- 如果redis获取锁的那个客户端宕机,只能等待超时时间之后才能释放锁;而因为Zookeeper创建的是临时znode,只要客户端挂了,znode就没了,此时就自动释放锁,并回调通知下一个节点
存在问题
在Redis为单机的情况下如果节点宕机了,整个分布式锁GG。我们虽然可以单机主从模式,但是MASTER-SLAVE之间的同步是异步进行的。如果进程A加上了锁,但是锁的内容没有同步至从节点,这时MASTER挂了,由于从节点没有锁的内容,使得进程B又能获取锁,分布式锁失效。
版权属于:带翅膀的猫
本文链接:https://chengpengper.cn/archives/166/
转载时须注明出处及本声明
redis分布式锁笔记:
这个目的是加锁
setifabsent(key,value,time,timeunit.seconds) //设置key的存活时间,原子语句
看锁是否能够加上,如果存在key值上面的方法返回false,对系统调用error
加锁执行业务逻辑
释放锁
为什么要加时间呢!finally执行不到呀,就会造成永久性锁。
带来一个问题 事务比较长的化,如15s的操作10s另一个线程进来进行加锁,再过5s 线程A将锁释放,之后C线程可以进来
刚加的锁永久失效!
解决方法
设置value值为uuid ,在释放锁的时候进行判断,如果此时加锁的value值与生成uuid相同,
代表的是自己释放了自己,而不是别人释放的。
这还有个问题,此时key过期了 ,别人可以拿到锁造成了死锁问题
解决方法
开启子线程不断的给对象延长加锁时间 (定时器或者while循环 时间设置为有原来的1/3)
可以使用redission解决。
实现步骤类似于上面
通过一个key值取锁,然后进行加锁过程,之后释放锁
线程1 执行Redisson 判断是否可以进行加锁,加锁就对资源业务处理,另外的线程cas等待
还有问题 主从复制情况下 A挂了,B当选,但是B没有同步过来
REDLock
基本思想就是多个master
请求一个master,设置自动失效时间还有超时时间,超时时间是为了防止master宕机了之后,客户端还在等待结果
服务器没有在规定时间相应,尝试去另一个redis实例请求获取锁。
客户端从当前时间-获取锁时间=得到锁的时间。且在redis实列超过n/2几点获取锁 与得到的所时间小于失效时间才算获取成功。
有效时间-获取锁的时间 = 锁使用的时间
锁失败,客户端应该在所有redis进行解锁。