缓存穿透
缓存穿透:对于一个缓存和数据库中都不存在的数据查询,因为每次查询都会穿过缓存直接打在数据库上,所以很容易增大数据库压力,如下图所示。
对于缓存穿透这个问题,解决方案包括但不限于如下:
缓存空对象
进行一次结果为空的数据库查询后,以该查询为key,null为value,并设置TTL,将其放进缓存,这样在下一次同样的查询时,就不会再进行数据库查询。缺点
- 缓存空对象会造成缓存的内存开销。
代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42/**
* 普通缓存查询,使用存储空对象的方法防止缓存穿透
*
* @param keyPrefix key的前缀
* @param id 要查询的id
* @param type 查询的实体类型
* @param dbFallBack 数据库查询方法
* @param time 新缓存的过期时间
* @param unit 时间单位
* @param <T> 实体类型
* @param <ID> id类型
* @return 查询到的结果
*/
public <T, ID> T queryById(String keyPrefix, ID id, Class<T> type, Function<ID, T> dbFallBack, Long time, TimeUnit unit) {
//查询redis中是否有需要的数据
String key = keyPrefix + id;
String resJson = redisTemplate.opsForValue().get(key);
//判断缓存是否为空
if (StrUtil.isNotBlank(resJson)) {
//存在直接返回
return JSONUtil.toBean(resJson, type, false);
}
//判断是否是空对象
if (resJson != null) {
log.warn("命中空对象");
return null;
}
//不存在查询数据库
log.warn("未命中缓存,访问数据库!");
T res = dbFallBack.apply(id);
//判断数据库是否有数据
if (res == null) {
//缓存空对象
redisTemplate.opsForValue().set(key, RedisConstants.NULL_VALUE, RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
//存在则将数据写入redis
set(key, res, time, unit);
//返回
return res;
}布隆过滤器
在缓存前进行一次过滤,一般调用封装好的布隆过滤器实现。未进行实践缺点
- 不能完全保证不发生缓存穿透。
缓存雪崩
当某一个时刻出现大规模的缓存失效的情况,那么就会导致大量的请求直接打在数据库上面,导致数据库压力巨大,如果在高并发的情况下,可能瞬间就会导致数据库宕机。这时候如果运维马上又重启数据库,马上又会有新的流量把数据库打死。这就是缓存雪崩。
为了解决缓存雪崩,一般有这些方法:
设置N+n的缓存过期时间
因为缓存雪崩是大量缓存同时过期造成的,所以只要让缓存的时间是随机的,那么同时过期的概率就会降低。代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13/**
* 普通写入缓存
*
* @param key 缓存的key值
* @param value 要缓存的对象
* @param time 缓存的持续时间
* @param unit 持续时间单位
*/
public void set(String key, Object value, Long time, TimeUnit unit) {
//设置N+n缓存,防止缓存雪崩
time += RandomUtil.randomLong(time);
redisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
}使用多级缓存
Redis宕机后进行限流,减少数据库压力
缓存击穿
其实跟缓存雪崩有点类似,缓存雪崩是大规模的key失效,而缓存击穿是一个热点的Key,有大并发集中对其进行访问,突然间这个Key失效了,导致大并发全部打在数据库上,导致数据库压力剧增。这种现象就叫做缓存击穿。
一般有两种解决方案:
设置逻辑过期
这样的缓存是永久存在的,这样就避免了查询直接访问数据库的情况,具体流程如下图:
代码实现
先设置一个数据类用来存储包含数据和过期时间,且尽量不要修改原代码
1
2
3
4
5
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}先写一个获取锁和释放锁的工具函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20/**
* 尝试获取互斥锁
*
* @param lockKey 互斥锁的键名
* @return 获取锁是否成功
*/
private boolean tryLock(String lockKey) {
Boolean isLock = redisTemplate.opsForValue().setIfAbsent(lockKey, RedisConstants.LOCK_VALUE);
redisTemplate.expire(lockKey, RedisConstants.LOCK_SHOP_TTL, TimeUnit.SECONDS);
return BooleanUtil.isTrue(isLock);
}
/**
* 释放互斥锁
*
* @param lockKey 互斥锁的键名
*/
private void unLock(String lockKey) {
redisTemplate.delete(lockKey);
}然后完成图中的流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77//线程池
private static final ExecutorService CACHE_EXECUTOR = Executors.newSingleThreadExecutor();
private <T> boolean checkLogicalExpire (String key, Class<T> type, AtomicReference<T> dataHolder) {
String redisDataJson = redisTemplate.opsForValue().get(key);
//命中,判断缓存是否过期
RedisData redisData = JSONUtil.toBean(redisDataJson, RedisData.class, false);
LocalDateTime expireTime = redisData.getExpireTime();
T data = JSONUtil.toBean((JSONObject) redisData.getData(), type);
dataHolder.set(data);
if (expireTime.isAfter(LocalDateTime.now())) {
//未过期
return false;
}
return true;
}
/**
* 逻辑过期缓存查询,防止缓存击穿
*
* @param keyPrefix key的前缀
* @param id 要查询的id
* @param type 查询的实体类型
* @param dbFallBack 数据库查询方法
* @param time 新缓存的过期时间
* @param unit 时间单位
* @param <T> 实体类型
* @param <ID> id类型
* @return 查询到的结果
*/
public <T, ID> T queryWithLogicalExpire(String keyPrefix, ID id, Class<T> type, Function<ID, T> dbFallBack, Long time, TimeUnit unit) {
//从redis查询缓存数据
String key = keyPrefix + id;
//判断缓存是否存在
String redisDataJson = redisTemplate.opsForValue().get(key);
if (StrUtil.isBlank(redisDataJson)) {
//未命中
log.warn("逻辑过期缓存不存在!");
return null;
}
AtomicReference<T> dataHolder = new AtomicReference<>();
boolean isExpire = checkLogicalExpire(key, type, dataHolder);
T data = dataHolder.get();
if (!isExpire) {
//未过期
return data;
}
//过期,进行缓存重建
String lockKey = RedisConstants.LOCK_KEY + key;
//尝试获取互斥锁
if (tryLock(lockKey)) {
//获取锁成功,开启新线程来重建缓存
//double check
isExpire = checkLogicalExpire(key, type, dataHolder);
data = dataHolder.get();
if (!isExpire) {
//未过期
return data;
}
CACHE_EXECUTOR.submit(() -> {
try {
//重建缓存
log.warn("重建逻辑过期缓存!");
T value = dbFallBack.apply(id);
setWithLogicalExpire(key, value, time, unit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//释放锁
unLock(lockKey);
}
});
}
//返回当前缓存信息
return data;
}设置互斥锁
当访问不到缓存时,只允许一个线程去访问数据库,并重建缓存。流程如下:
代码实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55/**
* 使用互斥锁来防止缓存击穿
* @param keyPrefix key的前缀
* @param id 查询id
* @param type 实体类型
* @param dbFallBack 查询方法
* @param time 缓存时间
* @param unit 时间单位
* @return 查询数据
* @param <T> 实体类型
* @param <ID> id类型
*/
public <T, ID> T queryWithMutex(String keyPrefix, ID id, Class<T> type, Function<ID, T> dbFallBack, Long time, TimeUnit unit) {
// 获取缓存
String key = keyPrefix + id;
String resJson = redisTemplate.opsForValue().get(key);
// 判断受否命中
if (StrUtil.isNotBlank(resJson)) {
// 命中则直接返回
return JSONUtil.toBean(resJson, type, false);
}
if (Objects.equals(resJson, RedisConstants.NULL_VALUE)) {
// 命中空对象
return null;
}
String lockKey = null;
T res;
try {
lockKey = RedisConstants.LOCK_KEY + key;
while (!tryLock(lockKey)) {
Thread.sleep(10);
}
//获取到互斥锁
//重建缓存
//double check
resJson = redisTemplate.opsForValue().get(key);
if (StrUtil.isNotBlank(resJson)) {
// 命中则直接返回
return JSONUtil.toBean(resJson, type, false);
}
if (Objects.equals(resJson, RedisConstants.NULL_VALUE)) {
// 命中空对象
return null;
}
res = dbFallBack.apply(id);
set(key, res, time, unit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
//释放锁
unLock(lockKey);
}
return res;
}二者优缺点