## 1. Cache Penetration — Bloom Filter

### 1.1 Concept

Cache penetration is a query for data that does not exist in the database either. Because the cache misses, the request passes straight through the cache layer to the database. If an attacker floods the system with requests for fabricated ids (negative numbers, random UUIDs), DB load spikes sharply and can bring the database down.
```text
request → cache (miss) → DB (miss) → return null
request → cache (miss) → DB (miss) → return null
request → cache (miss) → DB (miss) → return null
... a flood of invalid queries punches through to the DB
```

### 1.2 How a Bloom Filter Works
A Bloom filter is a probabilistic data structure consisting of a bit array plus several hash functions:

- "Present" answers can be wrong (it says "yes" but the element may not actually exist)
- "Absent" answers are 100% accurate (if it says "no", the element is definitely not there)

Exploiting this property, preload every id that exists in the database into the Bloom filter. Each query is checked against the filter first; if the answer is "absent", the request is rejected immediately and never reaches the DB.

The false-positive rate depends on the bit-array size, the number of hash functions, and the expected number of insertions. A rate of 0.01–0.03 is usually acceptable, combined with null-value caching as a safety net.
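To make the trade-off concrete, here is a small sketch of the standard sizing formulas (the same math Redisson's `tryInit` is based on; the class name `BloomMath` is ours, for illustration only):

```java
// Standard Bloom filter sizing: m = -n·ln(p)/(ln 2)², k = (m/n)·ln 2
public class BloomMath {

    /** Optimal bit-array size for n insertions at false-positive rate p. */
    public static long optimalBits(long n, double p) {
        return (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    /** Optimal number of hash functions for m bits and n insertions. */
    public static int optimalHashes(long m, long n) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    public static void main(String[] args) {
        long m = optimalBits(100_000, 0.01);   // ≈ 958,506 bits ≈ 117 KB
        int k = optimalHashes(m, 100_000);     // 7 hash functions
        System.out.println("bits=" + m + ", hashes=" + k);
    }
}
```

Since the size grows with -ln(p), tightening p from 0.01 to 0.001 costs roughly 50% more bits, which is why 0.01–0.03 is the usual sweet spot.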
### 1.3 Initializing the Bloom Filter with Redisson

**Dependency (`pom.xml`)**

```xml
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.22.1</version>
</dependency>
```

**Config class `BloomFilterConfig.java`**
```java
package com.example.shop.config;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class BloomFilterConfig {

    public static final String SHOP_BLOOM_KEY = "bloom:shop:id";

    @Resource
    private RedissonClient redissonClient;
    @Resource
    private JdbcTemplate jdbcTemplate;

    /**
     * Initialize the Bloom filter at application startup and preload existing data IDs.
     */
    @PostConstruct
    public void initBloomFilter() {
        RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter(SHOP_BLOOM_KEY);
        // Initialize: expect 100,000 insertions, false-positive rate 0.01
        bloomFilter.tryInit(100_000L, 0.01);
        // Bulk-load existing IDs from the DB
        List<Long> ids = jdbcTemplate.query("SELECT id FROM tb_shop WHERE deleted = 0",
                (rs, rowNum) -> rs.getLong("id"));
        for (Long id : ids) {
            bloomFilter.add(id.toString());
        }
        System.out.println("Bloom filter initialized with " + ids.size() + " keys");
    }
}
```

### 1.4 Penetration Interceptor
```java
package com.example.shop.interceptor;

import com.example.shop.config.BloomFilterConfig;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.HandlerMapping;

@Component
public class BloomFilterInterceptor implements HandlerInterceptor {

    @Resource
    private RedissonClient redissonClient;

    @Override
    public boolean preHandle(HttpServletRequest request,
                             HttpServletResponse response,
                             Object handler) throws Exception {
        // Extract the id from the path (e.g. /api/shop/{id})
        String pathId = extractId(request);
        if (pathId == null) {
            return true;
        }
        RBloomFilter<String> bloomFilter = redissonClient
                .getBloomFilter(BloomFilterConfig.SHOP_BLOOM_KEY);
        if (!bloomFilter.contains(pathId)) {
            response.setContentType("application/json;charset=UTF-8");
            response.setStatus(404);
            response.getWriter().write("{\"code\":404,\"msg\":\"data not found\"}");
            return false;
        }
        return true;
    }

    private String extractId(HttpServletRequest request) {
        // Try to read the id from the @PathVariable map
        Object value = request.getAttribute(HandlerMapping.URI_TEMPLATE_VARIABLES_ATTRIBUTE);
        if (value instanceof java.util.Map<?, ?> map) {
            Object id = map.get("id");
            return id != null ? id.toString() : null;
        }
        return null;
    }
}
```

**Registering the interceptor: `WebMvcConfig.java`**
```java
package com.example.shop.config;

import com.example.shop.interceptor.BloomFilterInterceptor;
import jakarta.annotation.Resource;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

    @Resource
    private BloomFilterInterceptor bloomFilterInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(bloomFilterInterceptor)
                .addPathPatterns("/api/shop/**");
    }
}
```

### 1.5 Fallback: Caching Null Values
Even when the Bloom filter lets a non-existent id through (a false positive), we can store a null-value marker in the cache so repeated requests for that id no longer reach the DB:
```java
package com.example.shop.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.time.Duration;

@Service
public class ShopCacheService {

    private static final String CACHE_PREFIX = "cache:shop:";
    private static final String NULL_VALUE = "{}"; // empty-object placeholder
    private static final Duration NULL_TTL = Duration.ofMinutes(2);
    private static final Duration NORMAL_TTL = Duration.ofMinutes(30);

    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private ObjectMapper objectMapper;

    public <T> T getOrLoad(String key, Class<T> clazz, java.util.function.Supplier<T> dbLoader) {
        String cacheKey = CACHE_PREFIX + key;
        String cacheValue = stringRedisTemplate.opsForValue().get(cacheKey);
        if (cacheValue != null) {
            if (NULL_VALUE.equals(cacheValue)) {
                return null;
            }
            try {
                return objectMapper.readValue(cacheValue, clazz);
            } catch (JsonProcessingException e) {
                throw new RuntimeException("Cache deserialization failed", e);
            }
        }
        // Query the DB
        T data = dbLoader.get();
        if (data == null) {
            // Cache the null marker to block penetration
            stringRedisTemplate.opsForValue().set(cacheKey, NULL_VALUE, NULL_TTL);
            return null;
        }
        try {
            stringRedisTemplate.opsForValue()
                    .set(cacheKey, objectMapper.writeValueAsString(data), NORMAL_TTL);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Cache serialization failed", e);
        }
        return data;
    }
}
```

## 2. Cache Breakdown — Mutex Lock / Logical Expiry
### 2.1 Concept

Cache breakdown happens when a hot key expires and, at that instant, a flood of concurrent requests all hit the database.

Timeline:

```text
T=0: the cache key expires
T=1: 1000 concurrent requests all miss the cache
T=2: all 1000 requests hit the DB, running the same query 1000 times
     → instant, massive DB load
```

Difference from penetration: breakdown queries hot data that genuinely exists; the key simply happens to expire at the same moment.
### 2.2 Mutex Lock Approach

Core idea: the first thread to acquire the lock queries the DB and rewrites the cache; all other threads wait, then re-read the cache (double-checked).

**Distributed lock utility `RedisLock.java`**
```java
package com.example.shop.utils;

import jakarta.annotation.Resource;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@Component
public class RedisLock {

    private static final long WAIT_SECONDS = 3;
    private static final long LEASE_SECONDS = 30;

    @Resource
    private RedissonClient redissonClient;

    /**
     * Execute under the lock; throws if the lock cannot be acquired.
     */
    public <T> T lockAndExecute(String lockKey, Supplier<T> supplier) {
        RLock lock = redissonClient.getLock(lockKey);
        try {
            if (!lock.tryLock(WAIT_SECONDS, LEASE_SECONDS, TimeUnit.SECONDS)) {
                throw new RuntimeException("Lock acquisition timed out: " + lockKey);
            }
            return supplier.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Lock acquisition interrupted: " + lockKey, e);
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}
```

**Service-layer implementation `ShopService.java`**
```java
package com.example.shop.service;

import com.example.shop.pojo.Shop;
import com.example.shop.utils.RedisLock;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.time.Duration;

@Service
public class ShopService {

    private static final String CACHE_KEY = "cache:shop:";
    private static final String LOCK_KEY = "lock:shop:";
    private static final Duration TTL = Duration.ofMinutes(30);

    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private ShopDao shopDao;
    @Resource
    private RedisLock redisLock;
    @Resource
    private ObjectMapper objectMapper;

    /**
     * Mutex-lock lookup: query a shop while preventing cache breakdown.
     */
    public Shop getShopMutex(Long id) throws Exception {
        String key = CACHE_KEY + id;
        // 1. Check the cache
        String cacheValue = stringRedisTemplate.opsForValue().get(key);
        if (cacheValue != null) {
            return objectMapper.readValue(cacheValue, Shop.class);
        }
        // 2. Cache miss → acquire the mutex lock
        String lockKey = LOCK_KEY + id;
        return redisLock.lockAndExecute(lockKey, () -> {
            // 3. Double-check: re-read the cache after acquiring the lock
            String cacheAfterLock = stringRedisTemplate.opsForValue().get(key);
            if (cacheAfterLock != null) {
                try {
                    return objectMapper.readValue(cacheAfterLock, Shop.class);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            // 4. Query the DB
            Shop shop = shopDao.selectById(id);
            if (shop == null) {
                // Cache a null marker to block penetration (with a shorter TTL)
                stringRedisTemplate.opsForValue().set(key, "{}", Duration.ofMinutes(2));
                return null;
            }
            // 5. Write back to the cache
            try {
                stringRedisTemplate.opsForValue()
                        .set(key, objectMapper.writeValueAsString(shop), TTL);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return shop;
        });
    }
}
```

### 2.3 Logical Expiry Approach
Core idea: from the reader's point of view the cache never expires; instead an `expireTime` field is embedded in the value (the physical TTL, if set, is much longer). When a read finds the value logically expired, it returns the stale value immediately and kicks off an asynchronous thread to query the DB and rebuild the cache.

**Wrapper class `RedisData.java`**
```java
package com.example.shop.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class RedisData<T> {
    /** Logical expiry time */
    private LocalDateTime expireTime;
    /** Actual payload */
    private T data;
}
```

**Logical-expiry service**
```java
package com.example.shop.service;

import com.example.shop.pojo.RedisData;
import com.example.shop.pojo.Shop;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Service
public class ShopLogicExpireService {

    private static final String CACHE_KEY = "cache:shop:";
    private static final String LOCK_KEY = "lock:shop:";

    // Thread pool for cache rebuilds (use ThreadPoolTaskExecutor in production)
    private final ExecutorService rebuildPool = Executors.newFixedThreadPool(5);

    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private ShopDao shopDao;

    private final ObjectMapper objectMapper = new ObjectMapper()
            .registerModule(new JavaTimeModule());

    /**
     * Logical-expiry lookup.
     */
    public Shop getShopLogic(Long id) throws Exception {
        String key = CACHE_KEY + id;
        String cacheValue = stringRedisTemplate.opsForValue().get(key);
        // 1. Complete cache miss
        if (cacheValue == null) {
            return loadAndSetCache(id, key);
        }
        // 2. Parse the cached value
        RedisData<Shop> redisData = objectMapper.readValue(cacheValue,
                objectMapper.getTypeFactory().constructParametricType(RedisData.class, Shop.class));
        Shop shop = redisData.getData();
        LocalDateTime expireTime = redisData.getExpireTime();
        // 3. Not yet logically expired → return directly
        if (expireTime.isAfter(LocalDateTime.now())) {
            return shop;
        }
        // 4. Logically expired → try to grab the lock and rebuild asynchronously
        String lockKey = LOCK_KEY + id;
        Boolean lock = stringRedisTemplate.opsForValue()
                .setIfAbsent(lockKey, "1", Duration.ofSeconds(10));
        if (Boolean.TRUE.equals(lock)) {
            // Lock acquired → rebuild the cache on a separate thread
            CompletableFuture.runAsync(() -> {
                try {
                    loadAndSetCache(id, key);
                } catch (Exception e) {
                    System.err.println("Cache rebuild failed: " + e.getMessage());
                } finally {
                    stringRedisTemplate.delete(lockKey);
                }
            }, rebuildPool);
        }
        // 5. Lock or no lock, return the stale value (availability first)
        return shop;
    }

    /**
     * Query the DB and write the cache. The physical TTL only guards against
     * orphaned keys; the logical expireTime inside RedisData is what the read
     * path actually checks.
     */
    public Shop loadAndSetCache(Long id, String key) throws Exception {
        Shop shop = shopDao.selectById(id);
        if (shop == null) {
            return null;
        }
        // Logical expiry: 30 minutes from now
        RedisData<Shop> redisData = new RedisData<>(
                LocalDateTime.now().plusMinutes(30), shop);
        // Physical TTL of 1 hour (longer than the logical expiry), so the key
        // is not deleted before it can be rebuilt
        String value = objectMapper.writeValueAsString(redisData);
        stringRedisTemplate.opsForValue()
                .set(key, value, Duration.ofHours(1));
        return shop;
    }
}
```

### 2.4 Comparing the Two Approaches
| Dimension | Mutex lock | Logical expiry |
|---|---|---|
| Consistency | Strong: only one thread rebuilds at a time | Eventual: may briefly serve stale data |
| Availability | Requests that fail to get the lock must wait (or degrade) | High: always returns data, rebuild is async |
| Performance | Lock contention adds overhead | Low latency: async rebuild stays off the hot path |
| Complexity | Simple: one Redisson call | Must maintain a logical expiry field plus a thread pool |
| Best for | Consistency-critical scenarios (finance, inventory) | Availability-critical scenarios (home page, product detail) |
## 3. Cache Avalanche

### 3.1 Concept

A cache avalanche occurs when a large number of keys expire at the same moment, or the Redis service itself goes down, so every request lands directly on the DB and crushes it instantly.

Scenario 1: a large batch of keys expires at the same moment

```text
T=0:      10,000 keys are written with the same 30-minute TTL
T=30 min: all 10,000 keys expire together → every request misses → DB overwhelmed
```

### 3.2 TTL Randomization

Add a **random offset** to each key's TTL so that expirations are spread across different moments:
```java
package com.example.shop.config;

import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;

import java.security.SecureRandom;
import java.time.Duration;

@Configuration
@EnableCaching
public class CacheConfig {

    private static final SecureRandom RANDOM = new SecureRandom();

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        // Base TTL: 30 minutes
        Duration baseTtl = Duration.ofMinutes(30);
        // Random offset: 0–10 minutes. Note: this is computed once at startup,
        // so it staggers TTLs across instances/restarts rather than per entry;
        // use the helper below for per-key jitter.
        long randomSeconds = RANDOM.nextLong(10 * 60);
        Duration actualTtl = baseTtl.plusSeconds(randomSeconds);
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(actualTtl)
                .serializeValuesWith(RedisSerializationContext.SerializationPair
                        .fromSerializer(new GenericJackson2JsonRedisSerializer()))
                .disableCachingNullValues(); // do not cache nulls, avoids wasted memory
        return RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
    }
}
```

**Adding a random offset when setting TTLs manually**
```java
package com.example.shop.utils;

import java.security.SecureRandom;

public class CacheTTL {

    private static final SecureRandom RANDOM = new SecureRandom();

    /**
     * Return the base TTL plus a random offset, to avoid an avalanche.
     * @param baseMinutes      base TTL in minutes
     * @param maxOffsetMinutes maximum random offset in minutes
     */
    public static long randomTtlMinutes(int baseMinutes, int maxOffsetMinutes) {
        int offset = RANDOM.nextInt(maxOffsetMinutes + 1);
        return (long) baseMinutes + offset;
    }

    // Usage when setting a key:
    // stringRedisTemplate.opsForValue().set(key, value,
    //         Duration.ofMinutes(CacheTTL.randomTtlMinutes(30, 10)));
}
```

### 3.3 Circuit Breaking and Degradation (Sentinel)
When Redis is unavailable or DB load is too high, apply circuit breaking and degradation, returning fallback data:

**Dependency**

```xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
    <version>2021.0.6.0</version>
</dependency>
```

**Controller circuit-breaking example**
```java
package com.example.shop.controller;

import com.alibaba.csp.sentinel.annotation.SentinelResource;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.example.shop.pojo.Shop;
import com.example.shop.service.ShopService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/shop")
public class ShopController {

    @Resource
    private ShopService shopService;

    /**
     * value: the Sentinel resource name
     * blockHandler: called when the request is rate-limited / degraded
     * fallback: called on a business exception
     */
    @GetMapping("/{id}")
    @SentinelResource(value = "getShop",
            blockHandler = "getShopBlockHandler",
            fallback = "getShopFallback")
    public Shop getShop(@PathVariable Long id) throws Exception {
        return shopService.getShopMutex(id);
    }

    /** Returned when Sentinel rate-limits or degrades the request */
    public Shop getShopBlockHandler(Long id, BlockException ex) {
        System.out.println("Sentinel block: " + ex.getMessage());
        // Return fallback data
        Shop fallback = new Shop();
        fallback.setId(id);
        fallback.setName("System busy, please try again later");
        return fallback;
    }

    /** Returned on a business exception */
    public Shop getShopFallback(Long id, Throwable ex) {
        System.out.println("Business exception: " + ex.getMessage());
        Shop fallback = new Shop();
        fallback.setId(id);
        fallback.setName("Service temporarily unavailable");
        return fallback;
    }
}
```

**`application.yml` Sentinel configuration**
```yaml
spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:8080  # Sentinel dashboard address
        port: 8719
      datasource:
        ds1:
          nacos:
            server-addr: localhost:8848
            data-id: ${spring.application.name}-sentinel
            group-id: DEFAULT_GROUP
            data-type: json
            rule-type: flow
```

### 3.4 Redisson High-Availability Configuration
**Sentinel mode**
```yaml
spring:
  redis:
    redisson:
      config: |
        sentinelServersConfig:
          idleConnectionTimeout: 10000
          connectTimeout: 10000
          timeout: 3000
          retryAttempts: 3
          retryInterval: 1500
          password: ${REDIS_PASSWORD:}
          sentinelAddresses:
            - "redis://sentinel1:26379"
            - "redis://sentinel2:26379"
            - "redis://sentinel3:26379"
          masterName: mymaster
          database: 0
```

**Cluster mode**
```yaml
spring:
  redis:
    redisson:
      config: |
        clusterServersConfig:
          idleConnectionTimeout: 10000
          connectTimeout: 10000
          timeout: 3000
          retryAttempts: 3
          retryInterval: 1500
          password: ${REDIS_PASSWORD:}
          nodeAddresses:
            - "redis://node1:6379"
            - "redis://node2:6379"
            - "redis://node3:6379"
            - "redis://node4:6379"
            - "redis://node5:6379"
            - "redis://node6:6379"
```

## 4. Double-Write Consistency
### 4.1 Concept and the Cache-Aside Pattern

Double-write consistency is about keeping the database and the cache in sync. The industry-standard pattern is Cache-Aside:

- Read: read the cache first; on a hit, return it; on a miss, read the DB and back-fill the cache
- Write: update the DB first, then delete the cache entry
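The two paths can be demonstrated without any framework; in this sketch the two `HashMap`s stand in for Redis and MySQL (purely illustrative, not production code):

```java
import java.util.HashMap;
import java.util.Map;

// Framework-free Cache-Aside sketch: the two HashMaps stand in for
// Redis (cache) and MySQL (db).
public class CacheAsideDemo {

    static final Map<Long, String> cache = new HashMap<>();
    static final Map<Long, String> db = new HashMap<>();

    // Read path: cache first; on a miss, load from the DB and back-fill
    static String read(Long id) {
        String v = cache.get(id);
        if (v != null) return v;
        v = db.get(id);
        if (v != null) cache.put(id, v);
        return v;
    }

    // Write path: update the DB first, then DELETE (not update) the cache entry
    static void write(Long id, String value) {
        db.put(id, value);
        cache.remove(id);
    }

    public static void main(String[] args) {
        db.put(1L, "old");
        read(1L);                      // back-fills the cache with "old"
        write(1L, "new");              // DB updated, cache entry evicted
        System.out.println(read(1L));  // next read rebuilds from the DB: "new"
    }
}
```

Deleting instead of updating the cache on write keeps the cache a pure derivative of the DB: the next read rebuilds it, so two racing writers cannot leave the cache holding the loser's value.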
Why update the DB first and then delete the cache?

```text
Delete-cache-first: a concurrent reader can miss the cache, read the OLD
value from the DB before the write commits, and back-fill the cache —
the stale value then lives in the cache until the next eviction.

Update-DB-first: stale data can only be served in the brief window
between the DB commit and the cache delete, a far less likely race.
```
### 4.2 Spring Cache 注解实现
**`ShopCacheAnnotationService.java` (Spring Cache version)**
```java
package com.example.shop.service;

import com.example.shop.pojo.Shop;
import jakarta.annotation.Resource;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class ShopCacheAnnotationService {

    @Resource
    private ShopDao shopDao;

    /**
     * Cached read.
     * unless: skip caching when the result is null.
     */
    @Cacheable(value = "shop", key = "#id", unless = "#result == null")
    public Shop getById(Long id) {
        return shopDao.selectById(id);
    }

    /**
     * CachePut: refresh the cache after the update so cache and DB agree.
     * CacheEvict (delete, rebuild on the next read) is usually preferable.
     */
    @Transactional
    @CachePut(value = "shop", key = "#shop.id")
    public Shop update(Shop shop) {
        shopDao.updateById(shop);
        return shop;
    }

    /**
     * CacheEvict: delete the cache entry.
     */
    @Transactional
    @CacheEvict(value = "shop", key = "#id")
    public void delete(Long id) {
        shopDao.deleteById(id);
    }

    /**
     * Update then evict: the cache entry is removed after the method returns,
     * so the next read rebuilds it from the fresh DB row.
     */
    @Transactional
    @CacheEvict(value = "shop", key = "#id")
    public void updateAndEvict(Long id, ShopUpdateDTO dto) {
        Shop shop = shopDao.selectById(id);
        shop.setName(dto.getName());
        shop.setPrice(dto.getPrice());
        shopDao.updateById(shop);
    }
}
```

### 4.3 Delayed Double Delete
Even Cache-Aside can, with very low probability, go inconsistent under high concurrency. Delayed double delete strengthens it:

- Delete the cache
- Update the DB
- Sleep N milliseconds (long enough for reads that were already in flight to finish)
- Delete the cache again
```java
package com.example.shop.service;

import com.example.shop.pojo.Shop;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.concurrent.CompletableFuture;

@Service
public class ShopDoubleDeleteService {

    private static final String CACHE_PREFIX = "cache:shop:";

    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private ShopDao shopDao;

    @Transactional
    public void updateWithDoubleDelete(Long id, String name) {
        String cacheKey = CACHE_PREFIX + id;
        // 1. Delete the cache
        stringRedisTemplate.delete(cacheKey);
        // 2. Update the database
        Shop shop = shopDao.selectById(id);
        shop.setName(name);
        shopDao.updateById(shop);
        // 3. Wait (e.g. 500 ms; tune to your read latency)
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // 4. Second delete
            stringRedisTemplate.delete(cacheKey);
            System.out.println("Double delete cache key: " + cacheKey);
        });
    }
}
```

Production notes:
- Tune the delay to the real average cost of a DB read plus cache back-fill, typically 200–2000 ms
- A delayed message via a queue (RabbitMQ / Kafka) is more reliable than `Thread.sleep` + `CompletableFuture`
- Combine with retries: if a delete fails, push the key onto a retry queue
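As a sketch of the retry idea (a hypothetical helper, not part of the article's code), a bounded retry where a real system would push the key onto an MQ-backed retry queue on final failure:

```java
// Hypothetical bounded-retry helper: the Runnable stands in for
// stringRedisTemplate.delete(key), which may fail transiently.
public class RetryingDelete {

    /** Run the delete up to maxAttempts times; true if any attempt succeeded. */
    public static boolean deleteWithRetry(Runnable deleter, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                deleter.run();
                return true;
            } catch (RuntimeException e) {
                // In production: log, back off, and on the last attempt
                // enqueue the key onto a retry queue instead of giving up
            }
        }
        return false;
    }
}
```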
### 4.4 Canal Binlog Listener for Eventual Consistency

For stricter scenarios, achieve asynchronous eventual consistency by listening to the MySQL binlog:

```text
MySQL write → binlog → Canal Server → MQ (Kafka/RabbitMQ) → consumer → delete cache
```

**Canal client example `BinlogCacheSyncListener.java`**
```java
package com.example.shop.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
public class BinlogCacheSyncListener {

    private static final String CACHE_PREFIX = "cache:shop:";

    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean running = true;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @PostConstruct
    public void init() {
        executor.submit(this::listenBinlog);
    }

    private void listenBinlog() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example", "canal", "canal");
        connector.connect();
        connector.subscribe("CRUDClass_db\\.tb_shop"); // subscribe to the table
        connector.rollback();
        while (running) {
            Message message = connector.getWithoutAck(1000);
            long batchId = message.getId();
            if (batchId == -1) {
                continue;
            }
            List<Entry> entries = message.getEntries();
            if (entries.isEmpty()) {
                continue;
            }
            for (Entry entry : entries) {
                if (entry.getEntryType() != EntryType.ROWDATA) {
                    continue;
                }
                RowChange rowChange;
                try {
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("Failed to parse binlog entry", e);
                }
                EventType eventType = rowChange.getEventType();
                for (RowData rowData : rowChange.getRowDatasList()) {
                    // Evict the cache on INSERT / UPDATE / DELETE
                    if (eventType == EventType.INSERT ||
                        eventType == EventType.UPDATE ||
                        eventType == EventType.DELETE) {
                        // DELETE rows only carry "before" columns
                        List<Column> columns = eventType == EventType.DELETE
                                ? rowData.getBeforeColumnsList()
                                : rowData.getAfterColumnsList();
                        for (Column column : columns) {
                            if ("id".equals(column.getName())) {
                                String cacheKey = CACHE_PREFIX + column.getValue();
                                stringRedisTemplate.delete(cacheKey);
                                System.out.println("Canal sync deleted: " + cacheKey);
                                break;
                            }
                        }
                    }
                }
            }
            connector.ack(batchId);
        }
        connector.disconnect();
    }

    @PreDestroy
    public void destroy() {
        running = false;
        executor.shutdown();
    }
}
```

### Architecture Summary
| Approach | Consistency | Complexity | Best for |
|---|---|---|---|
| Cache-Aside | Eventual | Low | Most business scenarios |
| Delayed double delete | Eventual | Medium | High concurrency where even a rare stale read is unacceptable |
| Canal + MQ | Eventual | High | Strict consistency demands, multiple consumers of the change stream |

Bottom line: 99% of business scenarios only need **Cache-Aside (update the DB first, then delete the cache)**. With a sensible TTL, even an extreme inconsistency heals itself automatically once the TTL expires.