【Java项目技术亮点】多级缓存一致性方案:Canal+MQ实现数据库与缓存的最终一致
摘要:Canal监听MySQL Binlog实现缓存一致性 本文介绍了一种零侵入的缓存一致性解决方案。通过真实电商案例,揭示了传统缓存更新方式存在的代码侵入、事务不一致、更新失败等问题。提出基于Canal+MQ的架构:Canal服务器伪装MySQL从库监听Binlog日志,将数据变更事件发送到消息队列,最后由消费者统一更新缓存。这种方案实现了数据库与缓存的最终一致性,避免了业务代码直接操作缓存带来
·
用户修改了订单地址,但缓存还是旧数据,导致物流发错地方——这就是缓存不一致。本文将揭秘大厂如何用Canal监听MySQL Binlog,配合消息队列实现数据库与缓存的最终一致,让业务代码零侵入。

文章目录
一、场景引入:一次因缓存不一致导致的客诉
1.1 真实案例
某电商平台,用户修改收货地址后:
时间线:
T+0分:用户在APP修改收货地址(北京→上海)
T+1分:数据库更新成功,返回修改成功
T+2分:用户查看订单详情,显示的还是北京(缓存未更新)
T+5分:用户再次修改,系统提示"地址已更新"
T+30分:订单发货,包裹发往北京
T+3天:用户投诉,包裹发错地址
问题根源:业务代码更新了数据库,但忘记更新缓存(或更新缓存失败),导致数据不一致。
1.2 传统缓存更新方式的问题
// 传统方式:业务代码中手动更新缓存
@Transactional
public void updateOrderAddress(Long orderId, String newAddress) {
// 1. 更新数据库
orderMapper.updateAddress(orderId, newAddress);
// 2. 更新缓存(问题在这里!)
// 问题1:如果更新缓存失败,数据不一致
// 问题2:如果事务回滚,缓存已更新
// 问题3:分布式环境下,多个服务都要写更新缓存的代码
redisTemplate.delete("order:" + orderId);
}
传统方式的痛点:
| 痛点 | 说明 | 后果 |
|---|---|---|
| 代码侵入 | 每个更新操作都要写缓存更新代码 | 代码冗余,易遗漏 |
| 事务不一致 | 缓存更新在事务外 | 事务回滚但缓存已更新 |
| 更新失败 | 缓存更新可能失败 | 数据永久不一致 |
| 并发问题 | 多个服务同时更新 | 竞态条件 |
二、解决方案:Canal+MQ实现最终一致
2.1 整体架构设计
┌─────────────────────────────────────────────────────────────────────┐
│ Canal+MQ缓存同步架构 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 业务服务 │ │ 业务服务 │ │ 业务服务 │ │
│ │ ServiceA │ │ ServiceB │ │ ServiceC │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ MySQL数据库 │ │
│ │ ┌─────────────────────┐ │ │
│ │ │ Binlog日志 │ │ │
│ │ │ 记录所有数据变更 │ │ │
│ │ └─────────────────────┘ │ │
│ └──────────────────────────┬──────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Canal Server │ │
│ │ ┌─────────────────────┐ │ │
│ │ │ 伪装成MySQL从库 │ │ │
│ │ │ 实时拉取Binlog │ │ │
│ │ │ 解析数据变更事件 │ │ │
│ │ └─────────────────────┘ │ │
│ └──────────────────────────┬──────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 消息队列 (MQ) │ │
│ │ ┌─────────────────────┐ │ │
│ │ │ Kafka/RocketMQ │ │ │
│ │ │ 缓存变更事件队列 │ │ │
│ │ └─────────────────────┘ │ │
│ └──────────────────────────┬──────────────────────────────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 缓存消费者 │ │ 缓存消费者 │ │ 缓存消费者 │ │
│ │ Consumer1 │ │ Consumer2 │ │ Consumer3 │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ └───────────────────┼───────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Redis缓存层 │ │
│ │ ┌─────────────────────┐ │ │
│ │ │ 删除/更新缓存 │ │ │
│ │ │ 保证最终一致性 │ │ │
│ │ └─────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
2.2 核心思路
- Canal监听Binlog:伪装成MySQL从库,实时获取数据变更
- MQ异步解耦:将变更事件发送到消息队列,削峰填谷
- 消费者更新缓存:消费消息,删除/更新Redis缓存
- 业务零侵入:业务代码只操作数据库,不感知缓存
三、实战代码:从零实现Canal+MQ缓存同步
3.1 Canal配置与部署
Canal Server配置
# canal.properties
# Canal Server基础配置
canal.ip = 127.0.0.1
canal.port = 11111
# Kafka配置(将Binlog事件发送到Kafka)
canal.serverMode = kafka
canal.mq.servers = kafka:9092
canal.mq.retries = 3
canal.mq.batchSize = 16384
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
# instance.properties
# 数据库连接配置
canal.instance.master.address = mysql:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.connectionCharset = UTF-8
# 订阅的表(支持正则)
canal.instance.filter.regex = db_order\\.t_order,db_user\\.t_user,db_product\\.t_product
# Kafka Topic配置
canal.mq.topic = canal_cache_sync
canal.mq.partition = 3
canal.mq.partitionHash = db_order\\.t_order:id,db_user\\.t_user:id
3.2 Canal消息消费者
/**
* Canal消息消费者
* 监听Kafka中的Binlog变更事件,同步更新缓存
*/
@Component
@Slf4j
public class CanalCacheSyncConsumer {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private CacheKeyBuilder cacheKeyBuilder;
/**
* 消费Canal消息
*/
@KafkaListener(topics = "canal_cache_sync", groupId = "cache-sync-group")
public void onCanalMessage(ConsumerRecord<String, String> record) {
String message = record.value();
try {
CanalMessage canalMessage = JSON.parseObject(message, CanalMessage.class);
String database = canalMessage.getDatabase();
String table = canalMessage.getTable();
String type = canalMessage.getType(); // INSERT/UPDATE/DELETE
List<Map<String, Object>> data = canalMessage.getData();
List<Map<String, Object>> old = canalMessage.getOld();
log.info("🔄 收到Canal消息: {}.{} {} 数据条数: {}",
database, table, type, data.size());
// 根据表名路由到对应的处理器
CacheSyncHandler handler = getHandler(table);
if (handler != null) {
handler.handle(database, table, type, data, old);
} else {
log.warn("⚠️ 未找到表 {} 的缓存同步处理器", table);
}
} catch (Exception e) {
log.error("❌ Canal消息处理失败: {}", message, e);
// 可以选择发送到死信队列
}
}
/**
* 获取缓存同步处理器
*/
private CacheSyncHandler getHandler(String table) {
switch (table) {
case "t_order":
return new OrderCacheSyncHandler();
case "t_user":
return new UserCacheSyncHandler();
case "t_product":
return new ProductCacheSyncHandler();
default:
return null;
}
}
}
3.3 缓存同步处理器
/**
* 缓存同步处理器接口
*/
public interface CacheSyncHandler {
void handle(String database, String table, String type,
List<Map<String, Object>> data, List<Map<String, Object>> old);
}
/**
* 订单缓存同步处理器
*/
@Component
@Slf4j
public class OrderCacheSyncHandler implements CacheSyncHandler {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
@Override
public void handle(String database, String table, String type,
List<Map<String, Object>> data, List<Map<String, Object>> old) {
for (Map<String, Object> row : data) {
Long orderId = Long.valueOf(row.get("id").toString());
Long userId = Long.valueOf(row.get("user_id").toString());
// 构建需要删除的缓存Key列表
Set<String> keysToDelete = new HashSet<>();
// 1. 订单详情缓存
keysToDelete.add("order:detail:" + orderId);
// 2. 用户订单列表缓存
keysToDelete.add("order:user:list:" + userId);
// 3. 订单统计缓存
keysToDelete.add("order:user:count:" + userId);
// 4. 如果是状态变更,删除状态相关缓存
if ("UPDATE".equals(type) && old != null) {
for (Map<String, Object> oldRow : old) {
if (oldRow.containsKey("status")) {
String oldStatus = oldRow.get("status").toString();
String newStatus = row.get("status").toString();
if (!oldStatus.equals(newStatus)) {
keysToDelete.add("order:status:" + oldStatus + ":" + userId);
keysToDelete.add("order:status:" + newStatus + ":" + userId);
}
}
}
}
// 执行缓存删除
deleteCache(keysToDelete);
log.info("✅ 订单缓存同步完成: orderId={}, type={}, 删除缓存 {} 个",
orderId, type, keysToDelete.size());
}
}
/**
* 删除缓存(使用Redisson实现分布式锁,防止并发问题)
*/
private void deleteCache(Set<String> keys) {
if (CollUtil.isEmpty(keys)) {
return;
}
for (String key : keys) {
try {
// 获取分布式锁,防止并发更新
RLock lock = redissonClient.getLock("lock:cache:" + key);
if (lock.tryLock(3, 5, TimeUnit.SECONDS)) {
try {
redisTemplate.delete(key);
log.debug("🗑️ 删除缓存: {}", key);
} finally {
lock.unlock();
}
} else {
log.warn("⚠️ 获取缓存锁失败: {}", key);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("❌ 删除缓存失败: {}", key, e);
}
}
}
}
/**
* 用户缓存同步处理器
*/
@Component
@Slf4j
public class UserCacheSyncHandler implements CacheSyncHandler {
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public void handle(String database, String table, String type,
List<Map<String, Object>> data, List<Map<String, Object>> old) {
for (Map<String, Object> row : data) {
Long userId = Long.valueOf(row.get("id").toString());
// 删除用户相关缓存
Set<String> keys = new HashSet<>();
keys.add("user:detail:" + userId);
keys.add("user:profile:" + userId);
keys.add("user:info:" + userId);
// 使用Pipeline批量删除
redisTemplate.delete(keys);
log.info("✅ 用户缓存同步完成: userId={}, type={}", userId, type);
}
}
}
3.4 延迟双删策略
/**
* 延迟双删工具类
* 解决数据库与缓存不一致问题
*/
@Component
@Slf4j
public class DelayDoubleDeleteUtil {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
@Autowired
private ThreadPoolExecutor delayDeleteExecutor;
/**
* 执行延迟双删
*
* 流程:
* 1. 先删缓存
* 2. 更新数据库(由业务代码完成)
* 3. 延迟后再次删除缓存
*
* 为什么要延迟?
* - 防止在删除缓存和更新数据库之间,有其他线程读取旧数据并写入缓存
*/
public void doubleDelete(String key, long delayMillis) {
// 第一次删除
redisTemplate.delete(key);
log.debug("🗑️ 第一次删除缓存: {}", key);
// 延迟后第二次删除
delayDeleteExecutor.execute(() -> {
try {
Thread.sleep(delayMillis);
// 获取分布式锁,确保删除时没有其他线程在写缓存
RLock lock = redissonClient.getLock("lock:cache:" + key);
if (lock.tryLock(1, 3, TimeUnit.SECONDS)) {
try {
redisTemplate.delete(key);
log.debug("🗑️ 第二次删除缓存: {}", key);
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("❌ 延迟删除缓存失败: {}", key, e);
}
});
}
/**
* 批量延迟双删
*/
public void batchDoubleDelete(Set<String> keys, long delayMillis) {
for (String key : keys) {
doubleDelete(key, delayMillis);
}
}
}
3.5 缓存Key构建器
/**
* 缓存Key构建器
* 统一管理缓存Key的生成规则
*/
@Component
public class CacheKeyBuilder {
/**
* 订单详情缓存Key
*/
public String orderDetail(Long orderId) {
return "order:detail:" + orderId;
}
/**
* 用户订单列表缓存Key
*/
public String userOrderList(Long userId, int page, int size) {
return "order:user:list:" + userId + ":" + page + ":" + size;
}
/**
* 用户订单数量缓存Key
*/
public String userOrderCount(Long userId) {
return "order:user:count:" + userId;
}
/**
* 商品详情缓存Key
*/
public String productDetail(Long productId) {
return "product:detail:" + productId;
}
/**
* 用户详情缓存Key
*/
public String userDetail(Long userId) {
return "user:detail:" + userId;
}
/**
* 根据表名和数据构建需要删除的缓存Key
*/
public Set<String> buildKeysToDelete(String table, Map<String, Object> data) {
Set<String> keys = new HashSet<>();
switch (table) {
case "t_order":
Long orderId = Long.valueOf(data.get("id").toString());
Long userId = Long.valueOf(data.get("user_id").toString());
keys.add(orderDetail(orderId));
keys.add(userOrderList(userId, 1, 20)); // 删除第一页缓存
keys.add(userOrderCount(userId));
break;
case "t_user":
Long uid = Long.valueOf(data.get("id").toString());
keys.add(userDetail(uid));
break;
case "t_product":
Long productId = Long.valueOf(data.get("id").toString());
keys.add(productDetail(productId));
break;
}
return keys;
}
}
四、高级进阶:多级缓存同步
4.1 本地缓存+Redis缓存同步
/**
* 多级缓存同步服务
* 同步更新本地缓存(Caffeine)和Redis缓存
*/
@Component
@Slf4j
public class MultiLevelCacheSyncService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
// 本地缓存管理器(由各个Service持有)
private final Map<String, Cache<String, Object>> localCaches = new ConcurrentHashMap<>();
/**
* 注册本地缓存
*/
public void registerLocalCache(String cacheName, Cache<String, Object> cache) {
localCaches.put(cacheName, cache);
log.info("✅ 注册本地缓存: {}", cacheName);
}
/**
* 同步删除多级缓存
*/
public void deleteMultiLevel(String cacheName, String key) {
// 1. 删除Redis缓存
redisTemplate.delete(key);
// 2. 删除本地缓存
Cache<String, Object> localCache = localCaches.get(cacheName);
if (localCache != null) {
localCache.invalidate(key);
}
// 3. 发送消息通知其他节点删除本地缓存
CacheInvalidateMessage message = new CacheInvalidateMessage();
message.setCacheName(cacheName);
message.setKey(key);
message.setTimestamp(System.currentTimeMillis());
redisTemplate.convertAndSend("cache:invalidate", JSON.toJSONString(message));
log.debug("🗑️ 多级缓存删除: cacheName={}, key={}", cacheName, key);
}
/**
* 监听本地缓存失效消息(其他节点通知)
*/
@RedisListener(channel = "cache:invalidate")
public void onCacheInvalidate(String message) {
CacheInvalidateMessage invalidateMessage = JSON.parseObject(message, CacheInvalidateMessage.class);
String cacheName = invalidateMessage.getCacheName();
String key = invalidateMessage.getKey();
// 删除本地缓存(不删Redis,因为Redis已经由发送方删除)
Cache<String, Object> localCache = localCaches.get(cacheName);
if (localCache != null) {
localCache.invalidate(key);
log.debug("🗑️ 收到缓存失效通知,删除本地缓存: cacheName={}, key={}", cacheName, key);
}
}
}
4.2 消息幂等性处理
/**
* Canal消息幂等性处理
* 防止重复消费导致的不必要缓存删除
*/
@Component
@Slf4j
public class CanalMessageIdempotencyHandler {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String IDEMPOTENCY_KEY_PREFIX = "canal:msg:";
private static final long IDEMPOTENCY_EXPIRE = 3600; // 1小时
/**
* 检查消息是否已处理
*/
public boolean isProcessed(CanalMessage message) {
String messageId = generateMessageId(message);
String key = IDEMPOTENCY_KEY_PREFIX + messageId;
Boolean exists = redisTemplate.hasKey(key);
return Boolean.TRUE.equals(exists);
}
/**
* 标记消息已处理
*/
public void markProcessed(CanalMessage message) {
String messageId = generateMessageId(message);
String key = IDEMPOTENCY_KEY_PREFIX + messageId;
redisTemplate.opsForValue().set(key, "1", IDEMPOTENCY_EXPIRE, TimeUnit.SECONDS);
}
/**
* 生成消息唯一ID
*/
private String generateMessageId(CanalMessage message) {
// 使用数据库+表+主键+时间戳生成唯一ID
StringBuilder sb = new StringBuilder();
sb.append(message.getDatabase()).append(":");
sb.append(message.getTable()).append(":");
if (CollUtil.isNotEmpty(message.getData())) {
Map<String, Object> firstRow = message.getData().get(0);
sb.append(firstRow.get("id")).append(":");
}
sb.append(message.getTs());
return DigestUtils.md5DigestAsHex(sb.toString().getBytes());
}
}
4.3 缓存同步监控
/**
* 缓存同步监控服务
*/
@Component
@Slf4j
public class CacheSyncMonitor {
@Autowired
private MeterRegistry meterRegistry;
private final Counter syncCounter;
private final Counter syncErrorCounter;
private final Timer syncTimer;
public CacheSyncMonitor() {
this.syncCounter = Counter.builder("cache.sync.total")
.description("Total cache sync operations")
.register(meterRegistry);
this.syncErrorCounter = Counter.builder("cache.sync.errors")
.description("Total cache sync errors")
.register(meterRegistry);
this.syncTimer = Timer.builder("cache.sync.duration")
.description("Cache sync operation duration")
.register(meterRegistry);
}
/**
* 记录同步成功
*/
public void recordSuccess(String table, int count) {
syncCounter.increment(count);
log.debug("📊 缓存同步成功: table={}, count={}", table, count);
}
/**
* 记录同步失败
*/
public void recordError(String table, String error) {
syncErrorCounter.increment();
log.error("📊 缓存同步失败: table={}, error={}", table, error);
}
/**
* 记录同步耗时
*/
public void recordDuration(String table, long millis) {
syncTimer.record(millis, TimeUnit.MILLISECONDS);
}
}
五、预判问题与解答
Q1:Canal同步有延迟吗?延迟多久?
A:Canal同步有一定延迟,但通常在毫秒级:
| 环节 | 典型延迟 | 说明 |
|---|---|---|
| Binlog生成 | 0ms | 数据库事务提交时生成 |
| Canal拉取 | 1-10ms | 网络传输+解析 |
| MQ发送 | 1-5ms | 消息队列写入 |
| 消费者处理 | 5-50ms | 缓存删除操作 |
| 总延迟 | 10-100ms | 绝大多数场景可接受 |
对于强一致性场景,建议业务代码直接删除缓存,Canal作为兜底。
Q2:Canal挂了怎么办?缓存会不会永久不一致?
A:需要多层保障:
1. Canal高可用部署:
- Canal Server集群部署
- 自动故障转移
2. 监控告警:
- Canal延迟监控
- 超过阈值发送告警
3. 兜底策略:
- 业务代码直接删缓存
- 定时任务全量比对
- 缓存设置过期时间
4. 数据对账:
- 定时比对数据库和缓存
- 发现不一致自动修复
Q3:消息堆积怎么办?
A:采用批量消费+分区消费策略:
// Kafka批量消费配置
@KafkaListener(topics = "canal_cache_sync",
containerFactory = "batchKafkaListenerContainerFactory")
public void onBatchMessage(List<ConsumerRecord<String, String>> records) {
// 按表名分组,批量处理
Map<String, List<CanalMessage>> groupedMessages = records.stream()
.map(r -> JSON.parseObject(r.value(), CanalMessage.class))
.collect(Collectors.groupingBy(CanalMessage::getTable));
groupedMessages.forEach((table, messages) -> {
// 合并同一Key的多次变更,只处理最后一次
Set<String> keysToDelete = mergeAndDeduplicate(messages);
// 批量删除缓存(Pipeline)
redisTemplate.delete(keysToDelete);
log.info("🔄 批量处理: table={}, messages={}, uniqueKeys={}",
table, messages.size(), keysToDelete.size());
});
}
Q4:如何确保消息不丢失?
A:需要保证Canal和MQ的可靠性:
Canal端:
- 开启Binlog持久化
- Canal定期记录消费位点
- 断线后从上次位点继续消费
MQ端:
- Kafka:设置acks=all,开启副本
- RocketMQ:同步刷盘,同步复制
消费端:
- 手动提交Offset
- 处理失败不提交,重试消费
- 超过重试次数进入死信队列
Q5:业务代码还需要操作缓存吗?
A:建议采用双写策略:
@Transactional
public void updateOrder(OrderDTO dto) {
// 1. 更新数据库
orderMapper.update(dto);
// 2. 直接删除缓存(强一致性保障)
redisTemplate.delete("order:detail:" + dto.getId());
// 3. Canal会再次删除(兜底,处理步骤2失败的情况)
}
优点:
- 业务代码直接删缓存:保证强一致性
- Canal兜底:处理业务代码删缓存失败的情况
- 即使Canal延迟,业务代码已保证一致性
六、面试高频考点
考点1:Canal的工作原理是什么?
参考答案:
Canal工作原理:
1. 伪装MySQL从库:
- Canal Server模拟MySQL Slave的协议
- 向MySQL Master发送dump请求
2. 拉取Binlog:
- MySQL Master将Binlog发送给Canal
- Canal解析Binlog为结构化数据
3. 事件分发:
- 将解析后的事件发送到MQ(Kafka/RocketMQ)
- 或直接提供给客户端消费
4. 客户端消费:
- 业务系统消费MQ消息
- 根据数据变更同步缓存
关键点:
- 基于MySQL主从复制协议
- 对业务零侵入
- 支持断点续传
考点2:缓存一致性方案有哪些?各自的优缺点?
参考答案:
| 方案 | 一致性 | 复杂度 | 性能 | 适用场景 |
|---|---|---|---|---|
| Cache Aside | 最终一致 | 低 | 高 | 大多数场景 |
| Read/Write Through | 强一致 | 中 | 中 | 简单CRUD |
| Write Behind | 最终一致 | 高 | 极高 | 高并发写入 |
| Canal+MQ | 最终一致 | 高 | 高 | 复杂业务,零侵入 |
考点3:延迟双删为什么要延迟?延迟多久?
参考答案:
为什么要延迟?
场景:
T1: 线程A删除缓存
T2: 线程B查询缓存未命中,查询数据库得到旧值
T3: 线程A更新数据库
T4: 线程B将旧值写入缓存
→ 缓存中是旧值,不一致!
延迟双删:
T1: 线程A删除缓存
T2: 线程A更新数据库
T3: 线程A等待一段时间(让线程B完成)
T4: 线程A再次删除缓存
→ 即使线程B写了旧值,也被删除
延迟时间:
- 一般设置 500ms - 1s
- 需要大于"查询数据库+写入缓存"的时间
- 可根据实际业务调整
考点4:如何处理缓存同步的并发问题?
参考答案:
1. 分布式锁:
- 删除缓存前获取分布式锁
- 防止多个消费者同时操作同一Key
2. 消息分区:
- 按数据主键Hash分区
- 同一Key的变更由同一消费者处理
3. 版本号机制:
- 缓存中存储数据版本号
- 只更新版本号大于当前缓存的消息
4. 合并变更:
- 批量消费时合并同一Key的多次变更
- 只处理最后一次变更
七、总结与最佳实践
7.1 核心要点回顾
Canal+MQ缓存同步核心流程:
┌─────────────────────────────────────────────────────────────┐
│ 1. Canal Server配置 │
│ ├── 伪装MySQL从库,拉取Binlog │
│ ├── 配置订阅的表(支持正则) │
│ └── 将变更事件发送到Kafka/RocketMQ │
│ │
│ 2. 消息消费者 │
│ ├── 监听MQ消息 │
│ ├── 幂等性检查(防止重复消费) │
│ └── 根据表名路由到对应处理器 │
│ │
│ 3. 缓存同步处理器 │
│ ├── 构建需要删除的缓存Key列表 │
│ ├── 获取分布式锁防止并发 │
│ └── 执行缓存删除(延迟双删) │
│ │
│ 4. 多级缓存同步 │
│ ├── 删除Redis缓存 │
│ ├── 删除本地缓存(Caffeine) │
│ └── 通知其他节点删除本地缓存 │
│ │
│ 5. 监控与兜底 │
│ ├── 监控同步延迟和成功率 │
│ ├── 业务代码直接删缓存(强一致) │
│ └── 定时任务全量比对修复 │
└─────────────────────────────────────────────────────────────┘
7.2 最佳实践 checklist
- Canal Server高可用部署(集群+自动故障转移)
- MQ配置高可靠(acks=all,同步刷盘)
- 消费者配置手动提交Offset
- 实现消息幂等性处理
- 配置监控告警(延迟、堆积、错误率)
- 业务代码直接删缓存 + Canal兜底
- 缓存设置合理的过期时间
- 定时任务全量比对修复不一致数据
7.3 性能提升数据
某电商平台实测数据:
| 指标 | 业务代码更新缓存 | Canal+MQ同步 | 提升 |
|---|---|---|---|
| 代码侵入度 | 每个接口都要写 | 零侵入 | 100%↓ |
| 缓存遗漏率 | 5% | 0.1% | 98%↓ |
| 开发效率 | 低 | 高 | 大幅提升 |
| 维护成本 | 高 | 低 | 大幅降低 |
八、参考与拓展
互动讨论:你在项目中是如何保证缓存一致性的?有没有更好的方案?欢迎在评论区分享!
如果本文对你有帮助,欢迎点赞👍、收藏⭐、关注🔔,持续获取更多Java后端技术干货!
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐
所有评论(0)