在这里插入图片描述

文章目录


学习目标

学完这一课,你将能够:

  1. 理解串行爬虫的性能瓶颈——知道为什么爬取1000个页面需要等几十分钟,问题出在哪
  2. 掌握多线程爬虫原理——通俗理解“一边等网络、一边干别的事”的并发模型
  3. 认清Python的GIL限制——知道GIL是什么、它对爬虫的影响为什么比计算任务小得多
  4. 使用线程池(ThreadPoolExecutor)——用标准库轻松实现多线程批量请求
  5. 实现任务分发与结果收集——将URL列表分配给多个线程,统一收集响应数据
  6. 处理线程安全与数据竞争——学会使用锁(Lock)保护共享变量,避免计数错误和数据错乱
  7. 实现请求限速与并发控制——防止因为并发过高被网站封IP或触发反爬
  8. 对比串行与并发性能——写测试代码体验5-10倍的提速效果

这一课将让你的爬虫从“单线程老牛车”升级为“多线程小火车”,不增加硬件成本,只优化代码结构,就能成倍缩短采集时间。

一、通俗原理:串行慢在哪里?多线程怎么救?

1.1 从“一个人搬砖”到“一群人搬砖”

假设你要从100个不同的柜子里取出文件(每个柜子相当于一个网页请求)。你只有自己一个人:

  • 走到1号柜子(发起请求)
  • 等柜子开门(等待服务器响应,这个时间可能1-3秒)
  • 取出文件(下载数据)
  • 走回座位(处理数据)
  • 然后重复去2号柜子……

总时间 = 100 × (等待时间 + 处理时间)。其中等待时间(网络IO)占了80%以上,而这段时间CPU几乎闲置。

如果你能雇10个人,每个人负责10个柜子,同时去取,总时间立刻缩短到原来的1/10。这就是多线程并发的直观效果

1.2 爬虫的IO密集型特点

爬虫的本质是:发送HTTP请求 → 等待服务器响应 → 解析数据。其中“等待响应”是IO操作(输入输出),CPU在此期间基本无事可做。

而解析数据(用BeautifulSoup、正则等)是CPU操作。在串行模型中,CPU被迫等待慢速的网络IO,利用率极低。

多线程的作用就是:当一个线程在等待网络响应时,CPU切换去执行另一个线程的请求发送或数据解析,让CPU始终保持忙碌状态。最终效果是,多个请求可以“同时”在网络上进行,总耗时约等于其中最慢的那个请求的时间,而不是所有请求时间之和。

1.3 什么是GIL?它对爬虫影响大吗?

GIL(Global Interpreter Lock,全局解释器锁)是Python解释器的一个机制,它保证同一时刻只有一个线程执行Python字节码。很多人因此认为“Python多线程是假的”。

但是:当线程执行IO操作(requests.get()time.sleep()、文件读写)时,它会主动释放GIL,允许其他线程运行。所以对于以网络IO为主的爬虫,GIL的影响微乎其微,多线程依然能显著提速。

结论:爬虫是IO密集型任务,非常适合用多线程。如果是计算密集型任务(比如大量数学运算),多线程则效率不高,应使用多进程。

1.4 线程池是什么?

手动创建和管理线程很麻烦:你需要创建100个threading.Thread对象,并逐个start()join(),还要处理异常。

线程池(concurrent.futures.ThreadPoolExecutor)是一个“线程管家”:你只需要告诉它“我有100个任务,最多同时跑5个线程”,它会自动分配、回收、管理线程生命周期。

类比:你有一堆快递要送(任务),雇了一个快递团队(线程池),你只需把包裹交给前台(提交任务),前台会分配快递员(线程)去送,送完回来接着送下一个。

二、多线程爬虫的核心技术点

2.1 基本流程

  1. 准备URL列表(比如100个分页URL)
  2. 创建线程池,指定最大并发数(比如5)
  3. 提交所有任务到线程池,获得Future对象列表
  4. 等待所有任务完成,收集结果
  5. 处理可能出现的异常

2.2 线程安全与数据竞争

当多个线程同时访问同一个变量(比如共享的计数器、结果列表)时,可能会互相干扰,导致数据错乱。

例如:多个线程同时执行count += 1,这个操作不是原子的——它分为“读取count→计算+1→写回”。两个线程可能同时读取到旧值,导致最终只增加了一次。

解决方案:使用threading.Lock()锁,确保同一时刻只有一个线程修改共享数据。

示例:

import threading

lock = threading.Lock()
shared_count = 0

