用户修改了订单地址,但缓存还是旧数据,导致物流发错地方——这就是缓存不一致。本文将揭秘大厂如何用Canal监听MySQL Binlog,配合消息队列实现数据库与缓存的最终一致,让业务代码零侵入。

在这里插入图片描述


一、场景引入:一次因缓存不一致导致的客诉

1.1 真实案例

某电商平台,用户修改收货地址后:

时间线:
T+0分:用户在APP修改收货地址(北京→上海)
T+1分:数据库更新成功,返回修改成功
T+2分:用户查看订单详情,显示的还是北京(缓存未更新)
T+5分:用户再次修改,系统提示"地址已更新"
T+30分:订单发货,包裹发往北京
T+3天:用户投诉,包裹发错地址

问题根源:业务代码更新了数据库,但忘记更新缓存(或更新缓存失败),导致数据不一致。

1.2 传统缓存更新方式的问题

// 传统方式:业务代码中手动更新缓存
@Transactional
public void updateOrderAddress(Long orderId, String newAddress) {
    // 1. 更新数据库
    orderMapper.updateAddress(orderId, newAddress);
    
    // 2. 更新缓存(问题在这里!)
    // 问题1:如果更新缓存失败,数据不一致
    // 问题2:如果事务回滚,缓存已更新
    // 问题3:分布式环境下,多个服务都要写更新缓存的代码
    redisTemplate.delete("order:" + orderId);
}

传统方式的痛点

痛点 说明 后果
代码侵入 每个更新操作都要写缓存更新代码 代码冗余,易遗漏
事务不一致 缓存更新在事务外 事务回滚但缓存已更新
更新失败 缓存更新可能失败 数据永久不一致
并发问题 多个服务同时更新 竞态条件

二、解决方案:Canal+MQ实现最终一致

2.1 整体架构设计

┌─────────────────────────────────────────────────────────────────────┐
│                    Canal+MQ缓存同步架构                              │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│   ┌─────────────┐     ┌─────────────┐     ┌─────────────┐          │
│   │   业务服务   │     │   业务服务   │     │   业务服务   │          │
│   │   ServiceA  │     │   ServiceB  │     │   ServiceC  │          │
│   └──────┬──────┘     └──────┬──────┘     └──────┬──────┘          │
│          │                   │                   │                  │
│          └───────────────────┼───────────────────┘                  │
│                              ▼                                       │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │                      MySQL数据库                             │   │
│   │              ┌─────────────────────┐                        │   │
│   │              │     Binlog日志       │                        │   │
│   │              │  记录所有数据变更      │                        │   │
│   │              └─────────────────────┘                        │   │
│   └──────────────────────────┬──────────────────────────────────┘   │
│                              │                                       │
│                              ▼                                       │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │                      Canal Server                            │   │
│   │              ┌─────────────────────┐                        │   │
│   │              │   伪装成MySQL从库    │                        │   │
│   │              │   实时拉取Binlog     │                        │   │
│   │              │   解析数据变更事件   │                        │   │
│   │              └─────────────────────┘                        │   │
│   └──────────────────────────┬──────────────────────────────────┘   │
│                              │                                       │
│                              ▼                                       │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │                      消息队列 (MQ)                           │   │
│   │              ┌─────────────────────┐                        │   │
│   │              │   Kafka/RocketMQ    │                        │   │
│   │              │   缓存变更事件队列   │                        │   │
│   │              └─────────────────────┘                        │   │
│   └──────────────────────────┬──────────────────────────────────┘   │
│                              │                                       │
│              ┌───────────────┼───────────────┐                      │
│              ▼               ▼               ▼                      │
│   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐              │
│   │  缓存消费者  │   │  缓存消费者  │   │  缓存消费者  │              │
│   │  Consumer1  │   │  Consumer2  │   │  Consumer3  │              │
│   └──────┬──────┘   └──────┬──────┘   └──────┬──────┘              │
│          └───────────────────┼───────────────────┘                  │
│                              ▼                                       │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │                      Redis缓存层                             │   │
│   │              ┌─────────────────────┐                        │   │
│   │              │   删除/更新缓存      │                        │   │
│   │              │   保证最终一致性     │                        │   │
│   │              └─────────────────────┘                        │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

