一、为什么我又一次踩在“大文件上传”这个坑上

做后台系统这几年,文件上传这个需求几乎是每个项目都跑不掉的:

  • 几百 MB 的视频素材;
  • 设计师丢过来的几 GB PSD;
  • 数据中台导入的 CSV、Parquet;
  • AI 训练用的模型权重文件。

一旦文件大到几百 MB 以上,单次 POST 一把梭就开始翻车

  1. 网络一抖动整个请求重来,前面传的全废;
  2. 用户切换 Wi-Fi、电脑休眠,几十分钟白费;
  3. 想做进度条只能听 XHR.upload.onprogress 的,刷新就归零;
  4. 服务端要做完整性校验,前端不给 MD5 根本对不上。

标准答案就是分片上传 + 并发 + 重试 + 断点续传 + 文件 MD5。但我每次都得现写一套 Promise 池、AbortControllerSparkMD5,写完还总在边界 case 上打补丁。

最近发现一个国人维护、开箱即用的库:FluxForge。试着把它接进了项目,感受是“该有的全有,API 还很克制”。本文从使用者角度讲讲怎么用它,以及它对常见痛点的解法。


二、FluxForge 是什么?为什么值得试一下

FluxForge 是一个 基于 Web Worker 的浏览器端文件分片库,npm 包名就叫 fluxforge,TypeScript 编写、ESM/CJS 双包、零依赖业务侵入。

核心能力一句话概括:

把一个 File 切成若干分片,每个分片在 Worker 里算完 MD5 后丢给你定义的处理函数,并发跑、可暂停、可取消、可重试。

具体特性:

  • 多 Worker 并行哈希:按 navigator.hardwareConcurrency 起多线程,分片哈希直接交给 Worker,不卡主线程;
  • 可控并发:上传/转码/加密这种业务逻辑通过你自己写的 processor 串进来,库帮你管最大并发;
  • 完整任务生命周期pause / resume / cancel 都有,cancel 通过 AbortSignal 通知到正在跑的处理函数;
  • 指数退避自动重试:失败重试上限可配,超出抛 RetryExhaustedError 并带 cause
  • 真实文件 MD5:按顺序增量喂入 SparkMD5,结果与服务端 md5sum 一致,能直接用于秒传/校验;
  • TS 优先:错误类型 CancellationErrorRetryExhaustedError 都是导出的 class,instanceof 判断,不用看字符串。

适合的业务:OSS/MinIO 直传、自建对象存储分片上传、Web 端导入大文件、视频/模型上传、需要秒传或断点续传的所有场景


三、安装

pnpm add fluxforge
# 或
npm install fluxforge
# 或
yarn add fluxforge

浏览器要求(依赖 Web Workers / AbortSignal / Blob.arrayBuffer()):

  • Chrome ≥ 66
  • Firefox ≥ 57
  • Safari ≥ 12.1
  • Edge ≥ 16

正经的国内业务覆盖完全够用。


四、5 分钟跑通:一个最小可用的分片上传

下面以接入自家上传接口为例,给一段完整可跑的代码。

import {
  CancellationError,
  RetryExhaustedError,
  calculateFileHash,
  chunkFile,
  processChunks,
} from 'fluxforge'

async function uploadLargeFile(file: File) {
  // 1. 切片:返回与分片一一对应的 Promise 数组
  //    每个 Promise resolve 出来的 chunk 已经带好 MD5
  const chunkPromises = chunkFile(file, {
    chunkSize: 4 * 1024 * 1024, // 4 MiB,按业务和带宽自己调
  })

  // 2. 并发处理每一片(这里就是上传逻辑)
  const controller = processChunks(
    chunkPromises,
    async (chunk, signal) => {
      const form = new FormData()
      form.append('file', chunk.blob)
      form.append('index', String(chunk.index))
      form.append('hash', chunk.hash) // 单片 MD5,服务端可做去重/秒传

      await fetch('/api/upload/chunk', {
        method: 'POST',
        body: form,
        signal, // 关键:把 signal 透传给 fetch,cancel 才能立即生效
      })
    },
    {
      concurrency: 6, // 同时跑 6 片
      maxRetries: 3, // 单片最多重试 3 次
      retryBaseDelayMs: 500,
      retryMaxDelayMs: 5000,
      onProgress: (done, total) => {
        // 更新进度条
        console.log(`进度:${((done / total) * 100).toFixed(1)}%`)
      },
    },
  )

  try {
    await controller.promise

    // 3. (可选)算整个文件的 MD5,提交给服务端做最终校验
    const fileMd5 = await calculateFileHash(chunkPromises)
    await fetch('/api/upload/merge', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ fileMd5, filename: file.name }),
    })
  }
  catch (error) {
    if (error instanceof CancellationError) {
      console.warn('用户取消了上传')
    }
    else if (error instanceof RetryExhaustedError) {
      console.error('重试用完仍失败:', error.cause)
    }
    else {
      throw error
    }
  }

  return controller // 把 controller 暴露给 UI,做暂停/取消按钮
}

