以Java/Scala/Python/SQL开发者的视角,把操作系统底层的进程、线程拉到大数据的台前:Spark Executor 的并发模型、YARN Container 的隔离机制,通通拆给你看。
读完你会理解,为什么一个 spark.executor.cores 参数调错,会让集群卡成PPT。

目录


1. 为什么需要进程与线程

想象你开了一家餐厅。如果只有一个厨师,他既要切菜、炒菜,又要端盘子、收银——这就是单进程单线程模式,慢得要命。
为了提高吞吐量,你可以:

  • 多雇几个厨师,每人用一个独立厨房 → 多进程
  • 让同一厨房里的多个厨师共享灶台和冰箱 → 多线程

映射到大数据场景:

  • 进程 相当于 YARN 给每个应用分配的 Container,拥有独立的内存、CPU 资源,互不干扰。
  • 线程 就像 Spark Executor 内部并行运行的 Task,共享 Executor 的堆内存,靠多核 CPU 并发加速。

痛点直击
没有进程隔离,一个任务内存溢出会把整台机器搞挂;没有线程池,你的 Executor 在处理海量数据时,只能一个一个分区算,并行度起不来,延迟感人。所以,理解进程与线程,就是理解大数据框架资源调度与并行计算的根基。


2. 核心概念拆解

2.1 进程:资源拥有者

一句话直白解释:进程是操作系统分配资源的最小单位,相当于餐厅里一个配备独立水电气表的厨房。
生活例子:打开 IntelliJ IDEA,操作系统就创建了一个进程,拥有独立内存空间,万一它崩了,不会影响正在运行的浏览器进程。

在 Hadoop 体系里,NodeManager 是一个守护进程,它启动的每个 YARN Container 也是一个独立进程(通常是 JVM 进程)。Spark Executor 就运行在这样一个 Container 里,进程挂了,它所持有的内存、文件描述符全被回收。

2.2 线程:CPU调度单位

一句话直白解释:线程是 CPU 执行的最小单位,一个进程里的多线程共享内存,好比厨房里多个厨师共用灶台。
生活例子:你用 Word 打字时,后台还有一个线程在实时拼写检查,它们都住在同一个 Word 进程里,共享文档数据。

Spark Executor 进程内部维护了一个线程池,默认线程数等于分配的 CPU 核心数(spark.executor.cores)。每个 Task 会被分配到一个线程上执行,多个 Task 就在一个 Executor 里并行跑。

进程 vs 线程 对比表

维度 进程 线程
资源拥有者 否(共享进程资源)
调度开销 大(切换需刷新页表、TLB) 小(同进程内切换仅换栈和寄存器)
内存隔离 独立地址空间,一个进程崩溃不影响其他 共享堆,一个线程 OOM 可能拖垮整个进程
通信方式 IPC:管道、Socket、共享内存 直接读写共享变量(需加锁)
典型大数据化身 YARN Container、Spark Driver 进程 Executor 内 Task 线程、Flink Task 线程

思考题 1:如果一个 Spark Executor 里同时跑 8 个 Task,它们修改同一个全局 HashMap 会怎样?你需要什么保护机制?


3. 原理与架构深度解析

3.1 五状态模型与调度算法

操作系统中,线程在生命周期里会经历五种状态(简化版):新建、就绪、运行、阻塞、终止

新建 ──> 就绪 <─── 运行 ──> 阻塞
                  │               │
                  └── 终止 <──────┘

当一个 Spark Task 启动时,它从新建变成就绪,等待 CPU 分配时间片。如果 Task 需要从 HDFS 读数据(I/O 操作),线程进入阻塞状态,让出 CPU 给其他就绪线程。这正是 多线程得以隐藏 I/O 延迟 的精髓。

Linux 的 CFS(完全公平调度器)会给每个可运行线程分配虚拟运行时间(vruntime),保证公平。对应到大数据里,你希望的是 CPU 密集任务吃满核心,I/O 密集任务多给线程并发

3.2 大数据框架中的“进程-线程”映射

用一个餐厅比喻贯穿:

  • YARN ResourceManager → 餐厅大堂经理,分配“包间”(Container)给客人。
  • Container 进程 → 带独立厨房的包间,资源私密。
  • Spark Executor 进程 → 包间里的主厨团队。
  • Task 线程 → 团队里的厨师,同时做多道菜。