2.2 核心思路

  1. Canal监听Binlog:伪装成MySQL从库,实时获取数据变更
  2. MQ异步解耦:将变更事件发送到消息队列,削峰填谷
  3. 消费者更新缓存:消费消息,删除/更新Redis缓存
  4. 业务零侵入:业务代码只操作数据库,不感知缓存

三、实战代码:从零实现Canal+MQ缓存同步

3.1 Canal配置与部署

Canal Server配置
# canal.properties
# Canal Server基础配置
canal.ip = 127.0.0.1
canal.port = 11111

# Kafka配置(将Binlog事件发送到Kafka)
canal.serverMode = kafka
canal.mq.servers = kafka:9092
canal.mq.retries = 3
canal.mq.batchSize = 16384
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
# instance.properties
# 数据库连接配置
canal.instance.master.address = mysql:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.connectionCharset = UTF-8

# 订阅的表(支持正则)
canal.instance.filter.regex = db_order\\.t_order,db_user\\.t_user,db_product\\.t_product

# Kafka Topic配置
canal.mq.topic = canal_cache_sync
canal.mq.partition = 3
canal.mq.partitionHash = db_order\\.t_order:id,db_user\\.t_user:id

3.2 Canal消息消费者

/**
 * Canal消息消费者
 * 监听Kafka中的Binlog变更事件,同步更新缓存
 */
@Component
@Slf4j
public class CanalCacheSyncConsumer {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private CacheKeyBuilder cacheKeyBuilder;
    
    /**
     * 消费Canal消息
     */
    @KafkaListener(topics = "canal_cache_sync", groupId = "cache-sync-group")
    public void onCanalMessage(ConsumerRecord<String, String> record) {
        String message = record.value();
        
        try {
            CanalMessage canalMessage = JSON.parseObject(message, CanalMessage.class);
            
            String database = canalMessage.getDatabase();
            String table = canalMessage.getTable();
            String type = canalMessage.getType(); // INSERT/UPDATE/DELETE
            List<Map<String, Object>> data = canalMessage.getData();
            List<Map<String, Object>> old = canalMessage.getOld();
            
            log.info("🔄 收到Canal消息: {}.{} {} 数据条数: {}", 
                database, table, type, data.size());
            
            // 根据表名路由到对应的处理器
            CacheSyncHandler handler = getHandler(table);
            if (handler != null) {
                handler.handle(database, table, type, data, old);
            } else {
                log.warn("⚠️ 未找到表 {} 的缓存同步处理器", table);
            }
            
        } catch (Exception e) {
            log.error("❌ Canal消息处理失败: {}", message, e);
            // 可以选择发送到死信队列
        }
    }
    
    /**
     * 获取缓存同步处理器
     */
    private CacheSyncHandler getHandler(String table) {
        switch (table) {
            case "t_order":
                return new OrderCacheSyncHandler();
            case "t_user":
                return new UserCacheSyncHandler();
            case "t_product":
                return new ProductCacheSyncHandler();
            default:
                return null;
        }
    }
}

3.3 缓存同步处理器

/**
 * 缓存同步处理器接口
 */
public interface CacheSyncHandler {
    void handle(String database, String table, String type, 
                List<Map<String, Object>> data, List<Map<String, Object>> old);
}

/**
 * 订单缓存同步处理器
 */
@Component
@Slf4j
public class OrderCacheSyncHandler implements CacheSyncHandler {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private RedissonClient redissonClient;
    
