关于缓存会出现的一些问题及其Java实现

5.8k 词

缓存穿透

缓存穿透:对于一个缓存和数据库中都不存在的数据查询,因为每次查询都会穿过缓存直接打在数据库上,所以很容易增大数据库压力,如下图所示。

对于缓存穿透这个问题,解决方案包括但不限于如下:

  • 缓存空对象

    进行一次结果为空的数据库查询后,以该查询为key,null为value,并设置TTL,将其放进缓存,这样在下一次同样的查询时,就不会再进行数据库查询

    缺点

    • 缓存空对象会造成缓存的内存开销。

    代码实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    /**
    * 普通缓存查询,使用存储空对象的方法防止缓存穿透
    *
    * @param keyPrefix key的前缀
    * @param id 要查询的id
    * @param type 查询的实体类型
    * @param dbFallBack 数据库查询方法
    * @param time 新缓存的过期时间
    * @param unit 时间单位
    * @param <T> 实体类型
    * @param <ID> id类型
    * @return 查询到的结果
    */
    public <T, ID> T queryById(String keyPrefix, ID id, Class<T> type, Function<ID, T> dbFallBack, Long time, TimeUnit unit) {
    //查询redis中是否有需要的数据
    String key = keyPrefix + id;
    String resJson = redisTemplate.opsForValue().get(key);
    //判断缓存是否为空
    if (StrUtil.isNotBlank(resJson)) {
    //存在直接返回
    return JSONUtil.toBean(resJson, type, false);
    }
    //判断是否是空对象
    if (resJson != null) {
    log.warn("命中空对象");
    return null;
    }
    //不存在查询数据库
    log.warn("未命中缓存,访问数据库!");
    T res = dbFallBack.apply(id);
    //判断数据库是否有数据
    if (res == null) {
    //缓存空对象
    redisTemplate.opsForValue().set(key, RedisConstants.NULL_VALUE, RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
    return null;
    }
    //存在则将数据写入redis
    set(key, res, time, unit);
    //返回
    return res;
    }

  • 布隆过滤器

    在缓存前进行一次过滤,一般调用封装好的布隆过滤器实现。未进行实践

    缺点

    • 不能完全保证不发生缓存穿透。

缓存雪崩

当某一个时刻出现大规模的缓存失效的情况,那么就会导致大量的请求直接打在数据库上面,导致数据库压力巨大,如果在高并发的情况下,可能瞬间就会导致数据库宕机。这时候如果运维马上又重启数据库,马上又会有新的流量把数据库打死。这就是缓存雪崩。

为了解决缓存雪崩,一般有这些方法:

  • 设置N+n的缓存过期时间

    因为缓存雪崩是大量缓存同时过期造成的,所以只要让缓存的时间是随机的,那么同时过期的概率就会降低。

    代码实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * 普通写入缓存
    *
    * @param key 缓存的key值
    * @param value 要缓存的对象
    * @param time 缓存的持续时间
    * @param unit 持续时间单位
    */
    public void set(String key, Object value, Long time, TimeUnit unit) {
    //设置N+n缓存,防止缓存雪崩
    time += RandomUtil.randomLong(time);
    redisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
    }
  • 使用多级缓存

  • Redis宕机后进行限流,减少数据库压力

缓存击穿

其实跟缓存雪崩有点类似,缓存雪崩是大规模的key失效,而缓存击穿是一个热点的Key,有大并发集中对其进行访问,突然间这个Key失效了,导致大并发全部打在数据库上,导致数据库压力剧增。这种现象就叫做缓存击穿。

一般有两种解决方案:

  • 设置逻辑过期

    这样的缓存是永久存在的,这样就避免了查询直接访问数据库的情况,具体流程如下图:

    代码实现

    先设置一个数据类用来存储包含数据和过期时间,且尽量不要修改原代码

    1
    2
    3
    4
    5
    @Data
    public class RedisData {
    private LocalDateTime expireTime;
    private Object data;
    }

    先写一个获取锁和释放锁的工具函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /**
    * 尝试获取互斥锁
    *
    * @param lockKey 互斥锁的键名
    * @return 获取锁是否成功
    */
    private boolean tryLock(String lockKey) {
    Boolean isLock = redisTemplate.opsForValue().setIfAbsent(lockKey, RedisConstants.LOCK_VALUE);
    redisTemplate.expire(lockKey, RedisConstants.LOCK_SHOP_TTL, TimeUnit.SECONDS);
    return BooleanUtil.isTrue(isLock);
    }

    /**
    * 释放互斥锁
    *
    * @param lockKey 互斥锁的键名
    */
    private void unLock(String lockKey) {
    redisTemplate.delete(lockKey);
    }

    然后完成图中的流程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    //线程池
    private static final ExecutorService CACHE_EXECUTOR = Executors.newSingleThreadExecutor();

    private <T> boolean checkLogicalExpire (String key, Class<T> type, AtomicReference<T> dataHolder) {
    String redisDataJson = redisTemplate.opsForValue().get(key);
    //命中,判断缓存是否过期
    RedisData redisData = JSONUtil.toBean(redisDataJson, RedisData.class, false);
    LocalDateTime expireTime = redisData.getExpireTime();
    T data = JSONUtil.toBean((JSONObject) redisData.getData(), type);
    dataHolder.set(data);
    if (expireTime.isAfter(LocalDateTime.now())) {
    //未过期
    return false;
    }
    return true;
    }

    /**
    * 逻辑过期缓存查询,防止缓存击穿
    *
    * @param keyPrefix key的前缀
    * @param id 要查询的id
    * @param type 查询的实体类型
    * @param dbFallBack 数据库查询方法
    * @param time 新缓存的过期时间
    * @param unit 时间单位
    * @param <T> 实体类型
    * @param <ID> id类型
    * @return 查询到的结果
    */
    public <T, ID> T queryWithLogicalExpire(String keyPrefix, ID id, Class<T> type, Function<ID, T> dbFallBack, Long time, TimeUnit unit) {
    //从redis查询缓存数据
    String key = keyPrefix + id;
    //判断缓存是否存在
    String redisDataJson = redisTemplate.opsForValue().get(key);
    if (StrUtil.isBlank(redisDataJson)) {
    //未命中
    log.warn("逻辑过期缓存不存在!");
    return null;
    }
    AtomicReference<T> dataHolder = new AtomicReference<>();
    boolean isExpire = checkLogicalExpire(key, type, dataHolder);
    T data = dataHolder.get();
    if (!isExpire) {
    //未过期
    return data;
    }
    //过期,进行缓存重建
    String lockKey = RedisConstants.LOCK_KEY + key;
    //尝试获取互斥锁
    if (tryLock(lockKey)) {
    //获取锁成功,开启新线程来重建缓存
    //double check
    isExpire = checkLogicalExpire(key, type, dataHolder);
    data = dataHolder.get();
    if (!isExpire) {
    //未过期
    return data;
    }
    CACHE_EXECUTOR.submit(() -> {
    try {
    //重建缓存
    log.warn("重建逻辑过期缓存!");
    T value = dbFallBack.apply(id);
    setWithLogicalExpire(key, value, time, unit);
    } catch (Exception e) {
    throw new RuntimeException(e);
    } finally {
    //释放锁
    unLock(lockKey);
    }
    });
    }
    //返回当前缓存信息
    return data;
    }

  • 设置互斥锁

    当访问不到缓存时,只允许一个线程去访问数据库,并重建缓存。流程如下:

    代码实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    /**
    * 使用互斥锁来防止缓存击穿
    * @param keyPrefix key的前缀
    * @param id 查询id
    * @param type 实体类型
    * @param dbFallBack 查询方法
    * @param time 缓存时间
    * @param unit 时间单位
    * @return 查询数据
    * @param <T> 实体类型
    * @param <ID> id类型
    */
    public <T, ID> T queryWithMutex(String keyPrefix, ID id, Class<T> type, Function<ID, T> dbFallBack, Long time, TimeUnit unit) {
    // 获取缓存
    String key = keyPrefix + id;
    String resJson = redisTemplate.opsForValue().get(key);
    // 判断受否命中
    if (StrUtil.isNotBlank(resJson)) {
    // 命中则直接返回
    return JSONUtil.toBean(resJson, type, false);
    }
    if (Objects.equals(resJson, RedisConstants.NULL_VALUE)) {
    // 命中空对象
    return null;
    }
    String lockKey = null;
    T res;
    try {
    lockKey = RedisConstants.LOCK_KEY + key;
    while (!tryLock(lockKey)) {
    Thread.sleep(10);
    }
    //获取到互斥锁
    //重建缓存
    //double check
    resJson = redisTemplate.opsForValue().get(key);
    if (StrUtil.isNotBlank(resJson)) {
    // 命中则直接返回
    return JSONUtil.toBean(resJson, type, false);
    }
    if (Objects.equals(resJson, RedisConstants.NULL_VALUE)) {
    // 命中空对象
    return null;
    }
    res = dbFallBack.apply(id);
    set(key, res, time, unit);
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }finally {
    //释放锁
    unLock(lockKey);
    }
    return res;
    }

  • 二者优缺点

    • 设置逻辑过期

      优点:
      • 线程无需等待,性能好
        缺点:
      • 不保证数据一致性
      • 有额外内存消耗
      • 实现复杂
    • 设置互斥锁

      优点:
      • 没有额外内存消耗
      • 保证一致性
      • 实现简单
        缺点:
      • 线程需要等待,性能受影响
      • 可能有死锁风险