在这里插入图片描述
干爬虫这行的,谁没被封过几个IP?

我见过太多新手,上来就开多线程猛冲,结果跑了不到5分钟,IP直接进黑名单。然后就开始骂站点反爬恶心,到处找免费代理。

免费代理?呵呵。能用的没几个,能用的也慢得要死,还动不动就断。你以为你在薅羊毛,其实人家在薅你。

真正的高手,从来不是靠代理堆出来的。而是靠精准的请求频率控制,让服务器觉得你就是个正常用户。

今天这篇,我把这两年爬了不下10亿条数据总结出来的频率控制经验,全部分享给你。从最基础的随机休眠,到工业级的令牌桶算法,再到动态自适应调度。看完你会发现,原来不封IP这么简单。

为什么你的爬虫总是被封?

先搞清楚一个最基本的问题:服务器是怎么识别爬虫的?

很多人第一反应是User-Agent。错了。User-Agent只是最基础的反爬手段,现在随便一个爬虫都会换UA。

真正让你被封的,是请求行为异常

一个正常用户,浏览网页的速度是多少?

  • 打开一个页面,至少会看个3-5秒
  • 点击链接之间,会有思考时间
  • 不会连续不断地请求同一个域名
  • 一天之内的请求量是有限的

而你的爬虫呢?

  • 每秒发10个请求
  • 每个请求间隔精确到毫秒
  • 24小时不间断运行
  • 请求路径完全固定

这种行为,在服务器的反爬系统眼里,就像黑夜里的手电筒一样显眼。不封你封谁?

所以,反爬的核心,不是伪装你的身份,而是模仿人的行为

而模仿人的行为,最关键的就是控制请求的时间间隔和频率

别再只会用time.sleep(random.randint(1,3))了

我知道,很多人写爬虫,频率控制就是这么一行代码:

import time
import random

time.sleep(random.randint(1, 3))

有用吗?有点用。对付那些反爬特别弱的站点,足够了。

但稍微有点反爬能力的站点,一眼就能看穿。

为什么?因为这种随机是均匀分布的。

正常用户的浏览间隔,不是均匀分布的。而是符合正态分布的。

什么意思?

  • 大部分时候,间隔在2-5秒之间
  • 偶尔会有1秒以内的快速点击
  • 偶尔会有10秒以上的长时间停留

而random.randint(1,3)生成的间隔,每个值出现的概率是一样的。这根本就不是人的行为。

给你们看个对比图:

均匀分布随机休眠

1秒、2秒、3秒概率相同

反爬系统一眼识别

正态分布随机休眠

大部分间隔集中在2-5秒

偶尔出现短间隔和长间隔

完美模拟人类行为

那怎么实现正态分布的随机休眠?

用numpy的normal函数。

import numpy as np

def random_sleep(mean=3, std=1, min_sleep=0.5, max_sleep=10):
    """
    正态分布随机休眠
    :param mean: 平均休眠时间(秒)
    :param std: 标准差
    :param min_sleep: 最小休眠时间
    :param max_sleep: 最大休眠时间
    """
    sleep_time = np.random.normal(mean, std)
    sleep_time = max(min_sleep, min(sleep_time, max_sleep))
    time.sleep(sleep_time)

这个函数生成的休眠时间,大部分会集中在mean附近,偶尔会有偏离。完美模拟了人类的浏览行为。

我用这个函数,爬过很多反爬中等的站点,单IP连续跑一周都没被封过。

但这还不够。

对于那些反爬特别严格的站点,比如某电商、某点评,光有随机休眠还是不够的。

因为随机休眠只能控制单个请求之间的间隔,无法控制单位时间内的总请求数

比如你开了10个线程,每个线程间隔3秒,那每秒还是会有3个多请求。对于很多站点来说,这个频率还是太高了。

这时候,你需要更高级的频率控制算法。

工业级频率控制:令牌桶算法

令牌桶算法,是目前工业界最常用的流量控制算法。没有之一。

它的原理很简单:

  1. 系统以固定的速率向桶中放入令牌
  2. 每个请求需要获取一个令牌才能发送
  3. 如果桶中没有令牌,请求就会等待
  4. 桶有最大容量,超过容量的令牌会被丢弃

有令牌

无令牌

令牌生成器

以固定速率生成令牌

令牌桶

最大容量限制

请求队列

获取令牌

发送请求

等待

这个算法的好处是:

  • 可以精确控制平均请求速率
  • 允许一定程度的突发流量(桶的容量)
  • 实现简单,性能极高

用Python实现一个令牌桶也很简单:

import time
import threading