    @Override
    public void handle(String database, String table, String type, 
                       List<Map<String, Object>> data, List<Map<String, Object>> old) {
        
        for (Map<String, Object> row : data) {
            Long orderId = Long.valueOf(row.get("id").toString());
            Long userId = Long.valueOf(row.get("user_id").toString());
            
            // 构建需要删除的缓存Key列表
            Set<String> keysToDelete = new HashSet<>();
            
            // 1. 订单详情缓存
            keysToDelete.add("order:detail:" + orderId);
            
            // 2. 用户订单列表缓存
            keysToDelete.add("order:user:list:" + userId);
            
            // 3. 订单统计缓存
            keysToDelete.add("order:user:count:" + userId);
            
            // 4. 如果是状态变更,删除状态相关缓存
            if ("UPDATE".equals(type) && old != null) {
                for (Map<String, Object> oldRow : old) {
                    if (oldRow.containsKey("status")) {
                        String oldStatus = oldRow.get("status").toString();
                        String newStatus = row.get("status").toString();
                        
                        if (!oldStatus.equals(newStatus)) {
                            keysToDelete.add("order:status:" + oldStatus + ":" + userId);
                            keysToDelete.add("order:status:" + newStatus + ":" + userId);
                        }
                    }
                }
            }
            
            // 执行缓存删除
            deleteCache(keysToDelete);
            
            log.info("✅ 订单缓存同步完成: orderId={}, type={}, 删除缓存 {} 个", 
                orderId, type, keysToDelete.size());
        }
    }
    
    /**
     * 删除缓存(使用Redisson实现分布式锁,防止并发问题)
     */
    private void deleteCache(Set<String> keys) {
        if (CollUtil.isEmpty(keys)) {
            return;
        }
        
        for (String key : keys) {
            try {
                // 获取分布式锁,防止并发更新
                RLock lock = redissonClient.getLock("lock:cache:" + key);
                
                if (lock.tryLock(3, 5, TimeUnit.SECONDS)) {
                    try {
                        redisTemplate.delete(key);
                        log.debug("🗑️ 删除缓存: {}", key);
                    } finally {
                        lock.unlock();
                    }
                } else {
                    log.warn("⚠️ 获取缓存锁失败: {}", key);
                }
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("❌ 删除缓存失败: {}", key, e);
            }
        }
    }
}

/**
 * 用户缓存同步处理器
 */
@Component
@Slf4j
public class UserCacheSyncHandler implements CacheSyncHandler {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Override
    public void handle(String database, String table, String type, 
                       List<Map<String, Object>> data, List<Map<String, Object>> old) {
        
        for (Map<String, Object> row : data) {
            Long userId = Long.valueOf(row.get("id").toString());
            
            // 删除用户相关缓存
            Set<String> keys = new HashSet<>();
            keys.add("user:detail:" + userId);
            keys.add("user:profile:" + userId);
            keys.add("user:info:" + userId);
            
            // 使用Pipeline批量删除
            redisTemplate.delete(keys);
            
            log.info("✅ 用户缓存同步完成: userId={}, type={}", userId, type);
        }
    }
}

3.4 延迟双删策略

/**
 * 延迟双删工具类
 * 解决数据库与缓存不一致问题
 */
@Component
@Slf4j
public class DelayDoubleDeleteUtil {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private RedissonClient redissonClient;
    
    @Autowired
    private ThreadPoolExecutor delayDeleteExecutor;
    
