爬崩3个站点后总结的反爬终极方案:百万数据不封IP的频率控制艺术
本文分享了爬虫频率控制的实战经验,从基础随机休眠到工业级解决方案。首先指出新手常见误区:过度依赖代理而非行为模拟。核心观点是反爬系统主要通过请求行为异常识别爬虫,而非简单的User-Agent检测。 文章提出三个层级解决方案: 基础层:用正态分布替代均匀随机休眠(numpy.normal实现),更真实模拟人类浏览间隔 工业层:令牌桶算法实现精确流量控制,支持突发请求且保证平均速率 进阶层:动态自适

干爬虫这行的,谁没被封过几个IP?
我见过太多新手,上来就开多线程猛冲,结果跑了不到5分钟,IP直接进黑名单。然后就开始骂站点反爬恶心,到处找免费代理。
免费代理?呵呵。能用的没几个,能用的也慢得要死,还动不动就断。你以为你在薅羊毛,其实人家在薅你。
真正的高手,从来不是靠代理堆出来的。而是靠精准的请求频率控制,让服务器觉得你就是个正常用户。
今天这篇,我把这两年爬了不下10亿条数据总结出来的频率控制经验,全部分享给你。从最基础的随机休眠,到工业级的令牌桶算法,再到动态自适应调度。看完你会发现,原来不封IP这么简单。
为什么你的爬虫总是被封?
先搞清楚一个最基本的问题:服务器是怎么识别爬虫的?
很多人第一反应是User-Agent。错了。User-Agent只是最基础的反爬手段,现在随便一个爬虫都会换UA。
真正让你被封的,是请求行为异常。
一个正常用户,浏览网页的速度是多少?
- 打开一个页面,至少会看个3-5秒
- 点击链接之间,会有思考时间
- 不会连续不断地请求同一个域名
- 一天之内的请求量是有限的
而你的爬虫呢?
- 每秒发10个请求
- 每个请求间隔精确到毫秒
- 24小时不间断运行
- 请求路径完全固定
这种行为,在服务器的反爬系统眼里,就像黑夜里的手电筒一样显眼。不封你封谁?
所以,反爬的核心,不是伪装你的身份,而是模仿人的行为。
而模仿人的行为,最关键的就是控制请求的时间间隔和频率。
别再只会用time.sleep(random.randint(1,3))了
我知道,很多人写爬虫,频率控制就是这么一行代码:
import time
import random
time.sleep(random.randint(1, 3))
有用吗?有点用。对付那些反爬特别弱的站点,足够了。
但稍微有点反爬能力的站点,一眼就能看穿。
为什么?因为这种随机是均匀分布的。
正常用户的浏览间隔,不是均匀分布的。而是符合正态分布的。
什么意思?
- 大部分时候,间隔在2-5秒之间
- 偶尔会有1秒以内的快速点击
- 偶尔会有10秒以上的长时间停留
而random.randint(1,3)生成的间隔,每个值出现的概率是一样的。这根本就不是人的行为。
给你们看个对比图:
那怎么实现正态分布的随机休眠?
用numpy的normal函数。
import numpy as np
def random_sleep(mean=3, std=1, min_sleep=0.5, max_sleep=10):
"""
正态分布随机休眠
:param mean: 平均休眠时间(秒)
:param std: 标准差
:param min_sleep: 最小休眠时间
:param max_sleep: 最大休眠时间
"""
sleep_time = np.random.normal(mean, std)
sleep_time = max(min_sleep, min(sleep_time, max_sleep))
time.sleep(sleep_time)
这个函数生成的休眠时间,大部分会集中在mean附近,偶尔会有偏离。完美模拟了人类的浏览行为。
我用这个函数,爬过很多反爬中等的站点,单IP连续跑一周都没被封过。
但这还不够。
对于那些反爬特别严格的站点,比如某电商、某点评,光有随机休眠还是不够的。
因为随机休眠只能控制单个请求之间的间隔,无法控制单位时间内的总请求数。
比如你开了10个线程,每个线程间隔3秒,那每秒还是会有3个多请求。对于很多站点来说,这个频率还是太高了。
这时候,你需要更高级的频率控制算法。
工业级频率控制:令牌桶算法
令牌桶算法,是目前工业界最常用的流量控制算法。没有之一。
它的原理很简单:
- 系统以固定的速率向桶中放入令牌
- 每个请求需要获取一个令牌才能发送
- 如果桶中没有令牌,请求就会等待
- 桶有最大容量,超过容量的令牌会被丢弃
这个算法的好处是:
- 可以精确控制平均请求速率
- 允许一定程度的突发流量(桶的容量)
- 实现简单,性能极高
用Python实现一个令牌桶也很简单:
import time
import threading
class TokenBucket:
def __init__(self, capacity, fill_rate):
"""
令牌桶初始化
:param capacity: 桶的最大容量(令牌数)
:param fill_rate: 令牌填充速率(令牌/秒)
"""
self.capacity = capacity
self.fill_rate = fill_rate
self.tokens = capacity
self.last_refill_time = time.time()
self.lock = threading.Lock()
def consume(self, tokens=1):
"""
消耗令牌
:param tokens: 需要消耗的令牌数
:return: 等待时间(秒),如果不需要等待则返回0
"""
with self.lock:
now = time.time()
# 计算需要补充的令牌数
delta = (now - self.last_refill_time) * self.fill_rate
self.tokens = min(self.capacity, self.tokens + delta)
self.last_refill_time = now
if self.tokens >= tokens:
self.tokens -= tokens
return 0
else:
# 计算需要等待的时间
wait_time = (tokens - self.tokens) / self.fill_rate
self.tokens = 0
self.last_refill_time = now + wait_time
return wait_time
def get_token(self, tokens=1):
"""
获取令牌,如果没有则等待
"""
wait_time = self.consume(tokens)
if wait_time > 0:
time.sleep(wait_time)
怎么用?
# 创建一个令牌桶,容量10,每秒填充2个令牌
# 意味着平均每秒2个请求,最大突发10个请求
bucket = TokenBucket(capacity=10, fill_rate=2)
for url in urls:
# 获取令牌
bucket.get_token()
# 发送请求
response = requests.get(url)
# 处理响应
process_response(response)
就这么简单。
这个令牌桶,比你写100行随机休眠代码都管用。
我用这个实现,单IP爬某电商站点,每秒2个请求,连续跑了一个月,爬了500多万条数据,IP一次都没被封过。
进阶:动态自适应频率控制
令牌桶虽然好用,但它有个问题:速率是固定的。
不同的站点,反爬强度不一样。
同一个站点,不同的时间段,反爬强度也不一样。
同一个站点,不同的页面,反爬强度还不一样。
比如,某站点的列表页,每秒允许5个请求。但详情页,每秒只允许1个请求。
又比如,白天访问量大,反爬严格。晚上访问量小,反爬宽松。
如果你用固定的速率,要么爬得太慢,要么容易被封。
这时候,你需要动态自适应频率控制。
什么意思?就是让爬虫自己根据服务器的响应,自动调整请求速率。
核心思路:
- 监控服务器的响应状态码和响应时间
- 如果出现429(请求过多)或503(服务不可用),说明速率太快,降低速率
- 如果连续一段时间没有出现错误,说明速率可以提高
- 维护一个滑动窗口,记录最近的请求结果
实现起来也不难:
class AdaptiveRateLimiter:
def __init__(self, initial_rate=2, min_rate=0.1, max_rate=10,
backoff_factor=0.5, increase_factor=1.1,
success_threshold=100, window_size=1000):
"""
自适应速率限制器
:param initial_rate: 初始速率(请求/秒)
:param min_rate: 最小速率
:param max_rate: 最大速率
:param backoff_factor: 退避系数(出错时乘以这个系数)
:param increase_factor: 增长系数(成功时乘以这个系数)
:param success_threshold: 连续成功多少次后增加速率
:param window_size: 滑动窗口大小
"""
self.rate = initial_rate
self.min_rate = min_rate
self.max_rate = max_rate
self.backoff_factor = backoff_factor
self.increase_factor = increase_factor
self.success_threshold = success_threshold
self.window_size = window_size
self.token_bucket = TokenBucket(capacity=10, fill_rate=self.rate)
self.success_count = 0
self.request_history = []
self.lock = threading.Lock()
def on_success(self):
"""请求成功时调用"""
with self.lock:
self.success_count += 1
self.request_history.append(('success', time.time()))
# 清理过期的历史记录
cutoff = time.time() - 60 # 只保留最近1分钟的记录
self.request_history = [r for r in self.request_history if r[1] > cutoff]
# 如果连续成功次数达到阈值,增加速率
if self.success_count >= self.success_threshold:
self.rate = min(self.max_rate, self.rate * self.increase_factor)
self.token_bucket.fill_rate = self.rate
self.success_count = 0
print(f"速率增加到: {self.rate:.2f} 请求/秒")
def on_error(self, status_code):
"""请求出错时调用"""
with self.lock:
self.success_count = 0
self.request_history.append(('error', time.time(), status_code))
# 如果是429或503,降低速率
if status_code in (429, 503):
self.rate = max(self.min_rate, self.rate * self.backoff_factor)
self.token_bucket.fill_rate = self.rate
print(f"遇到{status_code},速率降低到: {self.rate:.2f} 请求/秒")
# 额外等待一段时间
time.sleep(5)
def get_token(self):
"""获取令牌"""
self.token_bucket.get_token()
使用方式:
limiter = AdaptiveRateLimiter(initial_rate=2, max_rate=5)
for url in urls:
limiter.get_token()
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
limiter.on_success()
process_response(response)
except requests.exceptions.HTTPError as e:
limiter.on_error(e.response.status_code)
except Exception as e:
print(f"请求出错: {e}")
这个自适应速率限制器,会自动找到服务器能承受的最大速率。既不会太慢,也不会被封。
我用这个爬过某点评站点,一开始速率是2请求/秒,后来自动涨到了4.5请求/秒,连续跑了两周,非常稳定。
终极方案:结合代理池的分布式调度
如果你的数据量特别大,比如要爬几千万甚至上亿条数据。单IP再怎么控制频率,速度还是有限的。
这时候,你就需要结合代理池了。
但注意,代理池不是用来逃避频率控制的。而是用来横向扩展的。
很多人用代理池的方式是错的:每个请求换一个代理。这样反而更容易被封。因为同一个IP在短时间内只发一个请求,这本身就是异常行为。
正确的做法是:
- 每个代理IP,都有自己独立的速率限制器
- 同一个IP,在一段时间内连续发送请求
- 当一个IP被封了,自动切换到下一个IP
- 被封的IP,过一段时间后自动解封重试
我给你们一个简单的实现思路:
class Proxy:
def __init__(self, ip, port):
self.ip = ip
self.port = port
self.limiter = AdaptiveRateLimiter(initial_rate=1, max_rate=3)
self.is_available = True
self.last_used_time = 0
self.ban_count = 0
def get_proxy(self):
return {
"http": f"http://{self.ip}:{self.port}",
"https": f"http://{self.ip}:{self.port}"
}
def mark_as_banned(self):
self.is_available = False
self.ban_count += 1
self.last_ban_time = time.time()
def try_unban(self):
# 被封后,等待30分钟自动重试
if not self.is_available and time.time() - self.last_ban_time > 1800:
self.is_available = True
# 解封后降低速率
self.limiter.rate = max(0.5, self.limiter.rate * 0.5)
class ProxyPool:
def __init__(self, proxies):
self.proxies = [Proxy(ip, port) for ip, port in proxies]
self.lock = threading.Lock()
def get_available_proxy(self):
"""获取一个可用的代理"""
with self.lock:
# 先尝试解封
for proxy in self.proxies:
proxy.try_unban()
# 选择可用的代理
available_proxies = [p for p in self.proxies if p.is_available]
if not available_proxies:
raise Exception("没有可用的代理")
# 选择最近最少使用的代理
available_proxies.sort(key=lambda x: x.last_used_time)
proxy = available_proxies[0]
proxy.last_used_time = time.time()
return proxy
然后在爬虫中使用:
proxy_pool = ProxyPool([
("1.1.1.1", 8080),
("2.2.2.2", 8080),
("3.3.3.3", 8080),
])
def crawl_url(url):
while True:
proxy = proxy_pool.get_available_proxy()
proxy.limiter.get_token()
try:
response = requests.get(url, proxies=proxy.get_proxy(), timeout=10)
if response.status_code == 429 or response.status_code == 403:
proxy.mark_as_banned()
print(f"代理 {proxy.ip} 被封")
continue
response.raise_for_status()
proxy.limiter.on_success()
return response.text
except Exception as e:
proxy.limiter.on_error(500)
print(f"请求出错: {e}")
continue
这个方案,我用100个代理,爬过某电商站点的1000万条商品数据,用了大概10天时间,全程没有人工干预。
一些容易被忽略的细节
光有频率控制还不够。还有一些细节,也会影响你会不会被封。
请求头的完整性
很多人只设置User-Agent,其他头都不设置。这也是异常行为。
正常的浏览器,会发送很多请求头,比如Accept、Accept-Language、Referer、Cookie等等。
你应该尽量模拟完整的请求头。最好的办法是,从浏览器中复制完整的请求头,然后在爬虫中使用。
Cookie的管理
不要每个请求都换Cookie。正常用户的Cookie是会保持一段时间的。
你可以给每个代理IP分配一个独立的Cookie池,同一个IP在一段时间内使用同一个Cookie。
请求路径的随机性
不要按顺序爬取所有页面。比如,不要从page=1爬到page=10000。
你可以把所有URL打乱顺序,随机爬取。这样更像人的行为。
并发数的控制
不要开太多线程。一般来说,单IP的并发数不要超过5。
并发数太高,即使平均速率不高,也会因为突发流量被封。
超时和重试
一定要设置合理的超时时间。一般10-30秒比较合适。
对于超时的请求,不要立即重试。应该等待一段时间,然后用不同的代理重试。
写在最后
爬虫这东西,本质上是和网站的攻防博弈。
没有永远有效的反爬,也没有永远有效的反反爬。
但有一点是不变的:越像人,越不容易被封。
很多人追求极致的速度,结果被封了IP,反而浪费了更多的时间。
慢就是快。
只要你的爬虫能稳定运行,哪怕每天只爬10万条数据,一个月也能爬300万。这比跑一天就被封,然后花一周时间找代理、换IP,效率高多了。
希望这篇文章能帮到你。
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐


所有评论(0)