Java NIO零拷贝

在 Java NIO 中的通道(Channel)就相当于操作系统的内核空间(kernel space)的缓冲区,而缓冲区(Buffer)对应的相当于操作系统的用户空间(user space)中的用户缓冲区(user buffer)。

  • 通道(Channel)是全双工的(双向传输),它既可能是读缓冲区(read buffer),也可能是网络缓冲区(socket buffer)。
  • 缓冲区(Buffer)分为堆内存(HeapBuffer)和堆外内存(DirectBuffer),这是通过 malloc() 分配出来的用户态内存。

堆外内存(DirectBuffer)在使用后需要应用程序手动回收,而堆内存(HeapBuffer)的数据在 GC 时可能会被自动回收。因此,在使用 HeapBuffer 读写数据时,为了避免缓冲区数据因为 GC 而丢失,NIO 会先把 HeapBuffer 内部的数据拷贝到一个临时的 DirectBuffer 中的本地内存(native memory),这个拷贝涉及到 sun.misc.Unsafe.copyMemory() 的调用,背后的实现原理与 memcpy() 类似。 最后,将临时生成的 DirectBuffer 内部的数据的内存地址传给 I/O 调用函数,这样就避免了再去访问 Java 对象处理 I/O 读写。

内存映射文件

内存映射文件 I/O 是一种读和写文件数据的方法,它可以比常规的基于流或者基于通道的 I/O 快得多。

向内存映射文件写入可能是危险的,只是改变数组的单个元素这样的简单操作,就可能会直接修改磁盘上的文件。修改数据与将数据保存到磁盘是没有分开的。

下面代码行将文件的前 1024 个字节映射到内存中,map() 方法返回一个 MappedByteBuffer,它是 ByteBuffer 的子类。因此,可以像使用其他任何 ByteBuffer 一样使用新映射的缓冲区,操作系统会在需要时负责执行映射。

java

MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, 1024);

MappedByteBuffer

