👋 你好,欢迎来到我的博客!我是【菜鸟不学编程】
   我是一个正在奋斗中的职场码农,步入职场多年,正在从“小码农”慢慢成长为有深度、有思考的技术人。在这条不断进阶的路上,我决定记录下自己的学习与成长过程,也希望通过博客结识更多志同道合的朋友。
  
  🛠️ 主要方向包括 Java 基础、Spring 全家桶、数据库优化、项目实战等,也会分享一些踩坑经历与面试复盘,希望能为还在迷茫中的你提供一些参考。
  💡 我相信:写作是一种思考的过程,分享是一种进步的方式。
  
   如果你和我一样热爱技术、热爱成长,欢迎关注我,一起交流进步!

前言 😅

我见过最离谱的“文件变更监控”方案:定时任务每 3 秒扫一遍目录,扫到文件就比时间戳,时间戳变了就复制……听起来挺努力,跑起来像打地鼠:CPU 风扇呼呼转,日志刷屏,偶尔还漏文件。你问为啥?因为你在跟操作系统玩“猜我什么时候变”的游戏。
  WatchService 的出现就是为了让你别再干这种体力活:让 OS 在文件变更时通知你。当然,它也不是完美的——跨平台差异、事件合并、溢出、递归目录监控都得处理。但只要你写对了,效果真的很爽:改一个文件,系统立刻就知道,“同步系统”才能像个同步系统。🙂

I. WatchService 概述:注册目录 + 事件监听,别上来就写 while(true) 乱轮询

WatchService 的工作模式像“快递柜取件码”:

  1. 你先向某个目录注册“我关心的事件”
  2. 系统在事件发生时给你一个 WatchKey
  3. 你从 WatchKey 里取出事件列表处理
  4. 处理完要 reset(),不然下次就收不到了(这点坑过不少人)

最小骨架长这样:

WatchService ws = FileSystems.getDefault().newWatchService();
Path dir = Path.of("/data/inbox");

dir.register(ws,
        StandardWatchEventKinds.ENTRY_CREATE,
        StandardWatchEventKinds.ENTRY_DELETE,
        StandardWatchEventKinds.ENTRY_MODIFY);

while (true) {
    WatchKey key = ws.take();          // 阻塞等待事件(比傻轮询体面多了)
    for (WatchEvent<?> e : key.pollEvents()) {
        // 处理事件
    }
    boolean valid = key.reset();       // 非常重要
    if (!valid) break;
}

注意:上面监控的是目录,事件里拿到的是相对路径(相对于注册的目录),你需要 dir.resolve(relativePath) 才得到完整路径。

II. 事件类型:ENTRY_CREATEENTRY_DELETEENTRY_MODIFY(别把 MODIFY 当“内容一定变了”😑)

这三种事件最常用:

  • ENTRY_CREATE:新文件/目录创建
  • ENTRY_DELETE:文件/目录删除
  • ENTRY_MODIFY:文件被修改(但可能只是元数据变化)

2.1 一个常见误解:ENTRY_MODIFY 不是“文件内容一定变了”

很多编辑器保存文件时会触发一串操作:

  • 写临时文件
  • 删除原文件
  • 重命名临时文件为原文件
    于是你可能看到:CREATE + MODIFY + DELETE 一起乱飞。
    所以真实系统里,通常要做去抖(debounce):短时间内多次 MODIFY 合并成一次处理。

III. 轮询与键:WatchKeypollEvents(key 不 reset,你就等着“监控突然失明”吧)🙃

WatchKey 你可以理解为“这一批事件的收件箱”。它最关键的方法:

  • pollEvents():取出事件列表(批量)
  • reset():告诉系统“这批我处理完了,继续给我推新的”
  • watchable():返回当初注册的对象(通常是 Path)

3.1 take() vs poll()

  • take():阻塞等待事件(常用于后台监听线程)
  • poll():立即返回,可能为 null(适合你不想阻塞、想自己控制节奏)

IV. 跨平台支持:StandardWatchEventKinds(标准事件是标准的,但实现不是😅)