class TokenBucket:
    def __init__(self, capacity, fill_rate):
        """
        令牌桶初始化
        :param capacity: 桶的最大容量(令牌数)
        :param fill_rate: 令牌填充速率(令牌/秒)
        """
        self.capacity = capacity
        self.fill_rate = fill_rate
        self.tokens = capacity
        self.last_refill_time = time.time()
        self.lock = threading.Lock()

    def consume(self, tokens=1):
        """
        消耗令牌
        :param tokens: 需要消耗的令牌数
        :return: 等待时间(秒),如果不需要等待则返回0
        """
        with self.lock:
            now = time.time()
            # 计算需要补充的令牌数
            delta = (now - self.last_refill_time) * self.fill_rate
            self.tokens = min(self.capacity, self.tokens + delta)
            self.last_refill_time = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return 0
            else:
                # 计算需要等待的时间
                wait_time = (tokens - self.tokens) / self.fill_rate
                self.tokens = 0
                self.last_refill_time = now + wait_time
                return wait_time

    def get_token(self, tokens=1):
        """
        获取令牌,如果没有则等待
        """
        wait_time = self.consume(tokens)
        if wait_time > 0:
            time.sleep(wait_time)

怎么用?

# 创建一个令牌桶,容量10,每秒填充2个令牌
# 意味着平均每秒2个请求,最大突发10个请求
bucket = TokenBucket(capacity=10, fill_rate=2)

for url in urls:
    # 获取令牌
    bucket.get_token()
    # 发送请求
    response = requests.get(url)
    # 处理响应
    process_response(response)

就这么简单。

这个令牌桶,比你写100行随机休眠代码都管用。

我用这个实现,单IP爬某电商站点,每秒2个请求,连续跑了一个月,爬了500多万条数据,IP一次都没被封过。

进阶:动态自适应频率控制

令牌桶虽然好用,但它有个问题:速率是固定的

不同的站点,反爬强度不一样。
同一个站点,不同的时间段,反爬强度也不一样。
同一个站点,不同的页面,反爬强度还不一样。

比如,某站点的列表页,每秒允许5个请求。但详情页,每秒只允许1个请求。
又比如,白天访问量大,反爬严格。晚上访问量小,反爬宽松。

如果你用固定的速率,要么爬得太慢,要么容易被封。

这时候,你需要动态自适应频率控制

什么意思?就是让爬虫自己根据服务器的响应,自动调整请求速率。

核心思路:

  1. 监控服务器的响应状态码和响应时间
  2. 如果出现429(请求过多)或503(服务不可用),说明速率太快,降低速率
  3. 如果连续一段时间没有出现错误,说明速率可以提高
  4. 维护一个滑动窗口,记录最近的请求结果

发送请求

记录响应状态

状态码是否为429/503?

降低请求速率

连续成功次数是否达到阈值?

提高请求速率

保持当前速率

等待一段时间

实现起来也不难:

class AdaptiveRateLimiter:
    def __init__(self, initial_rate=2, min_rate=0.1, max_rate=10, 
                 backoff_factor=0.5, increase_factor=1.1,
                 success_threshold=100, window_size=1000):
        """
        自适应速率限制器
        :param initial_rate: 初始速率(请求/秒)
        :param min_rate: 最小速率
        :param max_rate: 最大速率
        :param backoff_factor: 退避系数(出错时乘以这个系数)
        :param increase_factor: 增长系数(成功时乘以这个系数)
        :param success_threshold: 连续成功多少次后增加速率
        :param window_size: 滑动窗口大小
        """
        self.rate = initial_rate
        self.min_rate = min_rate
        self.max_rate = max_rate
        self.backoff_factor = backoff_factor
        self.increase_factor = increase_factor
        self.success_threshold = success_threshold
        self.window_size = window_size
        
        self.token_bucket = TokenBucket(capacity=10, fill_rate=self.rate)
        self.success_count = 0
        self.request_history = []
        self.lock = threading.Lock()

    def on_success(self):
        """请求成功时调用"""
        with self.lock:
            self.success_count += 1
            self.request_history.append(('success', time.time()))
            
            # 清理过期的历史记录
            cutoff = time.time() - 60  # 只保留最近1分钟的记录
            self.request_history = [r for r in self.request_history if r[1] > cutoff]
            
            # 如果连续成功次数达到阈值,增加速率
            if self.success_count >= self.success_threshold:
                self.rate = min(self.max_rate, self.rate * self.increase_factor)
                self.token_bucket.fill_rate = self.rate
                self.success_count = 0
                print(f"速率增加到: {self.rate:.2f} 请求/秒")

    def on_error(self, status_code):
        """请求出错时调用"""
        with self.lock:
            self.success_count = 0
            self.request_history.append(('error', time.time(), status_code))
            
            # 如果是429或503,降低速率
            if status_code in (429, 503):
                self.rate = max(self.min_rate, self.rate * self.backoff_factor)
                self.token_bucket.fill_rate = self.rate
                print(f"遇到{status_code},速率降低到: {self.rate:.2f} 请求/秒")
                
                # 额外等待一段时间
                time.sleep(5)

    def get_token(self):
        """获取令牌"""
        self.token_bucket.get_token()

使用方式:

limiter = AdaptiveRateLimiter(initial_rate=2, max_rate=5)

for url in urls:
    limiter.get_token()
    
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        limiter.on_success()
        process_response(response)
    except requests.exceptions.HTTPError as e:
        limiter.on_error(e.response.status_code)
    except Exception as e:
        print(f"请求出错: {e}")

这个自适应速率限制器,会自动找到服务器能承受的最大速率。既不会太慢,也不会被封。