为什么 Spark 选线程模型而不是每个 Task 一个进程?
因为频繁创建销毁进程开销巨大,线程共享内存还能在同一个 Executor 内做 Broadcast 数据共享,减少内存冗余。如果不用线程池而用进程池,处理千级并行度时,光是上下文切换和 IPC 就能把集群拖死。

工程陷阱 1:在 YARN 上运行 Spark 时,spark.executor.memory 设得太大,导致 Container 进程占用巨量堆内存,JVM GC 时间激增,Task 线程被挂起,吞吐量反而下降。这就是“内存加过头,CPU 全白费”。


4. 实战:从零搭建多线程数据处理

我们模拟一个大数据常见场景:对一个大整数列表做平方和,对比单线程与多线程性能。

4.1 Java 版:模拟数据分片并行计算

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class ParallelDataProcessor {
    public static void main(String[] args) throws Exception {
        // 1. 构造“大”数据集(100 万个整数,确保 long 不溢出)
        //    若用 10_000_000,平方和会远超 Long.MAX_VALUE,导致静默错误,
        //    这也是分布式计算中常见的数值溢出陷阱。
        int dataSize = 1_000_000;
        List<Integer> data = new ArrayList<>(dataSize);
        for (int i = 0; i < dataSize; i++) {
            data.add(i);
        }

        // 2. 单线程版本
        long start = System.currentTimeMillis();
        long sumSingle = data.stream().mapToLong(x -> (long) x * x).sum();
        long end = System.currentTimeMillis();
        System.out.println("单线程平方和: " + sumSingle + ",耗时: " + (end - start) + " ms");

        // 3. 多线程分片:使用 ForkJoinPool 模拟 Executor 内线程池
        int parallelism = Runtime.getRuntime().availableProcessors();
        ForkJoinPool customPool = new ForkJoinPool(parallelism);
        start = System.currentTimeMillis();
        long sumParallel = customPool.submit(() ->
            data.parallelStream().mapToLong(x -> (long) x * x).sum()
        ).get();
        end = System.currentTimeMillis();
        System.out.println("多线程平方和: " + sumParallel + ",线程数: " + parallelism + ",耗时: " + (end - start) + " ms");

        customPool.shutdown();
    }
}

运行效果(Intel i7-10750H 6核12线程):

单线程平方和: 333332833333500000,耗时: 12 ms
多线程平方和: 333332833333500000,线程数: 12,耗时: 3 ms

并行版本利用多核,耗时降至 1/4。这就像单个厨师炒菜花 1 小时,6 个厨师并行 10 分钟搞定。

延展思考:为何不直接用 parallelStream 的公共池?在大数据应用中,多个 Job 可能共享同一个 ForkJoinPool,可能导致线程饥饿。Spark Executor 内部就自行管理线程池,不与外部共享。另外注意,生产环境中对海量数据聚合,务必使用 long 甚至 BigInteger 防止溢出——溢出不会抛异常,只会产生错误结果,极其难查。

4.2 Python 版:进程池应对 CPU 密集型任务

Python 多线程因 GIL 无法并行 CPU 计算,所以大数据 Python 任务(如 PySpark UDF 里用了 Pandas)往往需要在进程级并行。

import time
import multiprocessing as mp

def square_sum_chunk(chunk):
    """计算一个分片的平方和"""
    return sum(x * x for x in chunk)

if __name__ == '__main__':
    # Python int 是任意精度,不会溢出
    data = list(range(10_000_000))
    cpu_count = mp.cpu_count()
    chunk_size = len(data) // cpu_count
    # 切片切分数据,即使不能整除,最后一片自动包含剩余元素
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

    # 单进程
    start = time.time()
    total_single = square_sum_chunk(data)
    print(f"单进程平方和: {total_single},耗时: {time.time() - start:.2f}s")

    # 进程池
    start = time.time()
    with mp.Pool(processes=cpu_count) as pool:
        results = pool.map(square_sum_chunk, chunks)
    total_parallel = sum(results)
    print(f"多进程平方和: {total_parallel},进程数: {cpu_count},耗时: {time.time() - start:.2f}s")

运行效果:

单进程平方和: 2357892873952059864,耗时: 3.21s
多进程平方和: 2357892873952059864,进程数: 8,耗时: 0.67s

进程间通信通过 pickle 序列化传数据,开销比线程共享内存大,但绕过了 GIL 限制,适合 PySpark 中某些自定义处理。


5. 高级用法与避坑指南

5.1 线程数不是越大越好