StandardWatchEventKinds 提供的标准事件类型包括:

  • ENTRY_CREATE
  • ENTRY_DELETE
  • ENTRY_MODIFY
  • OVERFLOW(事件队列溢出,重要!)

4.1 跨平台现实:你得接受“不完全一致”

  • 有的平台对 MODIFY 触发很频繁(甚至属性变化也算)
  • 有的平台合并事件(你以为改了 10 次,它只给你 1 次)
  • 网络文件系统、挂载盘有时候支持得不咋地

所以策略上:

  • 关键逻辑不要只依赖事件(事件是“信号”,不是“真相”)
  • 真正执行同步时,最好再做一次文件存在性/大小/时间戳校验
  • 遇到 OVERFLOW,要做“全量扫描补偿”(后面示例会讲)

V. 异常处理:ClosedWatchServiceException(别粗暴吞异常,优雅停机才像样)

你关闭 WatchService 后,监听线程还在 take() 或继续处理,就可能抛:

  • ClosedWatchServiceException:服务被关闭(正常退出路径之一)

常见的“优雅停机”方式:

  • 主线程收到退出信号(比如 CTRL+C、系统 shutdown hook)
  • 调用 watchService.close()
  • 监听线程捕获 ClosedWatchServiceException 后退出循环

VI. 应用:实时文件同步系统(核心难点不是监听,是“稳”)🚀

说“实时同步”很容易,真正落地你会遇到几个特别烦但必须面对的问题:

  1. 递归目录监控WatchService 只监控你注册的目录,默认不递归
  2. 去抖:短时间 MODIFY 事件可能很多
  3. 文件写入未完成:你收到 CREATE 事件时文件可能还在写,直接同步可能拷到半截
  4. OVERFLOW 补偿:事件丢了要能修复
  5. 跨平台差异:行为不一致要靠策略兜住

下面我给你一个“能跑且接近实战”的同步骨架:

  • 监控源目录(递归注册所有子目录)
  • 事件发生后把文件加入“待同步队列”
  • 做简单去抖(同一路径短时间只处理一次)
  • 同步时复制到目标目录,保留目录结构
  • 处理 OVERFLOW:触发一次全量扫描同步(简化版)

示例:简化版实时文件同步器(递归监控 + 去抖 + 增量复制)

说明:这是一个“够实用”的基线版本,适合当你自己的工程起点。
你可以在此基础上继续加:校验 hash、删除同步、并发复制、失败重试、日志分级等。

1) 同步器代码:RealtimeFileSync.java

import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.nio.file.StandardCopyOption.COPY_ATTRIBUTES;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

public class RealtimeFileSync {

    private final Path sourceRoot;
    private final Path targetRoot;

    private final WatchService watchService;
    private final Map<WatchKey, Path> keyToDir = new ConcurrentHashMap<>();

    // 去抖:记录最近一次触发时间
    private final ConcurrentHashMap<Path, Long> lastEventTime = new ConcurrentHashMap<>();
    private final long debounceMillis = 300; // 300ms 内同一路径多次事件合并

    // 同步任务队列
    private final BlockingQueue<Path> syncQueue = new LinkedBlockingQueue<>();
    private final ExecutorService workerPool = Executors.newFixedThreadPool(2);

    private final AtomicBoolean running = new AtomicBoolean(true);

    public RealtimeFileSync(Path sourceRoot, Path targetRoot) throws IOException {
        this.sourceRoot = sourceRoot.toAbsolutePath().normalize();
        this.targetRoot = targetRoot.toAbsolutePath().normalize();
        this.watchService = FileSystems.getDefault().newWatchService();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("""
                    用法:
                      java RealtimeFileSync <sourceDir> <targetDir>
                    示例:
                      java RealtimeFileSync /data/src /data/dst
                    """);
            return;
        }

        Path src = Path.of(args[0]);
        Path dst = Path.of(args[1]);

        if (!Files.isDirectory(src)) {
            throw new IllegalArgumentException("sourceDir 不是目录: " + src);
        }
        Files.createDirectories(dst);

        RealtimeFileSync app = new RealtimeFileSync(src, dst);

