Java IO - 源码: OutputStream

本文主要从JDK 11源码角度分析 OutputStream。 @pdai

OutputStream 类实现关系

OutputStream是输出字节流,具体的实现类层次结构如下:

OutputStream 抽象类

OutputStream 类重要方法设计如下:

// 写入一个字节,可以看到这里的参数是一个 int 类型,对应上面的读方法,int 类型的 32 位,只有低 8 位才写入,高 24 位将舍弃。
public abstract void write(int b)

// 将数组中的所有字节写入,实际调用的是write(byte b[], int off, int len)方法。
public void write(byte b[])

// 将 byte 数组从 off 位置开始,len 长度的字节写入
public void write(byte b[], int off, int len)

// 强制刷新,将缓冲中的数据写入; 默认是空实现,供子类覆盖
public void flush()

// 关闭输出流,流被关闭后就不能再输出数据了; 默认是空实现,供子类覆盖
public void close()

源码实现

梳理部分OutputStream及其实现类的源码分析。

OutputStream

OutputStream抽象类源码如下:

public abstract class OutputStream implements Closeable, Flushable {
    
    // JDK11中增加了一个nullOutputStream,即空模式实现,以便可以直接调用而不用判空(可以看如下的补充说明)
    public static OutputStream nullOutputStream() {
        return new OutputStream() {
            private volatile boolean closed;

            private void ensureOpen() throws IOException {
                if (closed) {
                    throw new IOException("Stream closed");
                }
            }

            @Override
            public void write(int b) throws IOException {
                ensureOpen();
            }

            @Override
            public void write(byte b[], int off, int len) throws IOException {
                Objects.checkFromIndexSize(off, len, b.length);
                ensureOpen();
            }

            @Override
            public void close() {
                closed = true;
            }
        };
    }

    // 写入一个字节,可以看到这里的参数是一个 int 类型,对应上面的读方法,int 类型的 32 位,只有低 8 位才写入,高 24 位将舍弃。
    public abstract void write(int b) throws IOException;

    // 将数组中的所有字节写入,实际调用的是write(byte b[], int off, int len)方法
    public void write(byte b[]) throws IOException {
        write(b, 0, b.length);
    }

    // 将 byte 数组从 off 位置开始,len 长度的字节写入
    public void write(byte b[], int off, int len) throws IOException {
        // 检查边界合理性
        Objects.checkFromIndexSize(off, len, b.length);
        // len == 0 的情况已经在如下的for循环中隐式处理了
        for (int i = 0 ; i < len ; i++) {
            write(b[off + i]);
        }
    }

    // 强制刷新,将缓冲中的数据写入; 默认是空实现,供子类覆盖
    public void flush() throws IOException {
    }

    // 关闭输出流,流被关闭后就不能再输出数据了; 默认是空实现,供子类覆盖
    public void close() throws IOException {
    }

}

补充下JDK11为什么会增加nullOutputStream方法的设计?即空对象模式

  • 空对象模式

举个例子:

public class MyParser implements Parser {
  private static Action NO_ACTION = new Action() {
    public void doSomething() { /* do nothing */ }
  };

  public Action findAction(String userInput) {
    // ...
    if ( /* we can't find any actions */ ) {
      return NO_ACTION;
    }
  }
}

然后便可以始终可以这么调用,而不用再判断空了

ParserFactory.getParser().findAction(someInput).doSomething();

FilterOutputStream

FilterOutputStream 源码如下

public class FilterOutputStream extends OutputStream {
    
    // 被装饰的实际outputStream
    protected OutputStream out;

    // 当前stream是否已经被close
    private volatile boolean closed;

    // close stream时加锁,防止其它线程同时close
    private final Object closeLock = new Object();

    // 初始化构造函数,传入被装饰的实际outputStream
    public FilterOutputStream(OutputStream out) {
        this.out = out;
    }

    // 写入数据,本质调用被装饰outputStream的方法
    @Override
    public void write(int b) throws IOException {
        out.write(b);
    }

    // 将数组中的所有字节写入
    @Override
    public void write(byte b[]) throws IOException {
        write(b, 0, b.length);
    }

    // 一个个写入
    @Override
    public void write(byte b[], int off, int len) throws IOException {
        if ((off | len | (b.length - (len + off)) | (off + len)) < 0)
            throw new IndexOutOfBoundsException();

        for (int i = 0 ; i < len ; i++) {
            write(b[off + i]);
        }
    }

     // 强制刷新,将缓冲中的数据写入; 本质调用被装饰outputStream的方法
    @Override
    public void flush() throws IOException {
        out.flush();
    }

    // 关闭Stream
    @Override
    public void close() throws IOException {
        // 如果已经close, 直接退出
        if (closed) {
            return;
        }
        // 加锁处理,如果已经有线程正在closing则退出;
        synchronized (closeLock) {
            if (closed) {
                return;
            }
            closed = true;
        }

        // close前调用flush
        Throwable flushException = null;
        try {
            flush();
        } catch (Throwable e) {
            flushException = e;
            throw e;
        } finally {
            if (flushException == null) {
                out.close();
            } else {
                try {
                    out.close();
                } catch (Throwable closeException) {
                   // evaluate possible precedence of flushException over closeException
                   if ((flushException instanceof ThreadDeath) &&
                       !(closeException instanceof ThreadDeath)) {
                       flushException.addSuppressed(closeException);
                       throw (ThreadDeath) flushException;
                   }

                    if (flushException != closeException) {
                        closeException.addSuppressed(flushException);
                    }

                    throw closeException;
                }
            }
        }
    }
}

