影刀RPA店群自动化缓存架构实战:Python协同多级缓存与数据一致性设计

在这里插入图片描述


每次采集商品数据都重新加载页面,每次上货都重新查询运费模板。

在这里插入图片描述

拼多多店群自动化报活动上架!

这些重复操作累积的延迟,正在悄悄吃掉你的利润。

在这里插入图片描述

在店群自动化的性能优化中,我们注意到一个现象:很多页面数据的读取是高频且重复的。
比如采集竞品价格时,同一个商品列表一天内被访问多次;上货时,运费模板ID、类目ID等配置信息几乎不变,但每次流程都会重新查询一遍。

如果能把“读多写少”的数据缓存起来,避免重复的页面加载和API调用,整体的任务执行速度将显著提升。

在这里插入图片描述

于是我们设计了一套多级缓存系统,将数据访问路径从“浏览器→网络→平台服务器”缩短到“本地内存→Redis→浏览器缓存”,把延迟从秒级降到了毫秒级。


在这里插入图片描述

TEMU店群矩阵自动化运营核价报活动

一、缓存的三个层级:本地、Redis、浏览器端存储

在这里插入图片描述

针对店群自动化的数据访问特点,我们把缓存分为三层:

  • L1 本地进程内存缓存:最快,但生命周期短,Worker重启后丢失。适合缓存业务配置、店铺基础信息等极少变化的数据。
    • L2 Redis 共享缓存:跨Worker共享,适合存储各店铺共用的平台规则、模板数据,以及需要多个Worker协同更新的数据。
    • L3 浏览器端存储(IndexedDB / LocalStorage):适合缓存当前店铺会话级的页面静态资源标识、已加载的商品列表等,减少页面重新渲染。
      一个数据请求会先查L1,未命中查L2,再未命中查L3,最后才通过浏览器发起真实网络请求。
import time
import json
from functools import wraps

