浅谈分布式锁
文章目录
- 前言
- 一、分布式锁?
- 二、实现思路
- 1.存储一个唯一的键
- 2.使用setnx
- 3.给键设置唯一值
- 4.因为超时导致的并发执行
- 4.Redission
- 5.代码实现
- 总结
前言
最近学习了分布式锁,本篇来记录一下
一、分布式锁?
火车站售票的案例,几乎每个学过线程的人都知道,在单体项目中。如果不对某些共享资源加锁,在并发场景下,会出现超卖,或者一张票多次出售的情况。一般我们的解决方案是通过加synchronized关键字来实现锁,或者使用reentrantlock来避免这种情况的发生。
然而,在分布式架构下,一个售票功能可能由多台服务器承载,那么上边说的两种锁都是基于tomcat实现的,我们知道,一个tomcat中的锁无论如何都不可能影响到另外一台服务器中的线程,所以当分布式架构下,遇到相同问题时,显然使用老办法是行不通的。必须引入另外一种概念,即:分布式锁
分布式锁:
首先我们需要考虑的问题是,分布式锁必须要实现的就是分布式架构下,多台服务器之间的数据一致性,这样我们不得不联想到Redis,分布式架构下,Redis可以做到多台服务器之间的数据同步,那么我们可以利用这一特性实现分布式锁。
二、实现思路
1.存储一个唯一的键
我们可以在关键代码块(售卖)之前,先将一个同名Key存入Redis中,代表当前线程获得了锁,当下一个线程进来时,先判断Key是否存在?存在则获取锁失败,需要等待线程一执行完毕并使用del命令删除key来释放锁,不存在则证明当前没有线程执行关键代码块,则存一个Key,然后执行。
需要注意的是:此时可能会出现一个问题,假设当第一个线程执行到一半时,出现了异常,由于我们加锁的这个过程是非原子性的,可能会导致该线程直接G掉,无法释放当前锁,那么后续线程就会陷入无休止的自旋等待,即死锁,怎么解决这个问题呢?
2.使用setnx
Redis的sennx命令有一个可以设置过期时间的机制,这种机制可以帮我们解决当线程G掉之后无法释放持有锁的情况,当线程执行时间超过我们预设置的时间后,自动释放锁,来避免死锁的情况
Redis 2.6.12 之后 Redis 支持 nx 和 ex 操作是同一原子操作。
但是这样我们并不是高枕无忧
想象一下,假设现在A线程获取到了锁并执行,key设置的过期时间是30秒,但是由于网络波动,导致线程A执行的时间超过了30秒,此时key自动失效,即锁被释放,此时一直在外等着的线程B进来了,自动设置了key,但是此时A恰好执行完毕,结果将B的锁删除了,当线程C看到Redis中已经没了key,那么它会直接进入执行,这样还是会产生不可预期的事件,显然是不合适的。
那么这个问题怎么解决?
3.给键设置唯一值
解决办法就是,当线程获取锁的时候(给Redis中存key),同步设置一个唯一的value值,推荐使用uuid,当线程A执行完之后,会根据value值判断当前的锁是不是自己的锁,再进行删除操作。
4.因为超时导致的并发执行
上边我们说到当key的时间设置固定30秒时,当网络波动造成线程A没有执行完毕,此时锁自动释放,导致B获得了锁,A和B发生并发执行,显然这种情况不符合我们的需求,那么怎么去解决这种问题?
一般有两种解决办法:
- 设置足够长的过期时间,但是这种方法显然不太合理,因为线程并发不可能一直等待,这样的效率十分的低下。
- 第二种方法,使用Redission
4.Redission
Redisson是一个基于NIO的Netty框架的企业级的开源Redis Client,也提供了分布式锁的支持,Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格框架, 充分利用 Redis 键值数据库提供的一系列优势, 基于 Java 实用工具包中常用接口, 为使用者提供了 一系列具有分布式特性的常用工具类.
使用Redission的优点在于:
Redisson 的宗旨是促进使用者对 Redis 的关注分离,从而让使用者能够将精力更集中地放在处理业务逻辑上。
Redission中有一个watchdog的概念,翻译过来就是看门狗,它会在你获取锁之后,每隔10秒帮你把key的超时时间设为30s,这样的话,就算一直持有锁也不会出现key过期了,也解决了其他线程获取到锁的问题了。
redisson的“看门狗”逻辑保证了没有死锁发生,如果机器宕机了,看门狗也就没了。此时就不会延长key的过期时间,到了30s之后就会自动过期了,其他线程可以获取到锁
5.代码实现
引入依赖:
<!-- redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.7</version>
</dependency>
添加配置类:
package com.lzl.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}
模拟业务代码:
package com.lzl.controller;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("secondKill")
public class SecondKillController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
//模拟减库存的案例
@RequestMapping("stock")
public String stock(){
//以商品的id作为key,手动去redis中添加数量
String stockKey = "product001";
//获取锁对象
RLock rLock = redissonClient.getLock("lockKey");
try {
//加锁
rLock.lock();
//boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)
//lock(long leaseTime, TimeUnit unit)
//获取库存数量
String product001 = stringRedisTemplate.opsForValue().get(stockKey);
//转为Integer类型
Integer product001Stock = Integer.parseInt(product001);
//判断
if(product001Stock > 0){
//在原来库存的基础上减 1 ,获取真实的库存
int realProduct001Stock = product001Stock -1;
//重新再设置一下库存
stringRedisTemplate.opsForValue().set(stockKey,realProduct001Stock+"");
//提示语句
System.out.println("下单成功,库存量剩余:"+ realProduct001Stock);
}else{
//提示语句
System.out.println("下单失败,当前商品卖完了");
}
//测试锁续命,去redis中查询名为:"lockKey" 的key的过期时间
//Thread.sleep(30000);
}catch (Exception e){
//处理异常
}finally {
//释放锁
rLock.unlock();
}
return "success";
}
}
总结
本篇记录一下分布式锁的场景
还没有评论,来说两句吧...