    /**
     * 执行延迟双删
     * 
     * 流程:
     * 1. 先删缓存
     * 2. 更新数据库(由业务代码完成)
     * 3. 延迟后再次删除缓存
     * 
     * 为什么要延迟?
     * - 防止在删除缓存和更新数据库之间,有其他线程读取旧数据并写入缓存
     */
    public void doubleDelete(String key, long delayMillis) {
        // 第一次删除
        redisTemplate.delete(key);
        log.debug("🗑️ 第一次删除缓存: {}", key);
        
        // 延迟后第二次删除
        delayDeleteExecutor.execute(() -> {
            try {
                Thread.sleep(delayMillis);
                
                // 获取分布式锁,确保删除时没有其他线程在写缓存
                RLock lock = redissonClient.getLock("lock:cache:" + key);
                
                if (lock.tryLock(1, 3, TimeUnit.SECONDS)) {
                    try {
                        redisTemplate.delete(key);
                        log.debug("🗑️ 第二次删除缓存: {}", key);
                    } finally {
                        lock.unlock();
                    }
                }
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("❌ 延迟删除缓存失败: {}", key, e);
            }
        });
    }
    
    /**
     * 批量延迟双删
     */
    public void batchDoubleDelete(Set<String> keys, long delayMillis) {
        for (String key : keys) {
            doubleDelete(key, delayMillis);
        }
    }
}

3.5 缓存Key构建器

/**
 * 缓存Key构建器
 * 统一管理缓存Key的生成规则
 */
@Component
public class CacheKeyBuilder {
    
    /**
     * 订单详情缓存Key
     */
    public String orderDetail(Long orderId) {
        return "order:detail:" + orderId;
    }
    
    /**
     * 用户订单列表缓存Key
     */
    public String userOrderList(Long userId, int page, int size) {
        return "order:user:list:" + userId + ":" + page + ":" + size;
    }
    
    /**
     * 用户订单数量缓存Key
     */
    public String userOrderCount(Long userId) {
        return "order:user:count:" + userId;
    }
    
    /**
     * 商品详情缓存Key
     */
    public String productDetail(Long productId) {
        return "product:detail:" + productId;
    }
    
    /**
     * 用户详情缓存Key
     */
    public String userDetail(Long userId) {
        return "user:detail:" + userId;
    }
    
    /**
     * 根据表名和数据构建需要删除的缓存Key
     */
    public Set<String> buildKeysToDelete(String table, Map<String, Object> data) {
        Set<String> keys = new HashSet<>();
        
        switch (table) {
            case "t_order":
                Long orderId = Long.valueOf(data.get("id").toString());
                Long userId = Long.valueOf(data.get("user_id").toString());
                keys.add(orderDetail(orderId));
                keys.add(userOrderList(userId, 1, 20)); // 删除第一页缓存
                keys.add(userOrderCount(userId));
                break;
                
            case "t_user":
                Long uid = Long.valueOf(data.get("id").toString());
                keys.add(userDetail(uid));
                break;
                
            case "t_product":
                Long productId = Long.valueOf(data.get("id").toString());
                keys.add(productDetail(productId));
                break;
        }
        
        return keys;
    }
}

四、高级进阶:多级缓存同步

4.1 本地缓存+Redis缓存同步

/**
 * 多级缓存同步服务
 * 同步更新本地缓存(Caffeine)和Redis缓存
 */
@Component
@Slf4j
public class MultiLevelCacheSyncService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private RedissonClient redissonClient;
    
    // 本地缓存管理器(由各个Service持有)
    private final Map<String, Cache<String, Object>> localCaches = new ConcurrentHashMap<>();
    
    /**
     * 注册本地缓存
     */
    public void registerLocalCache(String cacheName, Cache<String, Object> cache) {
        localCaches.put(cacheName, cache);
        log.info("✅ 注册本地缓存: {}", cacheName);
    }
    
    /**
     * 同步删除多级缓存
     */
    public void deleteMultiLevel(String cacheName, String key) {
        // 1. 删除Redis缓存
        redisTemplate.delete(key);
        
        // 2. 删除本地缓存
        Cache<String, Object> localCache = localCaches.get(cacheName);
        if (localCache != null) {
            localCache.invalidate(key);
        }
        
        // 3. 发送消息通知其他节点删除本地缓存
        CacheInvalidateMessage message = new CacheInvalidateMessage();
        message.setCacheName(cacheName);
        message.setKey(key);
        message.setTimestamp(System.currentTimeMillis());
        
        redisTemplate.convertAndSend("cache:invalidate", JSON.toJSONString(message));
        
        log.debug("🗑️ 多级缓存删除: cacheName={}, key={}", cacheName, key);
    }
    
    /**
     * 监听本地缓存失效消息(其他节点通知)
     */
    @RedisListener(channel = "cache:invalidate")
    public void onCacheInvalidate(String message) {
        CacheInvalidateMessage invalidateMessage = JSON.parseObject(message, CacheInvalidateMessage.class);
        
        String cacheName = invalidateMessage.getCacheName();
        String key = invalidateMessage.getKey();
        
        // 删除本地缓存(不删Redis,因为Redis已经由发送方删除)
        Cache<String, Object> localCache = localCaches.get(cacheName);
        if (localCache != null) {
            localCache.invalidate(key);
            log.debug("🗑️ 收到缓存失效通知,删除本地缓存: cacheName={}, key={}", cacheName, key);
        }
    }
}