class CacheLayer:
    def __init__(self, local_cache, redis_cache, browser_store=None):
            self.local = local_cache      # dict-like, 如 lru_cache
                    self.redis = redis_cache      # Redis客户端
                            self.browser = browser_store  # 通过CDP操作浏览器存储
    async def get(self, key: str, max_age=300):
            # 1. 本地内存
                    value = self.local.get(key)
                            if value and time.time() - value["ts"] < max_age:
                                        return value["data"]
        # 2. Redis
                redis_val = await self.redis.get(key)
                        if redis_val:
                                    data = json.loads(redis_val)
                                                self.local[key] = {"data": data, "ts": time.time()}
                                                            return data
        # 3. 浏览器存储(可选)
                if self.browser:
                            browser_val = await self.browser.get_item(key)
                                        if browser_val:
                                                        data = json.loads(browser_val)
                                                                        await self.redis.set(key, json.dumps(data), ex=max_age)
                                                                                        self.local[key] = {"data": data, "ts": time.time()}
                                                                                                        return data
        return None
    async def set(self, key: str, data, ttl=600):
            now = time.time()
                    self.local[key] = {"data": data, "ts": now}
                            await self.redis.set(key, json.dumps(data), ex=ttl)
                                    if self.browser:
                                                await self.browser.set_item(key, json.dumps(data))
                                                ```
`local` 使用 `collections.OrderedDict` 配合 LRU 淘汰算法,限制最大条目数,避免内存膨胀。

---

## 二、缓存什么?三类数据的缓存策略

不是所有数据都适合缓存。我们需要区分数据的类型和更新频率。

**1. 业务配置数据(强缓存,长TTL)**  
例如运费模板列表、类目树、退货地址模板等。这些数据通常由运营手动维护,变更频率极低(数周甚至数月)。  
我们可以设置较长的缓存时间(1小时甚至24小时),并在运营后台修改时主动通知缓存失效。

**2. 平台规则数据(中缓存,中TTL)**  
例如拼多多的发货时效规则、TEMU的佣金费率表。这些可能随平台政策调整,但不频繁。  
TTL设置为1-2小时,并定时从平台公告页或API校验版本。

**3. 店铺运营数据(弱缓存,短TTL)**  
例如商品列表、订单列表、竞品价格。这些数据随时变化,但短时间内重复查询是有意义的(如30秒内)。  
短TTL(30-60秒)可以有效减少同一页面重复加载。

```python
CACHE_POLICIES = {
    "config:shipping_template": {"ttl": 86400, "max_age": 86400},
        "config:category_tree": {"ttl": 86400, "max_age": 43200},
            "platform:fee_rate": {"ttl": 7200, "max_age": 3600},
                "product:list": {"ttl": 60, "max_age": 30},
                    "order:list": {"ttl": 30, "max_age": 15},
                    }
                    ```
---

## 三、浏览器端缓存:利用IndexedDB避免重复渲染

对于商品列表、订单列表这类数据量较大、且页面渲染开销高的场景,我们通过CDP操作浏览器的IndexedDB或LocalStorage,将已加载的数据持久化到浏览器本地。

下一次访问同一页面时,执行一段注入的JavaScript脚本,先检查浏览器存储中是否有缓存数据。如果有并且未过期,直接渲染缓存数据,无需等待网络请求。

```python
class BrowserCache:
    def __init__(self, cdp_session):
            self.cdp = cdp_session
    async def get_item(self, key: str):
            script = f"""
                    (async () => {{
                                const db = await window.indexedDB.open('automation_cache');
                                            // ... 读取逻辑
                                                        return cachedData;
                                                                }})()
                                                                        """
                                                                                result = await self.cdp.evaluate(script)
                                                                                        return result
    async def set_item(self, key: str, value: str):
            script = f"""
                    (async () => {{
                                const db = await window.indexedDB.open('automation_cache');
                                            // ... 写入逻辑
                                                    }})()
                                                            """
                                                                    await self.cdp.evaluate(script)
                                                                    ```
通过浏览器缓存,一个店铺的商品列表在第二次打开时几乎可以瞬间显示,影刀RPA后续的元素定位操作也变得更快。

**实际效果:竞品采集任务中,第二次扫描相同关键词时,页面加载时间从8秒降到1.5秒。**

---

## 四、缓存一致性:当数据变更时如何让缓存失效

缓存最大的挑战不是“怎么存”,而是“怎么让缓存和源数据保持一致”。

店群自动化中,很多数据变更并非通过我们的系统触发。  
例如,运营在手机端改了运费模板,或者平台下架了某个类目。这些变更我们无法捕获事件。

我们采用“主动失效 + 被动过期”的组合策略:

- **主动失效**:当自动化任务执行了写操作(如修改了运费模板),在写操作成功后,立即调用缓存失效接口删除对应的缓存键。
- - **被动过期**:每个缓存键都有TTL(过期时间),即使没有主动失效,过期后也会从源端重新加载。
- - **版本号校验**:对于关键配置,我们在Redis中存储一个版本号。缓存读取时,先比较版本号是否一致,不一致则认为缓存失效。
```python
class CacheInvalidator:
    def __init__(self, redis, cache_layer):
            self.redis = redis
                    self.cache = cache_layer
    async def invalidate(self, pattern: str):
            # 匹配删除所有相关键
                    keys = await self.redis.keys(pattern)
                            if keys:
                                        await self.redis.delete(*keys)
                                                    # 同时通知本地缓存
                                                                for key in keys:
                                                                                self.cache.local.pop(key, None)
                                                                                        logger.info(f"Invalidated cache keys matching {pattern}")
    async def check_version(self, config_type: str) -> int:
            version_key = f"version:{config_type}"
                    version = await self.redis.get(version_key)
                            return int(version) if version else 0
                            ```
例如,运费模板配置的主键前缀是 `config:shipping_template:*`。  
一旦有模板更新,就调用 `invalidate("config:shipping_template:*")` 清除所有相关缓存。

---

## 五、缓存预热:在任务高峰期前让数据就位

每天凌晨是自动化任务的低峰期,也是缓存预热的好时机。

我们编写了预热脚本,在凌晨3点遍历所有活跃店铺,提前把常用配置数据和首页数据加载到L2和L3缓存中。

```python
class CacheWarmer:
    async def warmup(self, shop_ids: list):
            for shop_id in shop_ids:
                        # 加载运费模板
                                    await self.load_and_cache(f"config:shipping_template:{shop_id}",
                                                                          fetch_shipping_templates, shop_id)
                                                                                      # 加载类目树
                                                                                                  await self.load_and_cache(f"config:category_tree:{shop_id}",
                                                                                                                                        fetch_category_tree, shop_id)
                                                                                                                                                    # 预加载首页数据
                                                                                                                                                                await self.load_and_cache(f"page:home:{shop_id}",
                                                                                                                                                                                                      fetch_home_page, shop_id)
    async def load_and_cache(self, key, fetch_func, *args):
            data = await fetch_func(*args)
                    await cache_layer.set(key, data, ttl=policies[key.split(':')[0]]['ttl'])
                    ```
预热之后,早上8点运营高峰期的任务几乎不会遇到缓存未命中,浏览器打开店铺页面的速度明显加快。

---

## 六、缓存监控与容量规划

缓存同样需要监控,否则内存溢出或命中率过低都难以察觉。

关键指标:
- 各级缓存的命中率(L1/L2/L3)
- - 缓存键数量与内存占用
- - 缓存未命中导致的额外延迟
- - 缓存失效次数(主动和被动)
Grafana面板上展示命中率曲线。当L1命中率下降时,可能是本地缓存淘汰策略过于激进,需调整容量。  
当L2命中率长期偏低时,可能缓存策略设置不合理,或数据变化过于频繁。

---

## 七、几个缓存实践中的细节

**大对象的缓存。**  
商品详情页的完整HTML不适合直接缓存,因为太大且包含时效性信息。我们选择只缓存数据层(商品标题、价格、库存),页面结构仍从浏览器加载,但减少了网络请求。

**并发写入的缓存更新。**  
多个Worker可能同时更新同一个缓存键。使用Redis的 `SETNX` 结合随机锁避免缓存击穿。

**缓存穿透防护。**  
对于不存在的数据(如查询不存在的类目ID),我们缓存一个空值标记,避免每次都穿透到后端。

---

## 八、写在最后

缓存是性能优化中最古老也最有效的手段。  
在RPA自动化场景中,我们将它分别应用到进程内存、Redis和浏览器存储三个层面,让“读”操作尽可能快地完成,让“写”操作成为唯一需要付出网络代价的动作。

当浏览器页面秒开,任务总耗时缩短一半时,你会切身体会到缓存架构带来的价值。

> 自动化不只是让机器操作得快,还要让机器等待得少。
---

*作者:林焱*
Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