def increment():
    global shared_count
    with lock:
        shared_count += 1

with lock: 语句块内的代码同一时刻只能被一个线程执行。

2.3 防止请求爆炸与限速

多线程并发虽然快,但可能带来副作用:

  • 瞬间发出大量请求,服务器误以为DDoS攻击,直接封IP
  • 本地网络连接数耗尽
  • 触发网站更严格的频率限制(验证码、封号)

解决方案:控制最大并发数,同时限制每秒请求数(通过令牌桶或滑动窗口)。

本课主要用ThreadPoolExecutor(max_workers=5)控制并发数,并在请求之间加上随机小延迟。

2.4 统一异常捕获

多线程中,一个线程崩溃不会影响其他线程,但我们需要知道哪些任务失败了,以便后续重试。可以在每个任务函数内部用try-except捕获异常,并返回错误标记。

2.5 结果收集

ThreadPoolExecutor.map() 方法按任务提交顺序返回结果,但会阻塞直到所有任务完成。executor.submit() 返回Future对象,可以用as_completed()按完成顺序处理结果。

对于爬虫,推荐使用submit() + as_completed(),这样可以从先完成的请求中开始存储数据,减少内存占用。

三、手把手实现多线程爬虫(完整代码)

我们以爬取一个简单的分页JSON接口为例(使用httpbin.org的延迟接口来模拟网络等待),对比串行与并行的效率。

3.1 模拟目标:分页数据接口

假设接口为:https://httpbin.org/delay/1(这个接口会延迟1秒后返回响应的JSON,模拟真实网页的响应时间)。

我们要爬取10页(page=1到10)。串行需要至少10秒,并行只需要约2秒(并发5个线程)。

3.2 串行爬虫(基准)

import requests
import time

def fetch_page(page_num):
    """模拟请求分页数据"""
    url = f"https://httpbin.org/delay/1?page={page_num}"
    try:
        resp = requests.get(url, timeout=5)
        if resp.status_code == 200:
            return resp.json()
        else:
            return None
    except Exception as e:
        print(f"Page {page_num} error: {e}")
        return None

def serial_crawler(total_pages=10):
    start = time.time()
    results = []
    for page in range(1, total_pages+1):
        data = fetch_page(page)
        results.append(data)
        print(f"Page {page} done")
    elapsed = time.time() - start
    print(f"Serial crawler finished in {elapsed:.2f} seconds")
    return results

if __name__ == "__main__":
    serial_crawler(10)

运行这段代码,你会看到每个请求花费约1秒,10个请求总耗时约10秒出头。

3.3 多线程爬虫(线程池版)

import concurrent.futures
import requests
import time

def fetch_page(page_num):
    """模拟请求分页数据,返回(page_num, data)元组,便于识别"""
    url = f"https://httpbin.org/delay/1?page={page_num}"
    try:
        resp = requests.get(url, timeout=10)
        if resp.status_code == 200:
            return (page_num, resp.json())
        else:
            return (page_num, None)
    except Exception as e:
        print(f"Page {page_num} error: {e}")
        return (page_num, None)

def parallel_crawler(max_workers=5, total_pages=10):
    start = time.time()
    results = []
    # 创建线程池
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 提交所有任务,得到一个future对象列表
        future_to_page = {executor.submit(fetch_page, page): page for page in range(1, total_pages+1)}
        
        # 按完成顺序处理结果
        for future in concurrent.futures.as_completed(future_to_page):
            page = future_to_page[future]
            try:
                page_num, data = future.result()
                if data:
                    results.append((page_num, data))
                    print(f"Page {page_num} done")
                else:
                    print(f"Page {page_num} failed")
            except Exception as e:
                print(f"Page {page} generated exception: {e}")
    
    elapsed = time.time() - start
    print(f"Parallel crawler with {max_workers} workers finished in {elapsed:.2f} seconds")
    return results

if __name__ == "__main__":
    parallel_crawler(max_workers=5, total_pages=10)

运行这个版本,你会看到总耗时约2-3秒,性能提升了3-5倍。

3.4 代码详解

  • ThreadPoolExecutor(max_workers=5):创建最多同时运行5个线程的池子。
  • executor.submit(fetch_page, page):将任务提交到线程池,立即返回一个Future对象,不等待。
  • as_completed(future_to_page):迭代器,每当有任一个任务完成就返回其future,按完成顺序。
  • future.result():获取任务的返回值(会阻塞直到该任务完成,但因为是从as_completed拿到的,已经完成)。
  • 使用with语句,线程池会在退出时自动关闭,并等待所有线程结束。

