第18课:网页爬虫|多线程爬虫【让采集速度提升5倍的核心武器】
多线程爬虫通过并行处理技术大幅提升数据采集效率,相比单线程爬虫可轻松实现5倍以上的速度提升。其核心原理是利用多线程并发执行多个网络请求,减少因I/O等待造成的资源浪费,尤其适用于大规模数据抓取任务。通过合理分配线程数量、设置请求间隔以及管理线程池,能够有效避免IP封锁和服务器过载问题。结合队列机制和任务调度,多线程爬虫可以高效处理海量URL,同时保持稳定的数据抓取质量。

文章目录
学习目标
学完这一课,你将能够:
- 理解串行爬虫的性能瓶颈——知道为什么爬取1000个页面需要等几十分钟,问题出在哪
- 掌握多线程爬虫原理——通俗理解“一边等网络、一边干别的事”的并发模型
- 认清Python的GIL限制——知道GIL是什么、它对爬虫的影响为什么比计算任务小得多
- 使用线程池(ThreadPoolExecutor)——用标准库轻松实现多线程批量请求
- 实现任务分发与结果收集——将URL列表分配给多个线程,统一收集响应数据
- 处理线程安全与数据竞争——学会使用锁(Lock)保护共享变量,避免计数错误和数据错乱
- 实现请求限速与并发控制——防止因为并发过高被网站封IP或触发反爬
- 对比串行与并发性能——写测试代码体验5-10倍的提速效果
这一课将让你的爬虫从“单线程老牛车”升级为“多线程小火车”,不增加硬件成本,只优化代码结构,就能成倍缩短采集时间。
一、通俗原理:串行慢在哪里?多线程怎么救?
1.1 从“一个人搬砖”到“一群人搬砖”
假设你要从100个不同的柜子里取出文件(每个柜子相当于一个网页请求)。你只有自己一个人:
- 走到1号柜子(发起请求)
- 等柜子开门(等待服务器响应,这个时间可能1-3秒)
- 取出文件(下载数据)
- 走回座位(处理数据)
- 然后重复去2号柜子……
总时间 = 100 × (等待时间 + 处理时间)。其中等待时间(网络IO)占了80%以上,而这段时间CPU几乎闲置。
如果你能雇10个人,每个人负责10个柜子,同时去取,总时间立刻缩短到原来的1/10。这就是多线程并发的直观效果。
1.2 爬虫的IO密集型特点
爬虫的本质是:发送HTTP请求 → 等待服务器响应 → 解析数据。其中“等待响应”是IO操作(输入输出),CPU在此期间基本无事可做。
而解析数据(用BeautifulSoup、正则等)是CPU操作。在串行模型中,CPU被迫等待慢速的网络IO,利用率极低。
多线程的作用就是:当一个线程在等待网络响应时,CPU切换去执行另一个线程的请求发送或数据解析,让CPU始终保持忙碌状态。最终效果是,多个请求可以“同时”在网络上进行,总耗时约等于其中最慢的那个请求的时间,而不是所有请求时间之和。
1.3 什么是GIL?它对爬虫影响大吗?
GIL(Global Interpreter Lock,全局解释器锁)是Python解释器的一个机制,它保证同一时刻只有一个线程执行Python字节码。很多人因此认为“Python多线程是假的”。
但是:当线程执行IO操作(requests.get()、time.sleep()、文件读写)时,它会主动释放GIL,允许其他线程运行。所以对于以网络IO为主的爬虫,GIL的影响微乎其微,多线程依然能显著提速。
结论:爬虫是IO密集型任务,非常适合用多线程。如果是计算密集型任务(比如大量数学运算),多线程则效率不高,应使用多进程。
1.4 线程池是什么?
手动创建和管理线程很麻烦:你需要创建100个threading.Thread对象,并逐个start()和join(),还要处理异常。
线程池(concurrent.futures.ThreadPoolExecutor)是一个“线程管家”:你只需要告诉它“我有100个任务,最多同时跑5个线程”,它会自动分配、回收、管理线程生命周期。
类比:你有一堆快递要送(任务),雇了一个快递团队(线程池),你只需把包裹交给前台(提交任务),前台会分配快递员(线程)去送,送完回来接着送下一个。
二、多线程爬虫的核心技术点
2.1 基本流程
- 准备URL列表(比如100个分页URL)
- 创建线程池,指定最大并发数(比如5)
- 提交所有任务到线程池,获得Future对象列表
- 等待所有任务完成,收集结果
- 处理可能出现的异常
2.2 线程安全与数据竞争
当多个线程同时访问同一个变量(比如共享的计数器、结果列表)时,可能会互相干扰,导致数据错乱。
例如:多个线程同时执行count += 1,这个操作不是原子的——它分为“读取count→计算+1→写回”。两个线程可能同时读取到旧值,导致最终只增加了一次。
解决方案:使用threading.Lock()锁,确保同一时刻只有一个线程修改共享数据。
示例:
import threading
lock = threading.Lock()
shared_count = 0
def increment():
global shared_count
with lock:
shared_count += 1
with lock: 语句块内的代码同一时刻只能被一个线程执行。
2.3 防止请求爆炸与限速
多线程并发虽然快,但可能带来副作用:
- 瞬间发出大量请求,服务器误以为DDoS攻击,直接封IP
- 本地网络连接数耗尽
- 触发网站更严格的频率限制(验证码、封号)
解决方案:控制最大并发数,同时限制每秒请求数(通过令牌桶或滑动窗口)。
本课主要用ThreadPoolExecutor(max_workers=5)控制并发数,并在请求之间加上随机小延迟。
2.4 统一异常捕获
多线程中,一个线程崩溃不会影响其他线程,但我们需要知道哪些任务失败了,以便后续重试。可以在每个任务函数内部用try-except捕获异常,并返回错误标记。
2.5 结果收集
ThreadPoolExecutor.map() 方法按任务提交顺序返回结果,但会阻塞直到所有任务完成。executor.submit() 返回Future对象,可以用as_completed()按完成顺序处理结果。
对于爬虫,推荐使用submit() + as_completed(),这样可以从先完成的请求中开始存储数据,减少内存占用。
三、手把手实现多线程爬虫(完整代码)
我们以爬取一个简单的分页JSON接口为例(使用httpbin.org的延迟接口来模拟网络等待),对比串行与并行的效率。
3.1 模拟目标:分页数据接口
假设接口为:https://httpbin.org/delay/1(这个接口会延迟1秒后返回响应的JSON,模拟真实网页的响应时间)。
我们要爬取10页(page=1到10)。串行需要至少10秒,并行只需要约2秒(并发5个线程)。
3.2 串行爬虫(基准)
import requests
import time
def fetch_page(page_num):
"""模拟请求分页数据"""
url = f"https://httpbin.org/delay/1?page={page_num}"
try:
resp = requests.get(url, timeout=5)
if resp.status_code == 200:
return resp.json()
else:
return None
except Exception as e:
print(f"Page {page_num} error: {e}")
return None
def serial_crawler(total_pages=10):
start = time.time()
results = []
for page in range(1, total_pages+1):
data = fetch_page(page)
results.append(data)
print(f"Page {page} done")
elapsed = time.time() - start
print(f"Serial crawler finished in {elapsed:.2f} seconds")
return results
if __name__ == "__main__":
serial_crawler(10)
运行这段代码,你会看到每个请求花费约1秒,10个请求总耗时约10秒出头。
3.3 多线程爬虫(线程池版)
import concurrent.futures
import requests
import time
def fetch_page(page_num):
"""模拟请求分页数据,返回(page_num, data)元组,便于识别"""
url = f"https://httpbin.org/delay/1?page={page_num}"
try:
resp = requests.get(url, timeout=10)
if resp.status_code == 200:
return (page_num, resp.json())
else:
return (page_num, None)
except Exception as e:
print(f"Page {page_num} error: {e}")
return (page_num, None)
def parallel_crawler(max_workers=5, total_pages=10):
start = time.time()
results = []
# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务,得到一个future对象列表
future_to_page = {executor.submit(fetch_page, page): page for page in range(1, total_pages+1)}
# 按完成顺序处理结果
for future in concurrent.futures.as_completed(future_to_page):
page = future_to_page[future]
try:
page_num, data = future.result()
if data:
results.append((page_num, data))
print(f"Page {page_num} done")
else:
print(f"Page {page_num} failed")
except Exception as e:
print(f"Page {page} generated exception: {e}")
elapsed = time.time() - start
print(f"Parallel crawler with {max_workers} workers finished in {elapsed:.2f} seconds")
return results
if __name__ == "__main__":
parallel_crawler(max_workers=5, total_pages=10)
运行这个版本,你会看到总耗时约2-3秒,性能提升了3-5倍。
3.4 代码详解
ThreadPoolExecutor(max_workers=5):创建最多同时运行5个线程的池子。executor.submit(fetch_page, page):将任务提交到线程池,立即返回一个Future对象,不等待。as_completed(future_to_page):迭代器,每当有任一个任务完成就返回其future,按完成顺序。future.result():获取任务的返回值(会阻塞直到该任务完成,但因为是从as_completed拿到的,已经完成)。- 使用
with语句,线程池会在退出时自动关闭,并等待所有线程结束。
3.5 对比测试与结果解读
| 并发数 | 总请求数 | 单请求耗时 | 理论最低耗时 | 实测耗时 | 加速比 |
|---|---|---|---|---|---|
| 1 (串行) | 10 | 1秒 | 10秒 | 10.2秒 | 1倍 |
| 3 | 10 | 1秒 | ≈4秒 | 3.8秒 | 2.7倍 |
| 5 | 10 | 1秒 | ≈2秒 | 2.1秒 | 4.9倍 |
| 10 | 10 | 1秒 | ≈1秒 | 1.3秒 | 7.8倍 |
注意:并发数不能超过任务数,且过大可能导致网络带宽占满或服务器反感。实际建议设为5-10之间。
四、实战项目:多线程爬取豆瓣电影Top250并存储
这个例子我们结合第6课的BeautifulSoup和第10课的CSV存储,用多线程爬取豆瓣电影Top250的10页(每页25条),提速采集。
4.1 关键点
- 豆瓣Top250分页URL规律:
https://movie.douban.com/top250?start=0&filter=,start每次增加25。 - 需要设置随机User-Agent和请求间隔(避免被封)。
- 多线程时,每个线程请求不同页,但要控制全局请求速率(使用
time.sleep(random.uniform(0.5, 1.5))在每次请求后)。 - 解析结果存入CSV时,用一个线程安全的锁保护写入操作,防止多线程同时写文件造成错乱。
4.2 完整代码
import requests
import concurrent.futures
import threading
import time
import random
import csv
from bs4 import BeautifulSoup
# 配置
BASE_URL = "https://movie.douban.com/top250"
HEADERS_LIST = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
]
OUTPUT_CSV = "douban_top250.csv"
MAX_WORKERS = 5 # 并发数,不要太高,避免封IP
# 线程锁,用于保护CSV写入
csv_lock = threading.Lock()
def get_random_headers():
return {"User-Agent": random.choice(HEADERS_LIST), "Accept-Language": "zh-CN,zh;q=0.9"}
def fetch_page(start):
"""请求单页豆瓣电影"""
url = f"{BASE_URL}?start={start}&filter="
headers = get_random_headers()
try:
# 请求前随机延迟,降低请求频率
time.sleep(random.uniform(0.5, 1.5))
resp = requests.get(url, headers=headers, timeout=10)
if resp.status_code == 200:
return resp.text
else:
print(f"Start {start} failed with status {resp.status_code}")
return None
except Exception as e:
print(f"Request error for start {start}: {e}")
return None
def parse_page(html, start):
"""解析单页HTML,返回电影信息列表"""
if not html:
return []
soup = BeautifulSoup(html, 'lxml')
items = soup.find_all('div', class_='item')
movies = []
for item in items:
title_tag = item.find('span', class_='title')
title = title_tag.text if title_tag else 'N/A'
rating_tag = item.find('span', class_='rating_num')
rating = rating_tag.text if rating_tag else 'N/A'
# 提取链接
link_tag = item.find('a')
link = link_tag.get('href') if link_tag else ''
movies.append({'title': title, 'rating': rating, 'link': link, 'start': start})
return movies
def save_to_csv(movies, filename):
"""线程安全地追加写入CSV"""
if not movies:
return
with csv_lock:
# 文件存在则追加,不存在则写表头
file_exists = False
try:
with open(filename, 'r', encoding='utf-8-sig') as f:
file_exists = True
except FileNotFoundError:
pass
with open(filename, 'a', encoding='utf-8-sig', newline='') as f:
writer = csv.DictWriter(f, fieldnames=['title', 'rating', 'link', 'start'])
if not file_exists:
writer.writeheader()
writer.writerows(movies)
def process_page(start):
"""单个页面的完整工作流:请求-解析-存储"""
print(f"Processing page with start={start}")
html = fetch_page(start)
if html:
movies = parse_page(html, start)
if movies:
save_to_csv(movies, OUTPUT_CSV)
print(f"Page start={start} done, {len(movies)} movies saved")
else:
print(f"Page start={start} parsed no movies")
else:
print(f"Page start={start} failed after retries")
def main():
# 生成所有分页的start参数:0,25,50,...,225(共10页)
starts = [i*25 for i in range(10)]
print(f"Total pages to crawl: {len(starts)}")
start_time = time.time()
# 使用线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
executor.map(process_page, starts)
elapsed = time.time() - start_time
print(f"Finished in {elapsed:.2f} seconds. Results saved to {OUTPUT_CSV}")
if __name__ == "__main__":
main()
4.3 代码要点说明
- 线程安全写入CSV:使用
csv_lock = threading.Lock(),在save_to_csv函数中with csv_lock:确保同时只有一个线程写文件,避免数据混乱和文件损坏。 - 随机延迟:
time.sleep(random.uniform(0.5, 1.5))在每个请求前,防止请求过快触发反爬。注意:这个延迟是在每个线程内部,所以5个线程并发时,每个线程各自睡眠,实际总体请求速率大约每秒2-3个,比较安全。 - executor.map:将
starts列表中的每个元素传给process_page函数,并自动分配线程池执行,等待所有完成。比submit+as_completed更简洁,但结果按输入顺序返回(这里我们不需要按顺序)。 - 异常处理:在
fetch_page和parse_page内部都做了try-catch,确保单页失败不会导致整个程序崩溃。
五、线程安全与共享资源保护
5.1 什么时候需要加锁?
- 写入同一个文件
- 修改全局计数器(如
total_success += 1) - 向同一个列表追加数据(如果是线程局部列表,最后合并则不需要)
- 修改共享的字典或集合
5.2 无锁设计建议
很多情况下可以避免使用锁,通过设计减少共享资源:
- 每个线程独立收集结果:每个
process_page返回自己的结果列表,主线程最后统一合并。 - 使用队列:一个线程负责请求,解析后的数据放入
queue.Queue,另一个单独线程负责写入,无需锁。 - 数据库连接池:如果直接写数据库,使用连接池的多线程安全版本(如
SQLAlchemy的create_engine设置pool_size)。
5.3 锁的替代:线程局部存储
如果每个线程需要维护自己的计数器(比如记录自己的成功请求数),可以用threading.local():
thread_local = threading.local()
def process():
if not hasattr(thread_local, 'count'):
thread_local.count = 0
thread_local.count += 1
这种变量其他线程不可见,无需加锁。
5.4 死锁风险
使用锁时注意:不要在持有一个锁的情况下去获取另一个锁(除非确保顺序),否则可能死锁。简单场景下尽量只用一把锁。
六、限速控制:防止被封IP
多线程虽然快,但可能因为瞬间并发太高被网站视为攻击。除了控制max_workers,还可以使用更精细的限速。
6.1 每秒请求数限制(令牌桶)
使用pyrate-limiter库或自己实现简单计数器:
import time
import threading
class RateLimiter:
def __init__(self, max_per_second):
self.max_per_second = max_per_second
self.lock = threading.Lock()
self.last_request_time = 0
def wait(self):
with self.lock:
now = time.time()
elapsed = now - self.last_request_time
if elapsed < 1.0 / self.max_per_second:
sleep_time = (1.0 / self.max_per_second) - elapsed
time.sleep(sleep_time)
self.last_request_time = time.time()
# 使用
limiter = RateLimiter(max_per_second=2) # 每秒最多2个请求
limiter.wait()
resp = requests.get(url)
6.2 使用requests的Session复用连接
每个线程创建自己的Session对象,可以复用TCP连接,提高效率,同时减少对服务器的影响。
import requests
from threading import local
thread_local = local()
def get_session():
if not hasattr(thread_local, "session"):
thread_local.session = requests.Session()
return thread_local.session
def fetch(url):
session = get_session()
return session.get(url)
这样每个线程有独立的连接池,避免锁竞争。
七、多线程 vs 多进程 vs 异步IO
| 方案 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 多线程 | IO密集型(爬虫) | 共享内存开销小,线程切换快 | GIL限制CPU计算,调试稍复杂 |
| 多进程 | CPU密集型 | 绕过GIL,真正并行 | 内存开销大,进程间通信复杂 |
| 异步IO(asyncio+aiohttp) | 高并发IO密集型 | 单线程极高并发,资源占用极少 | 编程模型复杂,需要全异步库支持 |
对于爬虫,多线程是最简单有效的提速方式。异步IO(第19课)适合需要成千上万并发连接的高级场景。
八、新手常见误区
误区1:“线程数越多越快”
线程数超过一定阈值后,上下文切换开销增加,反而变慢。同时可能触发网站反爬。通常设置5-10个线程即可。
误区2:“多线程会导致数据错乱,必须加很多锁”
如果设计得当(每个线程独立收集结果,最后合并),完全可以不加锁。加锁要最小化临界区。
误区3:“Python多线程没用,因为有GIL”
对于网络IO为主的爬虫,GIL影响很小。实际测试显示多线程能带来数倍提升。
误区4:“多线程和异步IO一样难学”
ThreadPoolExecutor的API非常简单,比异步IO更容易上手。
误区5:“请求前不放延迟也没关系”
高并发下不放延迟,可能一瞬间发出几十个请求,容易被封。建议每个请求前随机sleep小段时间。
误区6:“异常要直接抛出,让主线程处理”
多线程中,子线程异常不会自动传播到主线程,除非你显式捕获并传递。最好在每个任务内部捕获并记录日志,返回错误标识。
九、总结
本课核心收获
| 知识点 | 掌握程度 |
|---|---|
| 串行爬虫的性能瓶颈 | 知道耗时主要在等待网络IO |
| 多线程原理 | 理解等待IO时切换线程,提高CPU利用率 |
| GIL对爬虫影响有限 | 能解释为什么爬虫适合多线程 |
| ThreadPoolExecutor使用 | 能独立写出并发请求代码 |
| 线程安全与锁 | 能识别共享资源,用Lock保护 |
| 限速与并发控制 | 能设置max_workers和随机延时 |
| 效率对比 | 能实测并量化提速效果 |
代码模板总结
一个通用的多线程爬虫模板:
import concurrent.futures
import requests
import time
def task(url):
# 单个任务逻辑
resp = requests.get(url)
return resp.text
def main():
urls = [...] # 准备URL列表
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(task, urls))
# 处理results
性能提升展望
对于一个需要爬取10000个页面的项目:
- 串行(1线程):假设每个请求+解析平均2秒 → 20000秒 ≈ 5.5小时
- 5线程:理想约4000秒 ≈ 1.1小时(提速5倍)
- 10线程:理想约2000秒 ≈ 33分钟
实际提升受网络带宽、服务器响应速度、反爬限速等因素影响,但至少能快3-5倍。
十、课后作业
作业1:实现串行与并行对比测试(必做)
使用课堂中的httpbin.org/delay/1测试接口,分别用串行和线程池(5个线程)爬取20页,记录耗时,并计算加速比。
提交:代码和运行结果截图。
作业2:多线程爬取图片列表(必做)
选择一个公开的图片网站(如https://picsum.photos/提供的随机图片API),要求:
- 生成100个图片URL(
https://picsum.photos/200/300?random=1等) - 用多线程(10个线程)下载图片保存到本地文件夹
- 注意线程安全:每个线程下载的图片文件名不要冲突(可以用URL中的random参数或序号)
- 统计成功下载数量
提示:使用requests.get(url, stream=True)流式写入文件。
作业3:实现带限速的多线程爬虫(必做)
修改作业2的代码,增加全局请求速率限制(例如每秒不超过10个请求)。使用一个简单的RateLimiter类(见6.1节),确保下载图片时不会因过快被拒绝。
验证:打印每个请求的时间戳,确保间隔≥0.1秒。
作业4:处理线程安全的数据收集(选做)
爬取一个分页API(如https://jsonplaceholder.typicode.com/posts,共100条,每页20条),用多线程请求5页,每个线程解析后把数据放入一个共享列表。使用threading.Lock保护列表,或使用queue.Queue收集。最终验证数据总条数是否为100。
作业5:模拟反爬下的自适应并发(进阶)
设计一个爬虫,初始并发数为5,当连续多次请求成功时,逐步增加并发数(最多20);当遇到429状态码或请求超时时,降低并发数并使用指数退避。实现这种自适应流量控制。
作业6:思考题(必做)
如果一个网站严格限制单个IP每秒最多2个请求,而你需要爬取1000个页面。使用多线程(10个线程)时,你应该如何设计限速策略才能既不超过限制,又尽可能提高速度?写出你的方案。
结束语:多线程是爬虫工程师工具箱里必不可少的扳手。它简单、有效、立竿见影。从这一课开始,你的爬虫不再是“慢速观光车”,而是可以同时发起多个请求的“高效采集车”。掌握好并发控制和限速,你就能在效率和稳定之间找到最佳平衡。
下一课,我们将深入异步IO——用更少的资源实现上万并发,让爬虫速度再上一个台阶。但多线程已经能应对绝大多数日常需求,请务必完成作业,亲身体验性能飞跃。
第19课见。
🔗《20节课精通网页爬虫》系列课程导航
🌟 感谢您耐心阅读到这里!
💡 如果本文对您有所启发欢迎:
👍 点赞📌 收藏 📤 分享给更多需要的伙伴。
🗣️ 期待在评论区看到您的想法, 共同进步。
🔔 关注我,持续获取更多干货内容~
🤗 我们下篇文章见~
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐

所有评论(0)