MappedByteBuffer 是 NIO 基于内存映射(mmap这种零拷贝方式的提供的一种实现,可以减少一次数据拷贝的过程。它继承自 ByteBuffer。FileChannel 定义了一个 map() 方法,它可以把一个文件从 position 位置开始的 size 大小的区域映射为内存映像文件。抽象方法 map() 方法在 FileChannel 中的定义如下:

java

public abstract MappedByteBuffer map(MapMode mode, long position, long size)
        throws IOException;
  • mode:限定内存映射区域(MappedByteBuffer)对内存映像文件的访问模式,包括只可读(READ_ONLY)、可读可写(READ_WRITE)和写时拷贝(PRIVATE)三种模式。
  • position:文件映射的起始地址,对应内存映射区域(MappedByteBuffer)的首地址。
  • size:文件映射的字节长度,从 position 往后的字节数,对应内存映射区域(MappedByteBuffer)的大小。

MappedByteBuffer 相比 ByteBuffer 新增了 fore()、load() 和 isLoad() 三个重要的方法:

  • fore():对于处于 READ_WRITE 模式下的缓冲区,把对缓冲区内容的修改强制刷新到本地文件。
  • load():将缓冲区的内容载入物理内存中,并返回这个缓冲区的引用。
  • isLoaded():如果缓冲区的内容在物理内存中,则返回 true,否则返回 false。

下面给出一个利用 MappedByteBuffer 对文件进行读写的使用示例:

java

private final static String CONTENT = "Zero copy implemented by MappedByteBuffer";
private final static String FILE_NAME = "/mmap.txt";
private final static String CHARSET = "UTF-8";
  • 写文件数据:打开文件通道 fileChannel 并提供读权限、写权限和数据清空权限,通过 fileChannel 映射到一个可写的内存缓冲区 mappedByteBuffer,将目标数据写入 mappedByteBuffer,通过 force() 方法把缓冲区更改的内容强制写入本地文件。

java

@Test
public void writeToFileByMappedByteBuffer() {
    Path path = Paths.get(getClass().getResource(FILE_NAME).getPath());
    byte[] bytes = CONTENT.getBytes(Charset.forName(CHARSET));
    try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ,
            StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
        MappedByteBuffer mappedByteBuffer = fileChannel.map(READ_WRITE, 0, bytes.length);
        if (mappedByteBuffer != null) {
            mappedByteBuffer.put(bytes);
            mappedByteBuffer.force();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
  • 读文件数据:打开文件通道 fileChannel 并提供只读权限,通过 fileChannel 映射到一个只可读的内存缓冲区 mappedByteBuffer,读取 mappedByteBuffer 中的字节数组即可得到文件数据。

java

@Test
public void readFromFileByMappedByteBuffer() {
    Path path = Paths.get(getClass().getResource(FILE_NAME).getPath());
    int length = CONTENT.getBytes(Charset.forName(CHARSET)).length;
    try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
        MappedByteBuffer mappedByteBuffer = fileChannel.map(READ_ONLY, 0, length);
        if (mappedByteBuffer != null) {
            byte[] bytes = new byte[length];
            mappedByteBuffer.get(bytes);
            String content = new String(bytes, StandardCharsets.UTF_8);
            assertEquals(content, "Zero copy implemented by MappedByteBuffer");
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

下面介绍 map() 方法的底层实现原理map() 方法是 java.nio.channels.FileChannel 的抽象方法,由子类 sun.nio.ch.FileChannelImpl.java 实现,下面是和内存映射相关的核心代码:

java

public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
    int pagePosition = (int)(position % allocationGranularity);
    long mapPosition = position - pagePosition;
    long mapSize = size + pagePosition;
    try {
        addr = map0(imode, mapPosition, mapSize);
    } catch (OutOfMemoryError x) {
        System.gc();
        try {
            Thread.sleep(100);
        } catch (InterruptedException y) {
            Thread.currentThread().interrupt();
        }
        try {
            addr = map0(imode, mapPosition, mapSize);
        } catch (OutOfMemoryError y) {
            throw new IOException("Map failed", y);
        }
    }

    int isize = (int)size;
    Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
    if ((!writable) || (imode == MAP_RO)) {
        return Util.newMappedByteBufferR(isize, addr + pagePosition, mfd, um);
    } else {
        return Util.newMappedByteBuffer(isize, addr + pagePosition, mfd, um);
    }
}

map() 方法通过本地方法 map0() 为文件分配一块虚拟内存,作为它的内存映射区域,然后返回这块内存映射区域的起始地址。

  • 文件映射需要在 Java 堆中创建一个 MappedByteBuffer 的实例。如果第一次文件映射导致 OOM,则手动触发垃圾回收,休眠 100ms 后再尝试映射,如果失败则抛出异常。
  • 通过 Util 的 newMappedByteBuffer (可读可写)方法或者 newMappedByteBufferR(仅读) 方法方法反射创建一个 DirectByteBuffer 实例,其中 DirectByteBuffer 是 MappedByteBuffer 的子类。

map() 方法返回的是内存映射区域的起始地址,通过(起始地址 + 偏移量)就可以获取指定内存的数据。这样一定程度上替代了 read() 或 write() 方法,底层直接采用 sun.misc.Unsafe类的 getByte() 和 putByte() 方法对数据进行读写。

java

private native long map0(int prot, long position, long mapSize) throws IOException;

上面是本地方法(native method)map0 的定义,它通过 JNI(Java Native Interface)调用底层 C 的实现,这个 native 函数(Java_sun_nio_ch_FileChannelImpl_map0)的实现位于 JDK 源码包下的 native/sun/nio/ch/FileChannelImpl.c这个源文件里面。

c

JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this,
                                     jint prot, jlong off, jlong len)
{
    void *mapAddress = 0;
    jobject fdo = (*env)->GetObjectField(env, this, chan_fd);
    jint fd = fdval(env, fdo);
    int protections = 0;
    int flags = 0;

    if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
        protections = PROT_READ;
        flags = MAP_SHARED;
    } else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
        protections = PROT_WRITE | PROT_READ;
        flags = MAP_SHARED;
    } else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
        protections =  PROT_WRITE | PROT_READ;
        flags = MAP_PRIVATE;
    }

    mapAddress = mmap64(
        0,                    /* Let OS decide location */
        len,                  /* Number of bytes to map */
        protections,          /* File permissions */
        flags,                /* Changes are shared */
        fd,                   /* File descriptor of mapped file */
        off);                 /* Offset into file */

    if (mapAddress == MAP_FAILED) {
        if (errno == ENOMEM) {
            JNU_ThrowOutOfMemoryError(env, "Map failed");
            return IOS_THROWN;
        }
        return handle(env, -1, "Map failed");
    }

    return ((jlong) (unsigned long) mapAddress);
}

可以看出 map0() 函数最终是通过 mmap64() 这个函数对 Linux 底层内核发出内存映射的调用, mmap64() 函数的原型如下:

c

#include <sys/mman.h>

void *mmap64(void *addr, size_t len, int prot, int flags, int fd, off64_t offset);

下面详细介绍一下 mmap64() 函数各个参数的含义以及参数可选值:

  • addr:文件在用户进程空间的内存映射区中的起始地址,是一个建议的参数,通常可设置为 0 或 NULL,此时由内核去决定真实的起始地址。当 + flags 为 MAP_FIXED 时,addr 就是一个必选的参数,即需要提供一个存在的地址。
  • len:文件需要进行内存映射的字节长度
  • prot :控制用户进程对内存映射区的访问权限
    • PROT_READ:读权限
    • PROT_WRITE:写权限
    • PROT_EXEC:执行权限
    • PROT_NONE:无权限
  • flags:控制内存映射区的修改是否被多个进程共享
    • MAP_PRIVATE:对内存映射区数据的修改不会反映到真正的文件,数据修改发生时采用写时复制机制
    • MAP_SHARED:对内存映射区的修改会同步到真正的文件,修改对共享此内存映射区的进程是可见的
    • MAP_FIXED:不建议使用,这种模式下 addr 参数指定的必须的提供一个存在的 addr 参数
  • fd:文件描述符。每次 map 操作会导致文件的引用计数加 1,每次 unmap 操作或者结束进程会导致引用计数减 1
  • offset:文件偏移量。进行映射的文件位置,从文件起始地址向后的位移量