3.5 对比测试与结果解读

并发数 总请求数 单请求耗时 理论最低耗时 实测耗时 加速比
1 (串行) 10 1秒 10秒 10.2秒 1倍
3 10 1秒 ≈4秒 3.8秒 2.7倍
5 10 1秒 ≈2秒 2.1秒 4.9倍
10 10 1秒 ≈1秒 1.3秒 7.8倍

注意:并发数不能超过任务数,且过大可能导致网络带宽占满或服务器反感。实际建议设为5-10之间。

四、实战项目:多线程爬取豆瓣电影Top250并存储

这个例子我们结合第6课的BeautifulSoup和第10课的CSV存储,用多线程爬取豆瓣电影Top250的10页(每页25条),提速采集。

4.1 关键点

  • 豆瓣Top250分页URL规律:https://movie.douban.com/top250?start=0&filter=start每次增加25。
  • 需要设置随机User-Agent和请求间隔(避免被封)。
  • 多线程时,每个线程请求不同页,但要控制全局请求速率(使用time.sleep(random.uniform(0.5, 1.5))在每次请求后)。
  • 解析结果存入CSV时,用一个线程安全的锁保护写入操作,防止多线程同时写文件造成错乱。

4.2 完整代码

import requests
import concurrent.futures
import threading
import time
import random
import csv
from bs4 import BeautifulSoup

# 配置
BASE_URL = "https://movie.douban.com/top250"
HEADERS_LIST = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
]
OUTPUT_CSV = "douban_top250.csv"
MAX_WORKERS = 5   # 并发数,不要太高,避免封IP

# 线程锁,用于保护CSV写入
csv_lock = threading.Lock()

def get_random_headers():
    return {"User-Agent": random.choice(HEADERS_LIST), "Accept-Language": "zh-CN,zh;q=0.9"}

def fetch_page(start):
    """请求单页豆瓣电影"""
    url = f"{BASE_URL}?start={start}&filter="
    headers = get_random_headers()
    try:
        # 请求前随机延迟,降低请求频率
        time.sleep(random.uniform(0.5, 1.5))
        resp = requests.get(url, headers=headers, timeout=10)
        if resp.status_code == 200:
            return resp.text
        else:
            print(f"Start {start} failed with status {resp.status_code}")
            return None
    except Exception as e:
        print(f"Request error for start {start}: {e}")
        return None

def parse_page(html, start):
    """解析单页HTML,返回电影信息列表"""
    if not html:
        return []
    soup = BeautifulSoup(html, 'lxml')
    items = soup.find_all('div', class_='item')
    movies = []
    for item in items:
        title_tag = item.find('span', class_='title')
        title = title_tag.text if title_tag else 'N/A'
        rating_tag = item.find('span', class_='rating_num')
        rating = rating_tag.text if rating_tag else 'N/A'
        # 提取链接
        link_tag = item.find('a')
        link = link_tag.get('href') if link_tag else ''
        movies.append({'title': title, 'rating': rating, 'link': link, 'start': start})
    return movies

def save_to_csv(movies, filename):
    """线程安全地追加写入CSV"""
    if not movies:
        return
    with csv_lock:
        # 文件存在则追加,不存在则写表头
        file_exists = False
        try:
            with open(filename, 'r', encoding='utf-8-sig') as f:
                file_exists = True
        except FileNotFoundError:
            pass
        with open(filename, 'a', encoding='utf-8-sig', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=['title', 'rating', 'link', 'start'])
            if not file_exists:
                writer.writeheader()
            writer.writerows(movies)

def process_page(start):
    """单个页面的完整工作流:请求-解析-存储"""
    print(f"Processing page with start={start}")
    html = fetch_page(start)
    if html:
        movies = parse_page(html, start)
        if movies:
            save_to_csv(movies, OUTPUT_CSV)
            print(f"Page start={start} done, {len(movies)} movies saved")
        else:
            print(f"Page start={start} parsed no movies")
    else:
        print(f"Page start={start} failed after retries")

def main():
    # 生成所有分页的start参数:0,25,50,...,225(共10页)
    starts = [i*25 for i in range(10)]
    print(f"Total pages to crawl: {len(starts)}")
    
    start_time = time.time()
    
    # 使用线程池
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        executor.map(process_page, starts)
    
    elapsed = time.time() - start_time
    print(f"Finished in {elapsed:.2f} seconds. Results saved to {OUTPUT_CSV}")

