Spring boot 分布式锁 优化分布式锁
Spring boot 分布式锁 优化分布式锁
一:准备工作
1.配置文件
server:
port: 8080
servlet:
session:
timeout: 30m
spring:
application:
name: spring-boot-redis
cache:
# 使用了Spring Cache后,能指定spring.cache.type就手动指定一下,虽然它会自动去适配已有Cache的依赖,但先后顺序会对Redis使用有影响(JCache -> EhCache -> Redis -> Guava)
type: REDIS
redis:
host: 192.168.0.1
port: 6379
password: 123
# 连接超时时间(ms)
timeout: 10000
# Redis默认情况下有16个分片,这里配置具体使用的分片,默认是0
database: 0
lettuce:
pool:
# 连接池最大连接数(使用负值表示没有限制) 默认 8
max-active: 100
# 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
max-wait: -1
# 连接池中的最大空闲连接 默认 8
max-idle: 8
# 连接池中的最小空闲连接 默认 0
min-idle: 0
2.pox文件
//为了方便测试 使用了web-starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
//以下三个必须
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
</dependency>
二:测试
1.contoller层代码
分布式锁测试控制层
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author WJL
* @version 1.0
* @Date: 2022/8/20 23:18
* @功能描述
**/
@RestController
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("/testLock")
public void testLock(){
//1获取锁,setne
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS);
//2获取锁成功、查询num的值
if(lock){
Object value = redisTemplate.opsForValue().get("num");
//2.1判断num为空return
if(StringUtils.isEmpty(value)){
return;
}
//2.2有值就转成成int
int num = Integer.parseInt(value+"");
//2.3把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
//2.4释放锁,del
redisTemplate.delete("lock");
}else{
//3获取锁失败、每隔0.1秒再获取
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
要先在服务器的redis中 set num “0”
2.服务区测试效果
使用 ab 测试工具:httpd-tools(yum install -y httpd-tools)
ab -n(一次发送的请求数) -c(请求的并发数) 访问路径
测试如下:5000请求,100并发
ab -n 5000 -c 100 http://127.0.0.1:8206/testLock
可以看到此时的num变成1000
三:小结 误删
1.代码执行的流程梳理 例如 A先操作 1.上锁 2.具体操作 。假设服务器在A操作的时候卡顿,此时的时间已经过了设置的过期时间 3.自动释放key B抢到锁 1.进行上锁 2.进行具体操作 当A服务器反应过来的时候 继续进行操作,需要手动释放锁,但此时的锁在B当中,此时B操作没有完成就被释放B的锁!
2.解决方案 使用UUID防止误删 ,用uuid来标识不同的操作
四:优化 uuid来标识不同的操作
主要是在生成锁的时候加上uuid
在释放锁的时候进行一个uuid的校验
String uuid= UUID.randomUUID().toString();
//1获取锁,setne
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid , 3, TimeUnit.SECONDS);
//2.4释放锁,del
//判断比较uuid值是否一样
if(uuid.equals(redisTemplate.opsForValue().get("lock"))){
redisTemplate.delete("lock");
}
五:使用lua脚本继续优化
删除操作具有原子性
问题:A会释放B的锁
解决的方案
有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作
@GetMapping("testLockLua")
public void testLockLua() {
//1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
String uuid = UUID.randomUUID().toString();
//2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
String skuId = "25"; // 访问skuId 为25号的商品 100008348542
String locKey = "lock:" + skuId; // 锁住的是每个商品的数据
// 3 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);
// 第一种: lock 与过期时间中间不写任何的代码。
// redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
// 如果true
if (lock) {
// 执行的业务逻辑开始
// 获取缓存中的num 数据
Object value = redisTemplate.opsForValue().get("num");
// 如果是空直接返回
if (StringUtils.isEmpty(value)) {
return;
}
// 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
int num = Integer.parseInt(value + "");
// 使num 每次+1 放入缓存
redisTemplate.opsForValue().set("num", String.valueOf(++num));
/*使用lua脚本来锁*/
// 定义lua 脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 使用redis执行lua执行
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
// 那么返回字符串与0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
} else {
// 其他线程等待
try {
// 睡眠
Thread.sleep(1000);
// 睡醒了之后,调用方法。
testLockLua();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
六:总结
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
- 加锁和解锁必须具有原子性。
还没有评论,来说两句吧...