4.2 消息幂等性处理

/**
 * Canal消息幂等性处理
 * 防止重复消费导致的不必要缓存删除
 */
@Component
@Slf4j
public class CanalMessageIdempotencyHandler {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    private static final String IDEMPOTENCY_KEY_PREFIX = "canal:msg:";
    private static final long IDEMPOTENCY_EXPIRE = 3600; // 1小时
    
    /**
     * 检查消息是否已处理
     */
    public boolean isProcessed(CanalMessage message) {
        String messageId = generateMessageId(message);
        String key = IDEMPOTENCY_KEY_PREFIX + messageId;
        
        Boolean exists = redisTemplate.hasKey(key);
        return Boolean.TRUE.equals(exists);
    }
    
    /**
     * 标记消息已处理
     */
    public void markProcessed(CanalMessage message) {
        String messageId = generateMessageId(message);
        String key = IDEMPOTENCY_KEY_PREFIX + messageId;
        
        redisTemplate.opsForValue().set(key, "1", IDEMPOTENCY_EXPIRE, TimeUnit.SECONDS);
    }
    
    /**
     * 生成消息唯一ID
     */
    private String generateMessageId(CanalMessage message) {
        // 使用数据库+表+主键+时间戳生成唯一ID
        StringBuilder sb = new StringBuilder();
        sb.append(message.getDatabase()).append(":");
        sb.append(message.getTable()).append(":");
        
        if (CollUtil.isNotEmpty(message.getData())) {
            Map<String, Object> firstRow = message.getData().get(0);
            sb.append(firstRow.get("id")).append(":");
        }
        
        sb.append(message.getTs());
        
        return DigestUtils.md5DigestAsHex(sb.toString().getBytes());
    }
}

4.3 缓存同步监控

/**
 * 缓存同步监控服务
 */
@Component
@Slf4j
public class CacheSyncMonitor {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    private final Counter syncCounter;
    private final Counter syncErrorCounter;
    private final Timer syncTimer;
    
    public CacheSyncMonitor() {
        this.syncCounter = Counter.builder("cache.sync.total")
            .description("Total cache sync operations")
            .register(meterRegistry);
        
        this.syncErrorCounter = Counter.builder("cache.sync.errors")
            .description("Total cache sync errors")
            .register(meterRegistry);
        
        this.syncTimer = Timer.builder("cache.sync.duration")
            .description("Cache sync operation duration")
            .register(meterRegistry);
    }
    
    /**
     * 记录同步成功
     */
    public void recordSuccess(String table, int count) {
        syncCounter.increment(count);
        log.debug("📊 缓存同步成功: table={}, count={}", table, count);
    }
    
    /**
     * 记录同步失败
     */
    public void recordError(String table, String error) {
        syncErrorCounter.increment();
        log.error("📊 缓存同步失败: table={}, error={}", table, error);
    }
    