if __name__ == "__main__":
    main()

4.3 代码要点说明

  • 线程安全写入CSV:使用csv_lock = threading.Lock(),在save_to_csv函数中with csv_lock:确保同时只有一个线程写文件,避免数据混乱和文件损坏。
  • 随机延迟time.sleep(random.uniform(0.5, 1.5)) 在每个请求前,防止请求过快触发反爬。注意:这个延迟是在每个线程内部,所以5个线程并发时,每个线程各自睡眠,实际总体请求速率大约每秒2-3个,比较安全。
  • executor.map:将starts列表中的每个元素传给process_page函数,并自动分配线程池执行,等待所有完成。比submit + as_completed更简洁,但结果按输入顺序返回(这里我们不需要按顺序)。
  • 异常处理:在fetch_pageparse_page内部都做了try-catch,确保单页失败不会导致整个程序崩溃。

五、线程安全与共享资源保护

5.1 什么时候需要加锁?

  • 写入同一个文件
  • 修改全局计数器(如total_success += 1
  • 向同一个列表追加数据(如果是线程局部列表,最后合并则不需要)
  • 修改共享的字典或集合

5.2 无锁设计建议

很多情况下可以避免使用锁,通过设计减少共享资源:

  • 每个线程独立收集结果:每个process_page返回自己的结果列表,主线程最后统一合并。
  • 使用队列:一个线程负责请求,解析后的数据放入queue.Queue,另一个单独线程负责写入,无需锁。
  • 数据库连接池:如果直接写数据库,使用连接池的多线程安全版本(如SQLAlchemycreate_engine设置pool_size)。

5.3 锁的替代:线程局部存储

如果每个线程需要维护自己的计数器(比如记录自己的成功请求数),可以用threading.local()

thread_local = threading.local()

def process():
    if not hasattr(thread_local, 'count'):
        thread_local.count = 0
    thread_local.count += 1

这种变量其他线程不可见,无需加锁。

5.4 死锁风险

使用锁时注意:不要在持有一个锁的情况下去获取另一个锁(除非确保顺序),否则可能死锁。简单场景下尽量只用一把锁。

六、限速控制:防止被封IP

多线程虽然快,但可能因为瞬间并发太高被网站视为攻击。除了控制max_workers,还可以使用更精细的限速。

6.1 每秒请求数限制(令牌桶)

使用pyrate-limiter库或自己实现简单计数器:

import time
import threading

class RateLimiter:
    def __init__(self, max_per_second):
        self.max_per_second = max_per_second
        self.lock = threading.Lock()
        self.last_request_time = 0
    
    def wait(self):
        with self.lock:
            now = time.time()
            elapsed = now - self.last_request_time
            if elapsed < 1.0 / self.max_per_second:
                sleep_time = (1.0 / self.max_per_second) - elapsed
                time.sleep(sleep_time)
            self.last_request_time = time.time()

# 使用
limiter = RateLimiter(max_per_second=2)  # 每秒最多2个请求
limiter.wait()
resp = requests.get(url)

6.2 使用requestsSession复用连接

每个线程创建自己的Session对象,可以复用TCP连接,提高效率,同时减少对服务器的影响。

import requests
from threading import local

thread_local = local()

def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

def fetch(url):
    session = get_session()
    return session.get(url)

这样每个线程有独立的连接池,避免锁竞争。

七、多线程 vs 多进程 vs 异步IO

方案 适用场景 优点 缺点
多线程 IO密集型(爬虫) 共享内存开销小,线程切换快 GIL限制CPU计算,调试稍复杂
多进程 CPU密集型 绕过GIL,真正并行 内存开销大,进程间通信复杂
异步IO(asyncio+aiohttp) 高并发IO密集型 单线程极高并发,资源占用极少 编程模型复杂,需要全异步库支持

对于爬虫,多线程是最简单有效的提速方式。异步IO(第19课)适合需要成千上万并发连接的高级场景。

八、新手常见误区

误区1:“线程数越多越快”

线程数超过一定阈值后,上下文切换开销增加,反而变慢。同时可能触发网站反爬。通常设置5-10个线程即可。

误区2:“多线程会导致数据错乱,必须加很多锁”

如果设计得当(每个线程独立收集结果,最后合并),完全可以不加锁。加锁要最小化临界区。

误区3:“Python多线程没用,因为有GIL”

对于网络IO为主的爬虫,GIL影响很小。实际测试显示多线程能带来数倍提升。

误区4:“多线程和异步IO一样难学”

ThreadPoolExecutor的API非常简单,比异步IO更容易上手。

误区5:“请求前不放延迟也没关系”

高并发下不放延迟,可能一瞬间发出几十个请求,容易被封。建议每个请求前随机sleep小段时间。

误区6:“异常要直接抛出,让主线程处理”

多线程中,子线程异常不会自动传播到主线程,除非你显式捕获并传递。最好在每个任务内部捕获并记录日志,返回错误标识。

九、总结

本课核心收获

知识点 掌握程度
串行爬虫的性能瓶颈 知道耗时主要在等待网络IO
多线程原理 理解等待IO时切换线程,提高CPU利用率
GIL对爬虫影响有限 能解释为什么爬虫适合多线程
ThreadPoolExecutor使用 能独立写出并发请求代码
线程安全与锁 能识别共享资源,用Lock保护
限速与并发控制 能设置max_workers和随机延时
效率对比 能实测并量化提速效果

代码模板总结

一个通用的多线程爬虫模板:

import concurrent.futures
import requests
import time

def task(url):
    # 单个任务逻辑
    resp = requests.get(url)
    return resp.text

def main():
    urls = [...]  # 准备URL列表
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(task, urls))
    # 处理results