@pdai: 对比下JDK8中,close方法是没有加锁处理的。这种情况下你可以看JDK8源码中,直接利用java7的try with resources方式,优雅的调用flush方法后对out进行关闭。

public void close() throws IOException {
    try (OutputStream ostream = out) {
        flush();
    }
}

ByteArrayOutputStream

ByteArrayOutputStream 源码如下

public class ByteArrayOutputStream extends OutputStream {

    // 实际的byte数组
    protected byte buf[];

    // 数组中实际有效的byte的个数
    protected int count;

    // 初始化默认构造,初始化byte数组大小为32
    public ByteArrayOutputStream() {
        this(32);
    }

    // 初始化byte的大小
    public ByteArrayOutputStream(int size) {
        if (size < 0) {
            throw new IllegalArgumentException("Negative initial size: "
                                               + size);
        }
        buf = new byte[size];
    }

    // 扩容,确保它至少可以容纳由最小容量参数指定的元素数
    private void ensureCapacity(int minCapacity) {
        // overflow-conscious code
        if (minCapacity - buf.length > 0)
            grow(minCapacity);
    }

    // 分配的最大数组大小。
    // 由于一些VM在数组中保留一些头字,所以尝试分配较大的阵列可能会导致OutOfMemoryError(请求的阵列大小超过VM限制)
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    // 扩容的实质方法,确保它至少可以容纳由最小容量参数指定的元素数
    private void grow(int minCapacity) {
        // overflow-conscious code
        int oldCapacity = buf.length;
        int newCapacity = oldCapacity << 1;
        if (newCapacity - minCapacity < 0)
            newCapacity = minCapacity;
        if (newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        buf = Arrays.copyOf(buf, newCapacity);
    }

    private static int hugeCapacity(int minCapacity) {
        if (minCapacity < 0) // overflow
            throw new OutOfMemoryError();
        return (minCapacity > MAX_ARRAY_SIZE) ?
            Integer.MAX_VALUE :
            MAX_ARRAY_SIZE;
    }

    // 写入,写入前确保byte数据长度
    public synchronized void write(int b) {
        ensureCapacity(count + 1);
        buf[count] = (byte) b;
        count += 1;
    }

    
    public synchronized void write(byte b[], int off, int len) {
        Objects.checkFromIndexSize(off, len, b.length);
        ensureCapacity(count + len);
        System.arraycopy(b, off, buf, count, len);
        count += len;
    }

    public void writeBytes(byte b[]) {
        write(b, 0, b.length);
    }

    public synchronized void writeTo(OutputStream out) throws IOException {
        out.write(buf, 0, count);
    }

    // 重置,显然将实际有效的byte数量置为0
    public synchronized void reset() {
        count = 0;
    }

    
    public synchronized byte[] toByteArray() {
        return Arrays.copyOf(buf, count);
    }

    // 长度,即count
    public synchronized int size() {
        return count;
    }

    // 转成string
    public synchronized String toString() {
        return new String(buf, 0, count);
    }

    // 转成string,指定的字符集
    public synchronized String toString(String charsetName)
        throws UnsupportedEncodingException
    {
        return new String(buf, 0, count, charsetName);
    }

    public synchronized String toString(Charset charset) {
        return new String(buf, 0, count, charset);
    }

    // 弃用
    @Deprecated
    public synchronized String toString(int hibyte) {
        return new String(buf, hibyte, 0, count);
    }

    // 对byte 数组而言,close没啥实质意义,所以空实现
    public void close() throws IOException {
    }

}

BufferedOutputStream

BufferedOutputStream 源码如下

public class BufferedOutputStream extends FilterOutputStream {
    
    // Buffered outputStream底层也是byte数组
    protected byte buf[];

    // 大小,buf[0]到buf[count-1]是实际存储的bytes
    protected int count;

    // 构造函数,被装饰的outputStream,以及默认buf大小是8192
    public BufferedOutputStream(OutputStream out) {
        this(out, 8192);
    }

    public BufferedOutputStream(OutputStream out, int size) {
        super(out);
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        buf = new byte[size];
    }

    /** Flush the internal buffer */
    // 内部的flush方法,将buffer中的有效bytes(count是有效的bytes大小)通过被装饰的outputStream写入
    private void flushBuffer() throws IOException {
        if (count > 0) {
            out.write(buf, 0, count);
            count = 0;
        }
    }

    // 写入byte
    @Override
    public synchronized void write(int b) throws IOException {
        // 当buffer满了以后,flush buffer
        if (count >= buf.length) {
            flushBuffer();
        }
        buf[count++] = (byte)b;
    }

    // 将 byte 数组从 off 位置开始,len 长度的字节写入
    @Override
    public synchronized void write(byte b[], int off, int len) throws IOException {
        if (len >= buf.length) {
            // 如果请求长度已经超过输出缓冲区的大小,直接刷新输出缓冲区,然后直接写入数据。
            flushBuffer();
            out.write(b, off, len);
            return;
        }
        if (len > buf.length - count) {
            flushBuffer();
        }
        System.arraycopy(b, off, buf, count, len);
        count += len;
    }

    // flush方法,需要先将buffer中写入,最后在调用被装饰outputStream的flush方法
    @Override
    public synchronized void flush() throws IOException {
        flushBuffer();
        out.flush();
    }
}

参考文章

  • JDK 11