我用这个爬过某点评站点,一开始速率是2请求/秒,后来自动涨到了4.5请求/秒,连续跑了两周,非常稳定。

终极方案:结合代理池的分布式调度

如果你的数据量特别大,比如要爬几千万甚至上亿条数据。单IP再怎么控制频率,速度还是有限的。

这时候,你就需要结合代理池了。

但注意,代理池不是用来逃避频率控制的。而是用来横向扩展的。

很多人用代理池的方式是错的:每个请求换一个代理。这样反而更容易被封。因为同一个IP在短时间内只发一个请求,这本身就是异常行为。

正确的做法是:

  1. 每个代理IP,都有自己独立的速率限制器
  2. 同一个IP,在一段时间内连续发送请求
  3. 当一个IP被封了,自动切换到下一个IP
  4. 被封的IP,过一段时间后自动解封重试

任务队列

调度器

代理池

代理1

代理2

代理3

速率限制器1

速率限制器2

速率限制器3

发送请求

是否被封?

将代理标记为不可用

等待一段时间后自动重试

处理响应

我给你们一个简单的实现思路:

class Proxy:
    def __init__(self, ip, port):
        self.ip = ip
        self.port = port
        self.limiter = AdaptiveRateLimiter(initial_rate=1, max_rate=3)
        self.is_available = True
        self.last_used_time = 0
        self.ban_count = 0

    def get_proxy(self):
        return {
            "http": f"http://{self.ip}:{self.port}",
            "https": f"http://{self.ip}:{self.port}"
        }

    def mark_as_banned(self):
        self.is_available = False
        self.ban_count += 1
        self.last_ban_time = time.time()

    def try_unban(self):
        # 被封后,等待30分钟自动重试
        if not self.is_available and time.time() - self.last_ban_time > 1800:
            self.is_available = True
            # 解封后降低速率
            self.limiter.rate = max(0.5, self.limiter.rate * 0.5)

class ProxyPool:
    def __init__(self, proxies):
        self.proxies = [Proxy(ip, port) for ip, port in proxies]
        self.lock = threading.Lock()

    def get_available_proxy(self):
        """获取一个可用的代理"""
        with self.lock:
            # 先尝试解封
            for proxy in self.proxies:
                proxy.try_unban()
            
            # 选择可用的代理
            available_proxies = [p for p in self.proxies if p.is_available]
            if not available_proxies:
                raise Exception("没有可用的代理")
            
            # 选择最近最少使用的代理
            available_proxies.sort(key=lambda x: x.last_used_time)
            proxy = available_proxies[0]
            proxy.last_used_time = time.time()
            return proxy

然后在爬虫中使用:

proxy_pool = ProxyPool([
    ("1.1.1.1", 8080),
    ("2.2.2.2", 8080),
    ("3.3.3.3", 8080),
])

def crawl_url(url):
    while True:
        proxy = proxy_pool.get_available_proxy()
        proxy.limiter.get_token()
        
        try:
            response = requests.get(url, proxies=proxy.get_proxy(), timeout=10)
            
            if response.status_code == 429 or response.status_code == 403:
                proxy.mark_as_banned()
                print(f"代理 {proxy.ip} 被封")
                continue
                
            response.raise_for_status()
            proxy.limiter.on_success()
            return response.text
            
        except Exception as e:
            proxy.limiter.on_error(500)
            print(f"请求出错: {e}")
            continue

这个方案,我用100个代理,爬过某电商站点的1000万条商品数据,用了大概10天时间,全程没有人工干预。

一些容易被忽略的细节

光有频率控制还不够。还有一些细节,也会影响你会不会被封。

请求头的完整性

很多人只设置User-Agent,其他头都不设置。这也是异常行为。

正常的浏览器,会发送很多请求头,比如Accept、Accept-Language、Referer、Cookie等等。

你应该尽量模拟完整的请求头。最好的办法是,从浏览器中复制完整的请求头,然后在爬虫中使用。

Cookie的管理

不要每个请求都换Cookie。正常用户的Cookie是会保持一段时间的。

你可以给每个代理IP分配一个独立的Cookie池,同一个IP在一段时间内使用同一个Cookie。

请求路径的随机性

不要按顺序爬取所有页面。比如,不要从page=1爬到page=10000。

你可以把所有URL打乱顺序,随机爬取。这样更像人的行为。

并发数的控制

不要开太多线程。一般来说,单IP的并发数不要超过5。

并发数太高,即使平均速率不高,也会因为突发流量被封。

超时和重试

一定要设置合理的超时时间。一般10-30秒比较合适。

对于超时的请求,不要立即重试。应该等待一段时间,然后用不同的代理重试。

写在最后

爬虫这东西,本质上是和网站的攻防博弈。

没有永远有效的反爬,也没有永远有效的反反爬。

但有一点是不变的:越像人,越不容易被封

很多人追求极致的速度,结果被封了IP,反而浪费了更多的时间。

慢就是快。

只要你的爬虫能稳定运行,哪怕每天只爬10万条数据,一个月也能爬300万。这比跑一天就被封,然后花一周时间找代理、换IP,效率高多了。

希望这篇文章能帮到你。

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