    /**
     * 记录同步耗时
     */
    public void recordDuration(String table, long millis) {
        syncTimer.record(millis, TimeUnit.MILLISECONDS);
    }
}

五、预判问题与解答

Q1:Canal同步有延迟吗?延迟多久?

A:Canal同步有一定延迟,但通常在毫秒级

环节 典型延迟 说明
Binlog生成 0ms 数据库事务提交时生成
Canal拉取 1-10ms 网络传输+解析
MQ发送 1-5ms 消息队列写入
消费者处理 5-50ms 缓存删除操作
总延迟 10-100ms 绝大多数场景可接受

对于强一致性场景,建议业务代码直接删除缓存,Canal作为兜底。

Q2:Canal挂了怎么办?缓存会不会永久不一致?

A:需要多层保障:

1. Canal高可用部署:
   - Canal Server集群部署
   - 自动故障转移

2. 监控告警:
   - Canal延迟监控
   - 超过阈值发送告警

3. 兜底策略:
   - 业务代码直接删缓存
   - 定时任务全量比对
   - 缓存设置过期时间

4. 数据对账:
   - 定时比对数据库和缓存
   - 发现不一致自动修复

Q3:消息堆积怎么办?

A:采用批量消费+分区消费策略:

// Kafka批量消费配置
@KafkaListener(topics = "canal_cache_sync", 
    containerFactory = "batchKafkaListenerContainerFactory")
public void onBatchMessage(List<ConsumerRecord<String, String>> records) {
    
    // 按表名分组,批量处理
    Map<String, List<CanalMessage>> groupedMessages = records.stream()
        .map(r -> JSON.parseObject(r.value(), CanalMessage.class))
        .collect(Collectors.groupingBy(CanalMessage::getTable));
    
    groupedMessages.forEach((table, messages) -> {
        // 合并同一Key的多次变更,只处理最后一次
        Set<String> keysToDelete = mergeAndDeduplicate(messages);
        
        // 批量删除缓存(Pipeline)
        redisTemplate.delete(keysToDelete);
        
        log.info("🔄 批量处理: table={}, messages={}, uniqueKeys={}", 
            table, messages.size(), keysToDelete.size());
    });
}

Q4:如何确保消息不丢失?

A:需要保证Canal和MQ的可靠性

Canal端:
- 开启Binlog持久化
- Canal定期记录消费位点
- 断线后从上次位点继续消费

MQ端:
- Kafka:设置acks=all,开启副本
- RocketMQ:同步刷盘,同步复制

消费端:
- 手动提交Offset
- 处理失败不提交,重试消费
- 超过重试次数进入死信队列

Q5:业务代码还需要操作缓存吗?

A:建议采用双写策略

@Transactional
public void updateOrder(OrderDTO dto) {
    // 1. 更新数据库
    orderMapper.update(dto);
    
    // 2. 直接删除缓存(强一致性保障)
    redisTemplate.delete("order:detail:" + dto.getId());
    
    // 3. Canal会再次删除(兜底,处理步骤2失败的情况)
}

优点

  • 业务代码直接删缓存:保证强一致性
  • Canal兜底:处理业务代码删缓存失败的情况
  • 即使Canal延迟,业务代码已保证一致性

六、面试高频考点

考点1:Canal的工作原理是什么?

参考答案

Canal工作原理:

1. 伪装MySQL从库:
   - Canal Server模拟MySQL Slave的协议
   - 向MySQL Master发送dump请求

2. 拉取Binlog:
   - MySQL Master将Binlog发送给Canal
   - Canal解析Binlog为结构化数据

3. 事件分发:
   - 将解析后的事件发送到MQ(Kafka/RocketMQ)
   - 或直接提供给客户端消费

4. 客户端消费:
   - 业务系统消费MQ消息
   - 根据数据变更同步缓存

关键点:
- 基于MySQL主从复制协议
- 对业务零侵入
- 支持断点续传