        // 优雅停机:CTRL+C 时关闭 WatchService
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                app.stop();
            } catch (IOException ignored) {}
        }));

        app.start();
    }

    public void start() throws IOException {
        System.out.println("启动同步器:");
        System.out.println("  source = " + sourceRoot);
        System.out.println("  target = " + targetRoot);
        System.out.println();

        registerAllDirs(sourceRoot);

        // 启动 worker:负责真正复制文件
        for (int i = 0; i < 2; i++) {
            workerPool.submit(this::syncWorker);
        }

        // 主监听循环
        try {
            while (running.get()) {
                WatchKey key = watchService.take(); // 阻塞等待
                Path dir = keyToDir.get(key);
                if (dir == null) {
                    key.reset();
                    continue;
                }

                boolean overflow = false;

                for (WatchEvent<?> event : key.pollEvents()) {
                    WatchEvent.Kind<?> kind = event.kind();

                    if (kind == StandardWatchEventKinds.OVERFLOW) {
                        overflow = true;
                        continue;
                    }

                    @SuppressWarnings("unchecked")
                    WatchEvent<Path> ev = (WatchEvent<Path>) event;
                    Path name = ev.context();          // 相对路径
                    Path fullPath = dir.resolve(name); // 绝对路径

                    // 目录创建:需要把新目录也注册进去(递归监控关键点)
                    if (kind == StandardWatchEventKinds.ENTRY_CREATE) {
                        try {
                            if (Files.isDirectory(fullPath)) {
                                registerAllDirs(fullPath);
                            }
                        } catch (IOException ignored) {}
                    }

                    // 文件/目录的同步触发:去抖
                    if (shouldEnqueue(fullPath)) {
                        syncQueue.offer(fullPath);
                    }
                }

                // 事件溢出:做一次全量扫描补偿(简化策略)
                if (overflow) {
                    System.err.println("发生 OVERFLOW:执行一次全量扫描补偿同步(简化版)");
                    fullScanEnqueueAllFiles();
                }

                boolean valid = key.reset();
                if (!valid) {
                    keyToDir.remove(key);
                    // 目录被删/不可访问等情况,继续运行即可
                }
            }
        } catch (ClosedWatchServiceException e) {
            // 正常退出路径
            System.out.println("WatchService 已关闭,监听线程退出。");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workerPool.shutdownNow();
        }
    }

    public void stop() throws IOException {
        if (running.compareAndSet(true, false)) {
            watchService.close();
        }
    }

    private void syncWorker() {
        while (running.get()) {
            try {
                Path changed = syncQueue.poll(500, TimeUnit.MILLISECONDS);
                if (changed == null) continue;

                // 如果是目录变更:这里选择跳过(目录结构由 createDirectories 保证)
                if (Files.isDirectory(changed)) continue;

                // 只同步 sourceRoot 下的东西(防御一下)
                if (!changed.toAbsolutePath().normalize().startsWith(sourceRoot)) continue;

                // 等文件“稳定”(避免写入中拷贝半截)
                if (!waitUntilStable(changed, Duration.ofSeconds(2))) {
                    // 不稳定就再放回去稍后处理
                    syncQueue.offer(changed);
                    continue;
                }

                Path relative = sourceRoot.relativize(changed.toAbsolutePath().normalize());
                Path dest = targetRoot.resolve(relative);

                Files.createDirectories(dest.getParent());

                if (shouldSkipCopy(changed, dest)) {
                    // 轻量增量:无变化就跳过
                    continue;
                }

                Files.copy(changed, dest, REPLACE_EXISTING, COPY_ATTRIBUTES);
                System.out.println("同步: " + relative);

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (IOException e) {
                System.err.println("同步失败: " + e.getMessage());
            }
        }
    }

    private boolean shouldEnqueue(Path p) {
        long now = System.currentTimeMillis();
        Long prev = lastEventTime.put(p, now);
        return prev == null || (now - prev) > debounceMillis;
    }

    private boolean waitUntilStable(Path file, Duration maxWait) {
        try {
            long deadline = System.nanoTime() + maxWait.toNanos();
            long lastSize = -1;
            long lastModified = -1;

            while (System.nanoTime() < deadline) {
                if (!Files.exists(file)) return true; // 被删了,就当稳定(由后续逻辑决定是否处理)
                BasicFileAttributes a = Files.readAttributes(file, BasicFileAttributes.class);
                long size = a.size();
                long lm = a.lastModifiedTime().toMillis();

                if (size == lastSize && lm == lastModified) {
                    return true; // 连续两次一致,认为稳定
                }

                lastSize = size;
                lastModified = lm;
                Thread.sleep(120);
            }
        } catch (Exception ignored) {}
        return false;
    }

    private boolean shouldSkipCopy(Path src, Path dest) {
        try {
            if (!Files.exists(dest)) return false;
            BasicFileAttributes s = Files.readAttributes(src, BasicFileAttributes.class);
            BasicFileAttributes d = Files.readAttributes(dest, BasicFileAttributes.class);
            return s.size() == d.size() && s.lastModifiedTime().equals(d.lastModifiedTime());
        } catch (IOException e) {
            return false;
        }
    }

    private void registerAllDirs(Path start) throws IOException {
        Files.walkFileTree(start, new SimpleFileVisitor<>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                registerDir(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    private void registerDir(Path dir) throws IOException {
        // 注意:只注册标准 kinds(跨平台更稳)
        WatchKey key = dir.register(watchService,
                StandardWatchEventKinds.ENTRY_CREATE,
                StandardWatchEventKinds.ENTRY_DELETE,
                StandardWatchEventKinds.ENTRY_MODIFY);

        keyToDir.put(key, dir);
    }

    private void fullScanEnqueueAllFiles() {
        try {
            Files.walkFileTree(sourceRoot, new SimpleFileVisitor<>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                    if (attrs.isRegularFile() && shouldEnqueue(file)) {
                        syncQueue.offer(file);
                    }
                    return FileVisitResult.CONTINUE;
                }
            });
        } catch (IOException e) {
            System.err.println("全量扫描失败: " + e.getMessage());
        }
    }
}

这个同步器有哪些“现实主义”设计?(不讲虚的)

我刻意塞了几个真实项目里很关键的点:

  1. 递归注册目录
    WatchService 不会自动监控子目录,所以必须在启动时 walkFileTree 注册一次;新目录创建时还要补注册。

  2. 去抖 debounce
    文件保存可能触发多次 MODIFY,不去抖你会重复复制,日志还能把你淹了。

  3. 文件稳定性等待
    收到事件时文件可能仍在写入,直接复制容易拿到半截文件。这里用“大小+修改时间连续两次一致”做一个廉价但管用的判断。

  4. OVERFLOW 补偿扫描
    事件队列溢出意味着你丢了事件。此时最稳的策略就是:触发一次“全量扫描同步”去补漏(这是很多同步系统的保底逻辑)。

  5. 优雅停机
    ClosedWatchServiceException 是正常退出路径之一,不要当成“异常报警”。能优雅退出的程序才像能上线的程序。🙂

你可能马上会踩的坑(我提前替你骂两句)😅

  • “我监控了一个目录,为什么子目录不触发?”
    因为它就不递归,你得自己注册子目录。

  • “我只改了一次文件,为什么收到一堆 MODIFY?”
    编辑器/系统行为不同,事件会合并/重复/拆分。去抖 + 稳定性判断是常规操作。

  • “为啥同步拿到的文件是空的/半截?”
    你拷贝太快了,人家还在写。等它稳定再拷。

  • “Windows 上行为跟 Linux 不一样?”
    正常。别追求“事件完全一致”,追求“最终状态一致”。

📝 写在最后

如果你觉得这篇文章对你有帮助,或者有任何想法、建议,欢迎在评论区留言交流!你的每一个点赞 👍、收藏 ⭐、关注 ❤️,都是我持续更新的最大动力!

我是一个在代码世界里不断摸索的小码农,愿我们都能在成长的路上越走越远,越学越强!

感谢你的阅读,我们下篇文章再见~👋

✍️ 作者:某个被流“治愈”过的 Java 老兵
📅 日期:2026-01-07
🧵 本文原创,转载请注明出处。

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