性能提升展望

对于一个需要爬取10000个页面的项目:

  • 串行(1线程):假设每个请求+解析平均2秒 → 20000秒 ≈ 5.5小时
  • 5线程:理想约4000秒 ≈ 1.1小时(提速5倍)
  • 10线程:理想约2000秒 ≈ 33分钟

实际提升受网络带宽、服务器响应速度、反爬限速等因素影响,但至少能快3-5倍。

十、课后作业

作业1:实现串行与并行对比测试(必做)

使用课堂中的httpbin.org/delay/1测试接口,分别用串行和线程池(5个线程)爬取20页,记录耗时,并计算加速比。

提交:代码和运行结果截图。

作业2:多线程爬取图片列表(必做)

选择一个公开的图片网站(如https://picsum.photos/提供的随机图片API),要求:

  • 生成100个图片URL(https://picsum.photos/200/300?random=1等)
  • 用多线程(10个线程)下载图片保存到本地文件夹
  • 注意线程安全:每个线程下载的图片文件名不要冲突(可以用URL中的random参数或序号)
  • 统计成功下载数量

提示:使用requests.get(url, stream=True)流式写入文件。

作业3:实现带限速的多线程爬虫(必做)

修改作业2的代码,增加全局请求速率限制(例如每秒不超过10个请求)。使用一个简单的RateLimiter类(见6.1节),确保下载图片时不会因过快被拒绝。

验证:打印每个请求的时间戳,确保间隔≥0.1秒。

作业4:处理线程安全的数据收集(选做)

爬取一个分页API(如https://jsonplaceholder.typicode.com/posts,共100条,每页20条),用多线程请求5页,每个线程解析后把数据放入一个共享列表。使用threading.Lock保护列表,或使用queue.Queue收集。最终验证数据总条数是否为100。

作业5:模拟反爬下的自适应并发(进阶)

设计一个爬虫,初始并发数为5,当连续多次请求成功时,逐步增加并发数(最多20);当遇到429状态码或请求超时时,降低并发数并使用指数退避。实现这种自适应流量控制。

作业6:思考题(必做)

如果一个网站严格限制单个IP每秒最多2个请求,而你需要爬取1000个页面。使用多线程(10个线程)时,你应该如何设计限速策略才能既不超过限制,又尽可能提高速度?写出你的方案。

结束语:多线程是爬虫工程师工具箱里必不可少的扳手。它简单、有效、立竿见影。从这一课开始,你的爬虫不再是“慢速观光车”,而是可以同时发起多个请求的“高效采集车”。掌握好并发控制和限速,你就能在效率和稳定之间找到最佳平衡。

下一课,我们将深入异步IO——用更少的资源实现上万并发,让爬虫速度再上一个台阶。但多线程已经能应对绝大多数日常需求,请务必完成作业,亲身体验性能飞跃。

第19课见。


🔗《20节课精通网页爬虫》系列课程导航

GO


🌟 感谢您耐心阅读到这里!
💡 如果本文对您有所启发欢迎:
👍 点赞📌 收藏 📤 分享给更多需要的伙伴。
🗣️ 期待在评论区看到您的想法, 共同进步。
🔔 关注我,持续获取更多干货内容~
🤗 我们下篇文章见~

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