整段代码里我没写一行 Worker、没自己撸 Promise 池、没手搓重试。这就是接它的最大动机。


五、暂停 / 恢复 / 取消:协作式控制的正确姿势

UI 上一般会有“暂停”“继续”“取消”三个按钮,对应的就是 controller 上的三个方法:

controller.pause() // 暂停后续任务
controller.resume() // 继续
controller.cancel() // 取消(abort signal)

有一个细节文档里说得很清楚,实战时一定要注意

  • pause()协作式的——已经开始的 processor 会继续跑到它自己返回,不会半路掐。这避免了上传到一半留下损坏分片。新的任务才会等 resume
  • cancel() 会把 AbortSignal 标记为 abort。你必须把 signal 透传给 fetch / XHR 等异步 IO,否则正在跑的请求不会立即中断。

这点决定了你在写 processor 的时候,要么用 fetch(url, { signal }),要么自己手动检查 signal.aborted,否则取消按钮就只是个摆设。


六、自动重试:把网络抖动吃下去

弱网或者服务端偶发 5xx 是日常。FluxForge 内置了指数退避重试

参数 默认值 含义
maxRetries 3 单分片失败最大重试次数(不含首次)
retryBaseDelayMs 500 退避基准时间
retryMaxDelayMs 5000 退避时间上限

也就是说默认配置下,一片最多跑 4 次(1 次首发 + 3 次重试),间隔大致 500ms → 1s → 2s → 4s(夹到 5s 内)。

超过上限的处理也很规范:

catch (error) {
  if (error instanceof RetryExhaustedError) {
    console.log(`总共尝试了 ${error.attempts}`)
    console.log('最后一次错误:', error.cause)
  }
}

不用解析错误字符串,靠 instanceof 判断,这种 API 风格特别讨喜


七、文件 MD5 一致性:秒传和校验的关键

很多分片上传库给的只是**“每片自己的 hash”,但秒传、整文件校验需要的是“整个文件的 MD5”**,两者不是一回事。

FluxForge 提供了 calculateFileHash

const fileMd5 = await calculateFileHash(chunkPromises)

实现思路是按索引顺序依次读取每片二进制,增量喂给 SparkMD5。文档明确说了一句关键的话:

结果与服务端对原始文件执行 md5sum 一致

我特地拿一个 1.2 GB 的视频做了对比:浏览器算出来的 MD5 和服务器 md5sum file.mp4 输出完全一致。这意味着你可以放心地用这个值做:

  • 秒传:上传前先发 MD5,命中直接返回;
  • 校验:所有分片合并后服务端再算一次,对比一致才标记成功;
  • 去重:同一份文件不同用户多次上传,存储层只留一份。

注意 calculateFileHash 会触发对每个分片的 arrayBuffer() 读取,对超大文件有内存代价。不需要整文件 MD5 的场景就别调。


八、几个我踩过/想清楚了的细节

写完接入后总结几条,给后来者避坑:

  1. chunkSize 不是越小越好。 太小(比如 256 KB)请求数爆炸,服务端 QPS 顶不住;太大(比如 32 MB)单片重试代价高。经验值 2~8 MiB 之间
  2. concurrency 配合服务端能力调。 默认 6 对自建服务比较友好;对 OSS 这类云存储可以拉到 10~16。
  3. 务必透传 signal 否则 cancel() 取消的只是“还没开始的任务”,已开始的请求会一直跑完。
  4. pause 期间想给用户展示“剩余分片”onProgress(done, total) 在暂停时就不再被触发,但 done / total 你自己存就行。
  5. 错误判别只看 instanceof,不要看 error.message:库的导出方式让 error instanceof CancellationError 完全够用,未来文案变化也不会破。
  6. MD5 只用于完整性,不要用于安全场景。文档里也专门点了这一句,提醒同学不要拿它做密码/签名。

九、总结

如果你正在做这些事情:

  • 给后台系统加大文件分片上传
  • 接 OSS / MinIO 的直传 + 断点续传
  • 想要秒传、文件 MD5 校验
  • 不想自己造 Worker 池 + Promise 池 + AbortController + 重试机的轮子。

FluxForge 基本能开箱解决。它的 API 表面很小(就 chunkFile / processChunks / collectChunks / calculateFileHash 四个函数 + 两个错误类),但把分片上传里最烦人的并发、生命周期、重试、哈希这些都封装好了。

更重要的是,它不绑业务:上传逻辑你自己写,库只管把分片喂给你、帮你管控并发和生命周期。这种“克制”的设计比那种“一把梭直接帮你 POST”的方案更适合放进真实项目。

仓库:https://github.com/joygqz/fluxforge
在线 Demo:https://joygqz.github.io/fluxforge/
NPM:https://www.npmjs.com/package/fluxforge

如果这篇文章帮到你,欢迎点赞、收藏 + 关注。后续我会再写一篇配合 OSS/MinIO 实战 + 服务端合并分片的进阶教程。

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