下面总结一下 MappedByteBuffer 的特点和不足之处:

  • MappedByteBuffer 使用是堆外的虚拟内存,因此分配(map)的内存大小不受 JVM 的 -Xmx 参数限制,但是也是有大小限制的。 如果当文件超出 Integer.MAX_VALUE 字节限制时,可以通过 position 参数重新 map 文件后面的内容。
  • MappedByteBuffer 在处理大文件时性能的确很高,但也存内存占用、文件关闭不确定等问题,被其打开的文件只有在垃圾回收的才会被关闭,而且这个时间点是不确定的。
  • MappedByteBuffer 提供了文件映射内存的 mmap() 方法,也提供了释放映射内存的 unmap() 方法。然而 unmap() 是 FileChannelImpl 中的私有方法,无法直接显示调用。因此,用户程序需要通过 Java 反射的调用 sun.misc.Cleaner 类的 clean() 方法手动释放映射占用的内存区域

java

public static void clean(final Object buffer) throws Exception {
    AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
        try {
            Method getCleanerMethod = buffer.getClass().getMethod("cleaner", new Class[0]);
            getCleanerMethod.setAccessible(true);
            Cleaner cleaner = (Cleaner) getCleanerMethod.invoke(buffer, new Object[0]);
            cleaner.clean();
        } catch(Exception e) {
            e.printStackTrace();
        }
    });
}

DirectByteBuffer

DirectByteBuffer 的对象引用位于 Java 内存模型的堆里面,JVM 可以对 DirectByteBuffer 的对象进行内存分配和回收管理,一般使用 DirectByteBuffer 的静态方法 allocateDirect() 创建 DirectByteBuffer 实例并分配内存。

java

public static ByteBuffer allocateDirect(int capacity) {
    return new DirectByteBuffer(capacity);
}

DirectByteBuffer 内部的字节缓冲区位在于堆外的(用户态)直接内存,它是通过 Unsafe 的本地方法 allocateMemory() 进行内存分配,底层调用的是操作系统的 malloc() 函数,因此DirectByteBuffer 使用的是操作系统内存。

java

DirectByteBuffer(int cap) {
    super(-1, 0, cap, cap);
    boolean pa = VM.isDirectMemoryPageAligned();
    int ps = Bits.pageSize();
    long size = Math.max(1L, (long)cap + (pa ? ps : 0));
    Bits.reserveMemory(size, cap);

    long base = 0;
    try {
        base = unsafe.allocateMemory(size);
    } catch (OutOfMemoryError x) {
        Bits.unreserveMemory(size, cap);
        throw x;
    }
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {
        address = base + ps - (base & (ps - 1));
    } else {
        address = base;
    }
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    att = null;
}

使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用

  • 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
  • java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
    • DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

除此之外,初始化 DirectByteBuffer 时还会创建一个 Deallocator 线程,并通过 Cleaner 的 freeMemory() 方法来对直接内存进行回收操作,freeMemory() 底层调用的是操作系统的 free() 函数。

java

private static class Deallocator implements Runnable {
    private static Unsafe unsafe = Unsafe.getUnsafe();

    private long address;
    private long size;
    private int capacity;

    private Deallocator(long address, long size, int capacity) {
        assert (address != 0);
        this.address = address;
        this.size = size;
        this.capacity = capacity;
    }

    public void run() {
        if (address == 0) {
            return;
        }
        unsafe.freeMemory(address);
        address = 0;
        Bits.unreserveMemory(size, capacity);
    }
}

由于使用 DirectByteBuffer 分配的是系统本地的内存,不在 JVM 的管控范围之内,因此直接内存的回收和堆内存的回收不同,直接内存如果使用不当,很容易造成 OutOfMemoryError。

说了这么多,那么 DirectByteBuffer 和零拷贝有什么关系?前面有提到在 MappedByteBuffer 进行内存映射时,它的 map() 方法会通过 Util.newMappedByteBuffer() 来创建一个缓冲区实例,初始化的代码如下:

java

static MappedByteBuffer newMappedByteBuffer(int size, long addr, FileDescriptor fd,
                                            Runnable unmapper) {
    MappedByteBuffer dbb;
    if (directByteBufferConstructor == null)
        initDBBConstructor();
    try {
        dbb = (MappedByteBuffer)directByteBufferConstructor.newInstance(
            new Object[] { new Integer(size), new Long(addr), fd, unmapper });
    } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
        throw new InternalError(e);
    }
    return dbb;
}

private static void initDBBRConstructor() {
Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