为什么:CPU 核心数固定,线程数过多会导致频繁上下文切换,CPU 把时间花在“换厨师”而不是“炒菜”上。
实战调优

  • CPU 密集型任务(如 Parquet 解压缩、聚合计算):线程数 ≈ CPU 核心数 + 1。
  • I/O 密集型任务(大量磁盘/网络读取):线程数可设为核心数的 2~3 倍。

Spark 参数:spark.executor.cores 控制一个 Executor 可并行的 Task 数。若设为 5,Executor 内最多 5 个 Task 线程同时运行。配合 spark.task.cpus 可以给某个特别重的 Task 多分配虚拟核。

5.2 数据倾斜——某线程累死,其他围观

现象:某个 Stage 里,99 个 Task 秒级完成,剩下 1 个 Task 跑了 1 小时。这是大数据中最典型的线程负载不均
根因:某 partition 数据量巨大,负责该 partition 的 Task 线程成了瓶颈。
避坑方案:盐值打散、repartition、自定义分区器。同时可以结合 spark.sql.adaptive.enabled 让 Spark 在 Shuffle 后自动合并小分区或拆分大分区。

工程陷阱 2:使用全局共享单例对象(如数据库连接池)时,多个 Task 线程同时借还连接,极易触发死锁或连接耗尽。务必保证共享对象的线程安全,或采用 ThreadLocal 隔离。

5.3 JVM 进程内存与 GC 的相爱相杀

Executor 是一个 JVM 进程,其内存分为:Spark 内存(执行+存储)、用户内存、保留内存。
spark.executor.memory 设置过大(如 32GB+),若不开启 G1GC 等并发收集器,Full GC 可能暂停所有 Task 线程数十秒,整机吞吐量断崖式下跌。
最佳实践

  • 单个 Executor 内存建议 ≤ 64GB(超过后指针压缩失效,内存压力更大)。
  • 结合 spark.memory.fraction 调整执行与存储比例。
  • 监控 GC 日志,必要时改用 G1GC:-XX:+UseG1GC

6. 学习建议与进阶资源

  • 操作系统:《现代操作系统》第 2 章(进程与线程)、《深入理解计算机系统》第 8 章(异常控制流)。
  • 并发编程:Java 的 java.util.concurrent 包源码阅读,C++ 的 std::thread
  • 大数据实战:Spark 配置文档中 spark.executor 相关参数;Flink 的 Task Slot 与线程模型。
  • 动手实验:修改本文代码,观察线程数/进程数从 1 逐步增加到 2 倍 CPU 核数时,耗时和 CPU 使用率变化,画出曲线。

7. 互动习题

  1. 思考题 1(已在文中埋设)
    多个 Task 线程同时修改共享 HashMap,会发生什么?怎么安全解决?

  2. 应用题
    在 YARN 上,你会为一个 Spark 应用分配 10 个 Executor,每个 Executor 4 核、8GB 内存。如果要处理的任务是典型的 CPU 密集计算(无 IO 等待),你预估最大并行 Task 数是多少?若某个阶段需要处理 100 个分区,如何最大程度避免数据倾斜导致的单线程瓶颈?

试着动手:用 htopjstack 观察你写的一个 Spark 应用,看看 Executor 进程里有多少活跃线程,对应的 Task 线程名有什么规律。欢迎在评论区交流你的发现!


附:习题解析

  1. HashMap 并发修改

    • HashMap 非线程安全,多线程同时 put 不仅会造成数据丢失,在 JDK7 中因头插法扩容还可能导致链表成环,引发 CPU 100% 死循环;JDK8 虽改为尾插法避免了死循环,但依旧存在数据覆盖和可见性问题。
    • 解法一:用 ConcurrentHashMap(分段锁/CAS,性能优秀)。
    • 解法二:利用 Spark 的累加器(Accumulator)实现线程安全的聚合,避免自己管理共享状态。
  2. 最大并行度与数据倾斜

    • 并行 Task 数 = Executor 数量 × 核数 = 10 × 4 = 40 个。所以同时有 40 个 Task 线程在跑。
    • 100 个分区会被 40 个并行槽分批执行。
    • 防倾斜:通过 repartition 增加分区数(如 200)使每个 Task 数据量更均匀;或对倾斜 key 加盐做两阶段聚合。
    • 调优方向:开启自适应查询执行 spark.sql.adaptive.enabled=true,让 Spark 在 Shuffle 后动态合并小分区或拆分大分区(对应 coalescePartitionsskewJoin 优化)。
Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