Abstract: This article analyzes how Okio is implemented.


This is an original article. Reposting is permitted; please credit the source.


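I. Overview

Okio was originally developed as a component of OkHttp for handling I/O streams. Its main characteristics are:

  • A built-in Timeout mechanism that applies to both read() and write();
  • Source and Sink replace InputStream and OutputStream as the input and output stream types, which simplifies the API design;
  • BufferedSource and BufferedSink define nearly all commonly used stream operations, with no need to distinguish byte operations from character operations (in Java, working with characters requires wrapping a Stream in a Reader), so they are easy to use;
  • Okio provides APIs that convert files, sockets, and other Java streams into a Source or Sink, making it easy to switch between traditional I/O streams and Okio streams.

These advantages let Okio not only work well with OkHttpClient but also serve as a standalone library for simplifying I/O.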

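II. ByteString

ByteString is a String variant built on a byte array. As long as the underlying array is not modified from outside, it is immutable, much like String. Compared with String, ByteString is better suited to byte handling: it offers utility methods such as hex, base64, md5, and sha256, and it caches the UTF-8 string to make UTF-8 encoding and decoding cheaper, which makes it a better fit for network transfer.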
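To make the ideas in the paragraph above concrete, here is a minimal sketch of an immutable byte wrapper with a cached UTF-8 string. MiniByteString and all of its methods are hypothetical names invented for this illustration and built only on the JDK; this is not Okio's actual ByteString code.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical, simplified stand-in for a ByteString-like type; not the library's code. */
final class MiniByteString {
  private final byte[] data; // never exposed, so instances stay immutable
  private String utf8;       // lazily cached UTF-8 decoding of data

  private MiniByteString(byte[] data) {
    this.data = data;
  }

  static MiniByteString of(byte... bytes) {
    return new MiniByteString(bytes.clone()); // defensive copy of the caller's array
  }

  static MiniByteString encodeUtf8(String s) {
    MiniByteString result = new MiniByteString(s.getBytes(StandardCharsets.UTF_8));
    result.utf8 = s; // the string is already known, so cache it up front
    return result;
  }

  String utf8() {
    String result = utf8;
    if (result == null) {
      result = new String(data, StandardCharsets.UTF_8);
      utf8 = result; // later calls reuse the decoded string
    }
    return result;
  }

  String hex() {
    char[] digits = "0123456789abcdef".toCharArray();
    StringBuilder sb = new StringBuilder(data.length * 2);
    for (byte b : data) {
      sb.append(digits[(b >> 4) & 0xf]).append(digits[b & 0xf]);
    }
    return sb.toString();
  }

  String base64() {
    return Base64.getEncoder().encodeToString(data);
  }

  MiniByteString sha256() {
    try {
      return new MiniByteString(MessageDigest.getInstance("SHA-256").digest(data));
    } catch (NoSuchAlgorithmException e) {
      throw new AssertionError(e); // SHA-256 is required on every Java platform
    }
  }

  int size() {
    return data.length;
  }
}
```

Because the array is copied on the way in and never handed out, callers cannot change the bytes afterwards, which is what makes caching the decoded string safe in this sketch.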

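III. Segment

Segment is Okio's buffer data structure. It is 2048 bytes in size (in the version analyzed here) and, like ByteBuffer in NIO, uses pos and limit to manage its data. The underlying storage of Okio's Buffer is a circular doubly linked list of Segments, which allows faster data transfer and copying. In addition, SegmentPool caches a limited number of free Segments to make allocating and reusing Segments more efficient; SegmentPool itself is backed by a singly linked list of Segments.
Segment has two properties, shared and owner:

  • shared: indicates that the Segment's underlying byte array is used by more than one Segment. Such a Segment may not shift or modify existing data, which keeps the data consistent, and it cannot be recycled by SegmentPool;
  • owner: indicates that the underlying byte array was created by this Segment, so this Segment has the right to extend limit, that is, to append data to the underlying byte array.

The main Segment operations are:

1. pop()

Removes this Segment from the doubly linked list and returns the next Segment (or null if the list becomes empty).

2. push(Segment segment)

Inserts segment after the current Segment and returns the inserted segment.

3. split(int byteCount)

Creates a new Segment and inserts it before this one, marks the underlying byte array as shared, and adjusts the pos and limit of the two Segments so that each covers its own range of the data, thereby splitting the Segment: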

public Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix = new Segment(this);
    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    prev.push(prefix);
    return prefix;
}
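
The key point of split() is that no bytes are copied: both Segments view the same array through different pos and limit values. The sketch below shows only that idea. MiniSegment is a hypothetical, stripped-down class for illustration (the linked-list pointers are omitted), not Okio's real Segment.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified segment without list pointers; not Okio's real class. */
final class MiniSegment {
  final byte[] data;
  int pos;        // index of the first readable byte
  int limit;      // index one past the last readable byte
  boolean shared; // true when another segment views the same array
  boolean owner;  // true when this segment may append to data

  MiniSegment(byte[] data, int pos, int limit, boolean shared, boolean owner) {
    this.data = data;
    this.pos = pos;
    this.limit = limit;
    this.shared = shared;
    this.owner = owner;
  }

  /** Splits off the first byteCount readable bytes without copying them. */
  MiniSegment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    shared = true; // from now on two views reference one array
    MiniSegment prefix = new MiniSegment(data, pos, pos + byteCount, true, false);
    pos += byteCount; // this segment keeps only the remaining bytes
    return prefix;
  }

  /** Decodes the readable range, for demonstration purposes. */
  String readUtf8() {
    return new String(data, pos, limit - pos, StandardCharsets.UTF_8);
  }
}
```

Splitting a segment that holds "helloworld" at 5 yields a prefix that reads "hello" and a remainder that reads "world", while both still point at the same byte array.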
4. compact()

Moves this Segment's data into the preceding Segment and then removes this Segment, thereby compacting the list:

public void compact() {
    if (prev == this) throw new IllegalStateException();
    if (!prev.owner) return; // Cannot compact: prev isn't writable.
    int byteCount = limit - pos;
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
    writeTo(prev, byteCount);
    pop();
    SegmentPool.recycle(this);
}
5. writeTo(Segment sink, int byteCount)

Transfers byteCount bytes from this Segment to sink:

public void writeTo(Segment sink, int byteCount) {
    if (!sink.owner) throw new IllegalArgumentException();
    if (sink.limit + byteCount > SIZE) {
      // We can't fit byteCount bytes at the sink's current position. Shift sink first.
      if (sink.shared) throw new IllegalArgumentException();
      if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
      sink.limit -= sink.pos;
      sink.pos = 0;
    }

    System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
    sink.limit += byteCount;
    pos += byteCount;
}

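IV. Timeout

1. Basic Timeout

Timeout uses the following fields to put a time limit on an operation:

// Whether a deadline is set
private boolean hasDeadline;
// The deadline: an absolute time; the operation times out when this point is reached
private long deadlineNanoTime;
// The timeout: a relative duration; the operation times out at (current time + timeout)
private long timeoutNanos;

Timeout throws the timeout exception through the following method: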

public void throwIfReached() throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException("thread interrupted");
    }
    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
      throw new InterruptedIOException("deadline reached");
    }
}
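
Note that throwIfReached() tests `deadlineNanoTime - System.nanoTime() <= 0` rather than comparing the two values directly. The System.nanoTime() documentation recommends comparing differences because the counter may overflow. The sketch below contrasts the two forms; DeadlineCheck and its method names are hypothetical and exist only for this illustration.

```java
/** Hypothetical helper contrasting two ways of testing a nanoTime deadline. */
final class DeadlineCheck {
  /** Overflow-safe: looks at the sign of the difference. */
  static boolean reached(long deadlineNanoTime, long nowNanos) {
    return deadlineNanoTime - nowNanos <= 0;
  }

  /** Naive direct comparison, shown only for contrast. */
  static boolean reachedNaive(long deadlineNanoTime, long nowNanos) {
    return deadlineNanoTime <= nowNanos;
  }

  public static void main(String[] args) {
    long now = Long.MAX_VALUE - 10; // a clock value close to wrapping around
    long deadline = now + 100;      // wraps to a negative value
    System.out.println(reached(deadline, now));      // false: 100 ns still remain
    System.out.println(reachedNaive(deadline, now)); // true: wrongly reports a timeout
  }
}
```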
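2. Asynchronous AsyncTimeout

The asynchronous timeout works as follows:

  • All AsyncTimeout instances are linked, in ascending order of the instant at which they time out, into a singly linked list;
  • A Watchdog thread is started; using wait/notify, it removes each AsyncTimeout from the list at the corresponding instant and calls its timedOut();

Because every timeout in the list shares one watchdog timer, take particular care when using asynchronous timeouts: do not perform long-running work in timedOut(), or the other timeouts in the list may fire late.
AsyncTimeout inserts itself into the list with enter(), and exit() is called after the operation finishes to determine whether it timed out: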

public final void enter() {
    // Unrelated code omitted
    scheduleTimeout(this, timeoutNanos, hasDeadline);
}
private static synchronized void scheduleTimeout(
      AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
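    // Start the watchdog timer
    if (head == null) {
      head = new AsyncTimeout();
      new Watchdog().start();
    }

    long now = System.nanoTime();
    // (Computation of node.timeoutAt from timeoutNanos and the deadline omitted)

    // Insert the AsyncTimeout at the right position in the timeout list
    long remainingNanos = node.remainingNanos(now);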
    for (AsyncTimeout prev = head; true; prev = prev.next) {
      if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
        node.next = prev.next;
        prev.next = node;
        if (prev == head) {
          // The inserted AsyncTimeout now has the earliest timeout, so the watchdog's current wait may be stale; wake it to recompute
          AsyncTimeout.class.notify();
        }
        break;
      }
    }
}

public final boolean exit() {
   return cancelScheduledTimeout(this);
}
private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
    // Remove the given timeout from the list
    for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
      if (prev.next == node) {
        prev.next = node.next;
        node.next = null;
        return false;
      }
    }

    // If the timeout is not in the list, it has already timed out
    return true;
}
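
The meaning of exit()'s return value is easiest to see in a single-threaded model. The sketch below is a hypothetical simplification: MiniTimeoutQueue, Node, and pollExpired are invented names, plain long values stand in for clock readings, and pollExpired plays the role of the Watchdog thread. It is not Okio's code.

```java
/** Hypothetical single-threaded model of the timeout list; the real class uses a watchdog thread. */
final class MiniTimeoutQueue {
  static final class Node {
    final String name;
    final long timeoutAt; // instant at which this node expires
    Node next;

    Node(String name, long timeoutAt) {
      this.name = name;
      this.timeoutAt = timeoutAt;
    }
  }

  private final Node head = new Node("head", 0); // sentinel, never expires

  /** Inserts node in ascending timeoutAt order; returns true if it became the earliest entry. */
  boolean enter(Node node) {
    for (Node prev = head; true; prev = prev.next) {
      if (prev.next == null || node.timeoutAt < prev.next.timeoutAt) {
        node.next = prev.next;
        prev.next = node;
        return prev == head; // this is the case in which the watchdog would be notified
      }
    }
  }

  /** Unlinks node; returns true if it was already gone, meaning it timed out. */
  boolean exit(Node node) {
    for (Node prev = head; prev != null; prev = prev.next) {
      if (prev.next == node) {
        prev.next = node.next;
        node.next = null;
        return false;
      }
    }
    return true;
  }

  /** Stand-in for the watchdog: removes and returns the first node if it has expired by now. */
  Node pollExpired(long now) {
    Node first = head.next;
    if (first == null || first.timeoutAt - now > 0) return null;
    head.next = first.next;
    first.next = null;
    return first;
  }
}
```

In this model, a node that pollExpired has already removed makes exit() return true, while a node that is still queued is simply unlinked and exit() returns false.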

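V. Buffer

As mentioned earlier, Buffer's underlying storage is a circular doubly linked list of Segments. Copying and transferring data is done by changing which Buffer a Segment belongs to and by adjusting limit and pos, which is very efficient. Buffer also implements the BufferedSource and BufferedSink interfaces, so it provides most of the commonly used byte and character operations.
To start, here is how the basic Source and Sink methods are implemented: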

@Override 
public long read(Buffer sink, long byteCount) {
    if (sink == null) throw new IllegalArgumentException("sink == null");
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (size == 0) return -1L;
    if (byteCount > size) byteCount = size;
    sink.write(this, byteCount);
    return byteCount;
}

@Override 
public void write(Buffer source, long byteCount) {
    if (source == null) throw new IllegalArgumentException("source == null");
    if (source == this) throw new IllegalArgumentException("source == this");
    checkOffsetAndCount(source.size, 0, byteCount);

    while (byteCount > 0) {
      // Is a prefix of the source's head segment all that we need to move?
      if (byteCount < (source.head.limit - source.head.pos)) {
        Segment tail = head != null ? head.prev : null;
        if (tail != null && tail.owner
            && (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
          // Our existing segments are sufficient. Move bytes from source's head to our tail.
          source.head.writeTo(tail, (int) byteCount);
          source.size -= byteCount;
          size += byteCount;
          return;
        } else {
          // We're going to need another segment. Split the source's head
          // segment in two, then move the first of those two to this buffer.
          source.head = source.head.split((int) byteCount);
        }
      }

      // Remove the source's head segment and append it to our tail.
      Segment segmentToMove = source.head;
      long movedByteCount = segmentToMove.limit - segmentToMove.pos;
      source.head = segmentToMove.pop();
      if (head == null) {
        head = segmentToMove;
        head.next = head.prev = head;
      } else {
        Segment tail = head.prev;
        tail = tail.push(segmentToMove);
        tail.compact();
      }
      source.size -= movedByteCount;
      size += movedByteCount;
      byteCount -= movedByteCount;
    }
}

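The code above is taken from Buffer. Its write method tries to minimize CPU and memory cost; reading the comments on write in the source is recommended for a better understanding of the design. The code shows the basic way a Segment-backed Buffer handles bytes, and all the other byte and character operations are implemented along the same lines.
The main interfaces are: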

public interface BufferedSource extends Source {
  Buffer buffer();
  boolean exhausted() throws IOException;
  void require(long byteCount) throws IOException;
  boolean request(long byteCount) throws IOException;
  byte readByte() throws IOException;
  short readShort() throws IOException;
  short readShortLe() throws IOException;
  int readInt() throws IOException;
  int readIntLe() throws IOException;
  long readLong() throws IOException;
  long readLongLe() throws IOException;
  long readDecimalLong() throws IOException;
  long readHexadecimalUnsignedLong() throws IOException;
  void skip(long byteCount) throws IOException;
  ByteString readByteString() throws IOException;
  ByteString readByteString(long byteCount) throws IOException;
  byte[] readByteArray() throws IOException;
  byte[] readByteArray(long byteCount) throws IOException;
  int read(byte[] sink) throws IOException;
  void readFully(byte[] sink) throws IOException;
  int read(byte[] sink, int offset, int byteCount) throws IOException;
  void readFully(Buffer sink, long byteCount) throws IOException;
  long readAll(Sink sink) throws IOException;
  String readUtf8() throws IOException;
  String readUtf8(long byteCount) throws IOException;
  String readUtf8Line() throws IOException;
  String readUtf8LineStrict() throws IOException;
  int readUtf8CodePoint() throws IOException;
  String readString(Charset charset) throws IOException;
  String readString(long byteCount, Charset charset) throws IOException;
  long indexOf(byte b) throws IOException;
  long indexOf(byte b, long fromIndex) throws IOException;
  long indexOf(ByteString bytes) throws IOException;
  long indexOf(ByteString bytes, long fromIndex) throws IOException;
  long indexOfElement(ByteString targetBytes) throws IOException;
  long indexOfElement(ByteString targetBytes, long fromIndex) throws IOException;
  InputStream inputStream();
}
public interface BufferedSink extends Sink {
  Buffer buffer();
  BufferedSink write(ByteString byteString) throws IOException;
  BufferedSink write(byte[] source) throws IOException;
  BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;
  long writeAll(Source source) throws IOException;
  BufferedSink write(Source source, long byteCount) throws IOException;
  BufferedSink writeUtf8(String string) throws IOException;
  BufferedSink writeUtf8(String string, int beginIndex, int endIndex) throws IOException;
  BufferedSink writeUtf8CodePoint(int codePoint) throws IOException;
  BufferedSink writeString(String string, Charset charset) throws IOException;
  BufferedSink writeString(String string, int beginIndex, int endIndex, Charset charset)
      throws IOException;
  BufferedSink writeByte(int b) throws IOException;
  BufferedSink writeShort(int s) throws IOException;
  BufferedSink writeShortLe(int s) throws IOException;
  BufferedSink writeInt(int i) throws IOException;
  BufferedSink writeIntLe(int i) throws IOException;
  BufferedSink writeLong(long v) throws IOException;
  BufferedSink writeLongLe(long v) throws IOException;
  BufferedSink writeDecimalLong(long v) throws IOException;
  BufferedSink writeHexadecimalUnsignedLong(long v) throws IOException;
  BufferedSink emitCompleteSegments() throws IOException;
  BufferedSink emit() throws IOException;
  OutputStream outputStream();
}

For the concrete implementations of these methods, see the source of Buffer.

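VI. Okio

The Okio class mainly provides methods that wrap other data sources as a Source or Sink, and methods that wrap a Source or Sink as a BufferedSource or BufferedSink. For example:

1. Wrapping other data sources as a Source or Sink
// For streams with no usable timeout mechanism of their own (such as a socket), AsyncTimeout supplies the timeout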
public static Source source(final Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Source source = source(socket.getInputStream(), timeout);
    return timeout.source(source);
}

// For ordinary streams (such as file streams), a plain Timeout is attached and checked with throwIfReached() before each write
public static Sink sink(final OutputStream out) {
    return sink(out, new Timeout());
}

private static Sink sink(final OutputStream out, final Timeout timeout) {
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Sink() {
      @Override public void write(Buffer source, long byteCount) throws IOException {
        checkOffsetAndCount(source.size, 0, byteCount);
        while (byteCount > 0) {
          timeout.throwIfReached();
          Segment head = source.head;
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          out.write(head.data, head.pos, toCopy);

          head.pos += toCopy;
          byteCount -= toCopy;
          source.size -= toCopy;

          if (head.pos == head.limit) {
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }

      @Override public void flush() throws IOException {
        out.flush();
      }

      @Override public void close() throws IOException {
        out.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "sink(" + out + ")";
      }
    };
  }
2. Wrapping a Source as a BufferedSource
public static BufferedSource buffer(Source source) {
    if (source == null) throw new IllegalArgumentException("source == null");
    return new RealBufferedSource(source);
}

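RealBufferedSource is implemented with the decorator pattern: it passes the data from source into a Buffer, and the Buffer provides the actual implementation of each operation.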


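That concludes the analysis of Okio's main implementation. Because its design is close to that of Java's I/O streams, Source and Sink can be wrapped in the same way Java's wrapper streams are, to add behavior such as encryption and decryption or compression and decompression.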
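
As a rough illustration of that wrapping idea, the sketch below decorates one source with another that transforms the bytes passing through it. MiniSource, ArraySource, and XorSource are hypothetical names invented here; MiniSource is only loosely modeled on Source and is not Okio's API, and XOR with a fixed key is a toy stand-in for real decryption.

```java
import java.io.IOException;

/** Hypothetical, minimal source interface; loosely modeled on a Source, not the real API. */
interface MiniSource {
  /** Reads up to byteCount bytes into sink; returns the number read, or -1 when exhausted. */
  int read(byte[] sink, int offset, int byteCount) throws IOException;
}

/** Serves bytes from an in-memory array. */
final class ArraySource implements MiniSource {
  private final byte[] data;
  private int pos;

  ArraySource(byte[] data) {
    this.data = data.clone();
  }

  @Override public int read(byte[] sink, int offset, int byteCount) {
    if (pos == data.length) return -1;
    int n = Math.min(byteCount, data.length - pos);
    System.arraycopy(data, pos, sink, offset, n);
    pos += n;
    return n;
  }
}

/** Decorator that XORs every byte it forwards with a fixed key. */
final class XorSource implements MiniSource {
  private final MiniSource delegate;
  private final byte key;

  XorSource(MiniSource delegate, byte key) {
    this.delegate = delegate;
    this.key = key;
  }

  @Override public int read(byte[] sink, int offset, int byteCount) throws IOException {
    int n = delegate.read(sink, offset, byteCount);
    // When n is -1 the loop body never runs, so exhaustion is passed through unchanged.
    for (int i = 0; i < n; i++) {
      sink[offset + i] ^= key;
    }
    return n;
  }
}
```

Since XOR with the same key is its own inverse, wrapping the transformed bytes in a second XorSource recovers the original data. A real implementation would apply the same decorator shape around a cipher or a compression codec.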