redis与分布式锁浅谈
redis与分布式锁与缓存数据一致性问题浅谈
1.高并发下缓存失效问题
1.1 缓存穿透:
缓存穿透:
指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义风险:
利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃解决:
null结果缓存,并加入短暂过期时间
1.2 缓存雪崩
缓存雪崩:
缓存雪崩是指在我们设置缓存时key采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩风险:
DB瞬时压力过重雪崩解决:
原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件
1.3 缓存击穿
缓存击穿:
对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。如果这个key在大量请求同时进来前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。风险:
数据库瞬时压力增大,最终导致崩溃解决:
加锁大量并发只让一个去查,其他人等待,查到以后释放锁,其他人获取到锁,先查缓存,就会有数据,不用去db
2.分布式锁
分布式锁需要解决的问题:
问题1:
setnx占好了位,业务代码异常或者程序在页面过程中宕机,没有执行删除锁逻辑,这就造成了死锁解决:
设置锁的自动过期,即使没有删除,会自动删除
问题2:
setnx设置好,正要去设置过期时间,宕机。又死锁了。解决:
设置过期时间和占位必须是原子的。redis支持使用setnx ex命令
问题3:
删除锁直接删除??? 如果由于业务时间很长,锁自己过期了,我们直接删除,有可能把别人正在持有的锁删除了。解决:
占锁的时候,值指定为uuid,每个人匹配是自己的锁才删除
问题4:
如果正好判断是当前值,正要删除锁的时候,锁已经过期, 别人已经设置到了新的值。那么我们删除的是别人的锁解决:
删除锁必须保证原子性。使用redis+Lua脚本完成。 String script = “if redis.call(‘get’,
KEYS[1]) == ARGV[1] then return redis.call(‘del’, KEYS[1]) else return 0 end”;
保证加锁【占位+过期时间】和删除锁【判断+删除】的原子性。 更难的事情,锁的自动续期背景:最近遇到一个生产问题,分布式部署了十几条服务器,有个业务过期的定时任务会每天发邮件提醒用户,然而用户最近反馈,每天收到好几封提醒邮件,于是排查多发的原因。这个分布式锁很重要,大概率是没锁住
2.1 RedisTemplate(或stringRedisTemplate) 实现(方法一)
public void doSendEmail() {
//1.生成随机数
String uuid = UUID.randomUUID().toString();
//2.设置分布式锁,并设置过期时间120秒
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock_grant_expire_notice", uuid, 120, TimeUnit.SECONDS);
//3.获取到锁
try {
if (lock) {
//4.抢到锁把计数归零 RECURSIVE_CALL_TIMES 是定义在类下面的常量 private int RECURSIVE_CALL_TIMES = 0;
RECURSIVE_CALL_TIMES = NumberUtils.INTEGER_ZERO;
//5.获取邮件是否已发送标识
Object isSendFlag = redisTemplate.opsForValue().get("mail_is_send_flag");
//6.没有值,就是未发送邮件
if (isSendFlag == null || StringUtils.isEmpty(isSendFlag)) {
//加锁成功...执行业务
//7.发送邮件的真正业务
//sendMail();
redisTemplate.opsForValue().set("mail_is_send_flag", UUID.randomUUID().toString(), 23 * 60 * 60, TimeUnit.SECONDS);
} else {
//8.邮件已发送,无需重复发送
log.info("mail already send,there's no need to send it twice");
}
} else {
//9.未获取到分布式锁,尝试自璇,每10秒递归调用一次,尝试获取分布式锁,最多尝试5次
Thread.sleep(10000);
if (RECURSIVE_CALL_TIMES <= 5) {
RECURSIVE_CALL_TIMES += NumberUtils.INTEGER_ONE;
doSendEmail();
}
}
} catch (Exception e) {
log.error("execute send mail fail,message:" + e);
}finally{
//10.释放分布式锁,对比uuid值是为了只删除自己的锁,且对比值和删锁是原子操作
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList("lock_grant_expire_notice"), uuid);
}
}
2.2 JedisCluster 实现(方法二)
Jedis初始化类,连接redis,以及一些常用的方法
@Component("redisClusterConfig")
public class RedisClusterConfig {
private static Log log = LogFactory.getLog(RedisClusterConfig.class);
private volatile JedisCluster jedisCluster;
public RedisClusterConfig() {
initCluster();
}
public JedisCluster getClusterResource() {
initCluster();
return jedisCluster;
}
private void initCluster() {
if (null != jedisCluster) {
return;
}
synchronized (this) {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
//redis的相关配置文件(把redis的相关信息配置在一个文件中,读取文件)
Properties properties = PropertyUtils.loadProperty("redis-context.properties");
jedisPoolConfig.setMaxTotal(PropertyUtils.getIntegerProperty(properties, "redis.pool.maxTotal", 50));
jedisPoolConfig.setMaxIdle(PropertyUtils.getIntegerProperty(properties, "redis.pool.maxIdle", 20));
jedisPoolConfig.setMinIdle(PropertyUtils.getIntegerProperty(properties, "redis.pool.minIdle", 10));
jedisPoolConfig.setMaxWaitMillis(PropertyUtils.getIntegerProperty(properties, "redis.pool.maxWaitMillis", 10000));
jedisPoolConfig.setTimeBetweenEvictionRunsMillis(PropertyUtils.getIntegerProperty(properties, "redis.pool.timeBetweenEvictionRunsMills", 60000));
jedisPoolConfig.setTestOnBorrow(false);
jedisPoolConfig.setTestOnReturn(false);
jedisPoolConfig.setTestWhileIdle(true);
String clusterHost = properties.getProperty("redis.pool.cluster.host");
int timeout = PropertyUtils.getIntegerProperty(properties, "redis.timeout", 2000);
int sockettime = PropertyUtils.getIntegerProperty(properties, "redis.pool.cluster.sockettimeout", 2000);
int maxAttempts = PropertyUtils.getIntegerProperty(properties, "redis.pool.cluster.maxAttempts", 2000);
String password = properties.getProperty("redis.pool.cluster.password");
Set<HostAndPort> nodes = new HashSet<>();
String[] hosts = clusterHost.split(",");
for (String ipPort : hosts) {
String[] ipPortArr = ipPort.split(":");
String ip = ipPortArr[0];
int port = Integer.parseInt(ipPortArr[1]);
nodes.add(new HostAndPort(ip, port));
}
jedisCluster = new JedisCluster(nodes, timeout, sockettime, maxAttempts, password, jedisPoolConfig);
}
}
@Override
protected void finalize() throws Throwable {
super.finalize();
if (jedisCluster != null) {
jedisCluster.close();
}
}
/**
* 获取分布式锁
*
* @param lockKey
* @param value
* @param expireTime
* @return
*/
public boolean getLock(String lockKey, String value, int expireTime) {
String LOCK_SUCCESS = "OK";
boolean clusterRtnValue = false;
lockKey = replace4set(lockKey);
JedisCluster clusterResource = getClusterResource();
try {
String result = clusterResource.set(lockKey, value, "NX", "EX", expireTime);
if (LOCK_SUCCESS.equalsIgnoreCase(result)) {
clusterRtnValue = true;
}
} catch (Exception e) {
log.error("exception:" + e);
}
return clusterRtnValue;
}
public boolean releaseLock(String lockKey, String value) {
Long RELEASE_SUCCESS = 1L;
boolean clusterRtnValue = false;
lockKey = replace4set(lockKey);
JedisCluster clusterResource = getClusterResource();
try {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = clusterResource.eval(script, Collections.singletonList(lockKey), Collections.singletonList(value));
if (RELEASE_SUCCESS.equals(result)) {
clusterRtnValue = true;
}
} catch (Exception e) {
log.error("exception:" + e);
}
return clusterRtnValue;
}
public String set(String key, String value) {
String clusterRtnValue = null;
JedisCluster clusterResource = getClusterResource();
try {
clusterRtnValue = clusterResource.set(key, value);
} catch (Exception e) {
log.error("exception:" + e);
}
return clusterRtnValue;
}
public String setEx(String key, String value, int seconds) {
String clusterRtValue = null;
key = replace4set(key);
JedisCluster clusterResource = getClusterResource();
try {
clusterRtValue = clusterResource.setex(key, seconds, value);
} catch (Exception e) {
log.error("exception:" + e);
}
return clusterRtValue;
}
public String get(String key) {
String clusterRtValue = null;
key = replace4set(key);
JedisCluster clusterResource = getClusterResource();
try {
clusterRtValue = clusterResource.get(key);
} catch (Exception e) {
log.error("exception:" + e);
}
return clusterRtValue;
}
private String replace4set(String str) {
return str.replaceAll("\\{", "[").replaceAll("}", "]");
}
}
redis-context.properties配置内容
redis.pool.maxTotal=50
redis.pool.maxIdle=20
redis.pool.minIdle=10
redis.pool.maxWaitMillis=10000
redis.pool.timeBetweenEvictionRunsMills=60000
redis.pool.cluster.host=192.168.10.128:6379,192.168.10.131:6379
redis.timeout=2000
redis.pool.cluster.sockettimeout=2000
redis.pool.cluster.maxAttempts=2000
redis.pool.cluster.password=420188
分布式锁发送邮件
public void doSendEmail() {
//1.生成随机数
String redisLockValue = UUID.randomUUID().toString();
//2.设置分布式锁,并设置过期时间120秒
boolean lock = redisClusterConfig.getLock("lock_grant_expire_notice", redisLockValue, 120);
try {
if (lock) {
//抢到锁把计数归零
RECURSIVE_CALL_TIMES = NumberUtils.INTEGER_ZERO;
//获取邮件是否已发送标识
String isSendFlag = redisClusterConfig.get("mail_is_send_flag");
//未发送
if (StringUtils.isBlank(isSendFlag)) {
//发送邮件的真正业务
//sendMail();
redisClusterConfig.setEx("mail_is_send_flag", UUID.randomUUID().toString(), 23 * 60 * 60);
} else {
//已发送
log.info("mail already send,there's no need to send it twice");
}
} else {
//未获取到分布式锁
Thread.sleep(10000);
if (RECURSIVE_CALL_TIMES <= 5) {
RECURSIVE_CALL_TIMES += NumberUtils.INTEGER_ONE;
doSendEmail();
}
}
} catch (Exception e) {
log.error("execute send mail fail,message:" + e);
}finally{
//释放分布式锁
redisClusterConfig.releaseLock("lock_grant_expire_notice", redisLockValue);
}
}
2.3 Redisson 实现(方法三)
以上两种方法都差不多,但无法解决redis续期问题,如果业务执行时间超过了分布式锁的过期时间,会有问题。当然 把分布式锁时间设置稍长一点一般也没什么大问题。redisson在业务未执行完会自动续期
public void doSendEmail() {
//创建分布式锁
RLock lock = redisson.getLock("lock_grant_expire_notice");
try {
//获取分布式锁(参数1:等待时间,参数2:过期时间 参数3:时间单位)
if (lock.tryLock(0, 120000, TimeUnit.MILLISECONDS)) {
//抢到锁把计数归零
RECURSIVE_CALL_TIMES = NumberUtils.INTEGER_ZERO;
//获取邮件是否已发送标识
String isSendFlag = redisClusterConfig.get("mail_is_send_flag");
//未发送
if (StringUtils.isBlank(isSendFlag)) {
//发送邮件的真正业务
//sendMail();
redisClusterConfig.setEx("mail_is_send_flag", UUID.randomUUID().toString(), 23 * 60 * 60);
} else {
//已发送
log.info("mail already send,there's no need to send it twice");
}
} else {
//未获取到分布式锁
Thread.sleep(10000);
if (RECURSIVE_CALL_TIMES <= 5) {
RECURSIVE_CALL_TIMES += NumberUtils.INTEGER_ONE;
doSendEmail();
}
}
} catch (Exception e) {
log.error("execute send mail fail,message:" + e);
}finally{
//释放分布式锁
lock.unlock();
}
}
3.本地缓存Caffeine
Caffeine基于LRU算法实现,支持多种缓存过期策略。本地缓存且能设置过期时间。Caffeine 2.X版本基于jdk1.8,如果使用Caffeine 3.X版本则需要使用jdk 11版本
3.1 引包
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>
3.2 创建缓存
// 初始化缓存,设置了 1 分钟的写过期,100 的缓存最大个数
Cache<Integer, Integer> cache = Caffeine.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.maximumSize(100)
.build();
3.3 使用
int key = 1;
// 使用 getIfPresent 方法从缓存中获取值。如果缓存中不存指定的值,则方法将返回 null
System.out.println("不存在值,返回null:" + cache.getIfPresent(key));
// 也可以使用 get 方法获取值,该方法将一个参数为 key 的 Function 作为参数传入。
// 如果缓存中不存在该 key 则该函数将用于提供默认值,该值在计算后插入缓存中:
System.out.println("返回默认值:" + cache.get(key, a -> 2));
// 校验 key 对应的 value 是否插入缓存中
System.out.println("返回key对应的value:" + cache.getIfPresent(key));
// 手动 put 数据填充缓存中
int value = 2;
cache.put(key, value);
// 使用 getIfPresent 方法从缓存中获取值。如果缓存中不存指定的值,则方法将返回 null
System.out.println("返回key对应的value:" + cache.getIfPresent(key));
// 移除数据,让数据失效
cache.invalidate(key);
System.out.println("返回key对应的value:" + cache.getIfPresent(key));
4.死锁
特点:
1.互斥:
一个共享资源同一时间只能被一个线程占用;2.占有且等待:
线程1已经获得共享资源X,在等待获取共享资源Y的时候,它不会释放已经占有的共享资源X;3.不可抢占
:线程不能抢占其它线程已经占有的共享资源;4.循环等待:
线程1等待线程2占有的资源,线程2等待线程1占有的资源,这就是循环等待。
解决死锁:
破坏占有且等待条件
破坏不可抢占条件 ReentrantLock的unlock()方法来释放自己持有的锁
破坏循环等待条件
定时检测死锁 jstack 工具是一个线程堆栈分析工具
5.缓存数据一致性问题
因为数据库与缓存是不同的组件,操作必须有先后顺序,无法像数据库的事务一样满足ACID的特性,所以就会出现数据在缓存中与在数据库中不一致的问题
解决方案(6种):
①先更新db,再删除缓存:
如果更新db成功,删缓存失败,将导致数据不一致②先更新db,再更新缓存:
并发更新场景下,更新缓存会导致数据不一致③先更新缓存,再更新db:
并发更新场景下,更新缓存会导致数据不一致④ 先删缓存,再更新db:
请求 A删除缓存,这时请求B来查,就会击穿到数据库,B读取到旧的值后写入缓存,A正常更新db,由于时间差导致数据不一致的情况⑤ 缓存延时双删:
(1)先淘汰缓存
(2)再写数据库
(3)休眠1秒,再次淘汰缓存⑥变种双删,前置缓存过期时间
6.redis持久化的两种方式
6.1 RDB
开启
save 900 1 #900 秒之内,对数据库进行了至少 1 次修改;
save 300 10 #300 秒之内,对数据库进行了至少 10 次修改
save 60 10000 #60 秒之内,对数据库进行了至少 10000 次修改
save :
同步保存操作,会阻塞 Redis 主线程;bgsave :
fork 出一个子进程,子进程执行,不会阻塞 Redis 主线程,默认选项
定义:
RDB是在指定时间间隔内将内存中的数据集快照写入磁盘,也就是snapshot快照,它恢复时是将快照文件直接读到内存里
备份执行:
Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程结束,再用这个临时文件
替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作,这确保了极高的性能如果需要进行大规模数据恢复,且对于数据恢复的完整性不是特别敏感RDB比AO更加高效。
特点:
优点:节省磁盘空间、恢复速度快
缺点:
①虽然redis在fork时使用了写时拷贝技术,但是如果数据庞大,还是比较消耗性能
加粗样式②RDB的缺点是最后一次持久化的数据可能丢失
6.2 AOF
开启:
redis.conf 中将 appendonly no,修改为 appendonly yes 来开启AOF
定义:
以日志的形式来记录每个写操作,将redis执行过程的所有指令记录下来(读操作不记录),只需追加文件但不可以改写文件,redis启动之初会读取改文件重新构建数据。也就是redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。
特点:
优点: 备份机制更稳健,丢失数据概率更低 可读的日志文本,通过操作AOF文件,可以处理误操作
缺点: 比起RDB占用更多的磁盘空间 恢复备份速度慢 每次读写都同步,有一定的性能压力 存在个别的bug,造成恢复不能
6.3 Redis 混合持久化
什么是混合持久化?
混合持久化是在 AOF 持久化的基础上,定期进行 RDB 持久化,以保证数据的快速恢复 混合持久化的实现方式是在 AOF 重写时,将 RDB
文件以二进制压缩格式写入到 AOF 文件的开头,之后的数据再以 AOF 格式追加到文件的末尾3
混合持久化的优点是: 可以减少 AOF文件的大小,节省磁盘空间 可以加快数据恢复的速度,避免执行大量的 AOF 命令 可以避免数据丢失,因为 RDB 文件和 AOF文件都有最新的数据快照
如何开启混合持久化?
要开启混合持久化,需要在 redis.conf 文件中设置以下参数:
appendonly yes 开启 AOF 持久化
aof-use-rdb-preamble yes 开启混合持久化 开启混合持久化后,可以通过以下命令触发 AOF
重写: bgrewriteaof 在后台执行 AOF 重写 config set auto-aof-rewrite-percentage<percentage>
设置 AOF 文件增长百分比阈值,当达到该阈值时自动执行 AOF
重写 config set auto-aof-rewrite-min-size<size>
设置 AOF 文件增长最小字节数阈值,当达到该阈值时自动执行 AOF 重写
如何恢复混合持久化的数据?
复混合持久化的数据和恢复 AOF 持久化的数据过程是一样的,只需要把 appendonly.aof 文件放到 redis 的根目录,在
redis 启动时,只要开启了 AOF 持久化,redis 就会自动加载并恢复数据 恢复混合持久化的数据的步骤是: 首先读取 AOF
文件中的 RDB 部分,将其中的键值对加载到内存中 然后读取 AOF 文件中的 AOF 部分,按顺序执行其中的命令,更新内存中的数据
7.redis数据淘汰策略
1.noeviction(默认策略):
不会删除任何数据,拒绝所有写入操作并返回客户端错误消息(error)OOM command not allowed when used memory,此时 Redis 只响应删和读操作;2.allkeys-lru:
从所有 key 中使用 LRU 算法进行淘汰(LRU 算法:最近最少使用算法);3.allkeys-lfu:
从所有 key 中使用 LFU 算法进行淘汰(LFU 算法:最不常用算法,根据使用频率计算,4.0 版本新增);4.volatile-lru:
从设置了过期时间的 key 中使用 LRU 算法进行淘汰;5.volatile-lfu:
从设置了过期时间的 key 中使用 LFU 算法进行淘汰;6.allkeys-random:
从所有 key 中随机淘汰数据;7.volatile-random:
从设置了过期时间的 key 中随机淘汰数据;8.volatile-ttl:
在设置了过期时间的key中,淘汰过期时间剩余最短的。
LRU: Least Recently Used 最近时间最少使用
LFU: Least Frequently Used 最少频率使用
实际工作中配置:
allkeys-lru
配置方法,在redis.conf中配置maxmemory-policy allkeys-lru
如果一个键是过期了的,那它到了过期时间是不是马上就从内存中被删除了呢?
不是。
1.定时删除
立即删除过期的数据优点:
能保证内存中的数据的最大新鲜度缺点:
对CPU不好,用处理器性能换区存储空间,redis不可能时时刻刻遍历所有的key,太耗cpu
2.惰性删除
数据到达刚过期时间不做处理,等下次访问数据的时候,如果未过期返回数据,已过期则删除优点:
对cpu友好缺点:
对内存不友好。如果一个过期数据永远没被访问那它就永远留在内存空间
3.定期删除
每隔一段时间执行一次删除过期键操作,并通过限制删除操作执行的时长和频率来减少删除操作对CPU时间的影响。优点:
CPU性能占用设置有峰值,监测频率自定义,内存压力也不会很大缺点:
定期随机抽取进行检查,可能导致很多key到了时间而没有被删除
在 redis.conf 配置hz 10
Redis 默认会每秒进行 10 次(redis.conf 中通过 hz 配置)过期扫描
还没有评论,来说两句吧...