考点2:缓存一致性方案有哪些?各自的优缺点?

参考答案

方案 一致性 复杂度 性能 适用场景
Cache Aside 最终一致 大多数场景
Read/Write Through 强一致 简单CRUD
Write Behind 最终一致 极高 高并发写入
Canal+MQ 最终一致 复杂业务,零侵入

考点3:延迟双删为什么要延迟?延迟多久?

参考答案

为什么要延迟?

场景:
T1: 线程A删除缓存
T2: 线程B查询缓存未命中,查询数据库得到旧值
T3: 线程A更新数据库
T4: 线程B将旧值写入缓存
→ 缓存中是旧值,不一致!

延迟双删:
T1: 线程A删除缓存
T2: 线程A更新数据库
T3: 线程A等待一段时间(让线程B完成)
T4: 线程A再次删除缓存
→ 即使线程B写了旧值,也被删除

延迟时间:
- 一般设置 500ms - 1s
- 需要大于"查询数据库+写入缓存"的时间
- 可根据实际业务调整

考点4:如何处理缓存同步的并发问题?

参考答案

1. 分布式锁:
   - 删除缓存前获取分布式锁
   - 防止多个消费者同时操作同一Key

2. 消息分区:
   - 按数据主键Hash分区
   - 同一Key的变更由同一消费者处理

3. 版本号机制:
   - 缓存中存储数据版本号
   - 只更新版本号大于当前缓存的消息

4. 合并变更:
   - 批量消费时合并同一Key的多次变更
   - 只处理最后一次变更

七、总结与最佳实践

7.1 核心要点回顾

Canal+MQ缓存同步核心流程:

┌─────────────────────────────────────────────────────────────┐
│  1. Canal Server配置                                          │
│     ├── 伪装MySQL从库,拉取Binlog                             │
│     ├── 配置订阅的表(支持正则)                              │
│     └── 将变更事件发送到Kafka/RocketMQ                        │
│                                                              │
│  2. 消息消费者                                                │
│     ├── 监听MQ消息                                            │
│     ├── 幂等性检查(防止重复消费)                            │
│     └── 根据表名路由到对应处理器                              │
│                                                              │
│  3. 缓存同步处理器                                            │
│     ├── 构建需要删除的缓存Key列表                             │
│     ├── 获取分布式锁防止并发                                  │
│     └── 执行缓存删除(延迟双删)                              │
│                                                              │
│  4. 多级缓存同步                                              │
│     ├── 删除Redis缓存                                         │
│     ├── 删除本地缓存(Caffeine)                              │
│     └── 通知其他节点删除本地缓存                              │
│                                                              │
│  5. 监控与兜底                                                │
│     ├── 监控同步延迟和成功率                                  │
│     ├── 业务代码直接删缓存(强一致)                          │
│     └── 定时任务全量比对修复                                  │
└─────────────────────────────────────────────────────────────┘

7.2 最佳实践 checklist

  • Canal Server高可用部署(集群+自动故障转移)
  • MQ配置高可靠(acks=all,同步刷盘)
  • 消费者配置手动提交Offset
  • 实现消息幂等性处理
  • 配置监控告警(延迟、堆积、错误率)
  • 业务代码直接删缓存 + Canal兜底
  • 缓存设置合理的过期时间
  • 定时任务全量比对修复不一致数据

7.3 性能提升数据

某电商平台实测数据:

指标 业务代码更新缓存 Canal+MQ同步 提升
代码侵入度 每个接口都要写 零侵入 100%↓
缓存遗漏率 5% 0.1% 98%↓
开发效率 大幅提升
维护成本 大幅降低

八、参考与拓展


互动讨论:你在项目中是如何保证缓存一致性的?有没有更好的方案?欢迎在评论区分享!

如果本文对你有帮助,欢迎点赞👍、收藏⭐、关注🔔,持续获取更多Java后端技术干货!

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