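Abstract: This article analyzes how Okio is implemented.
This article is original. Reposting is permitted as long as the source is credited.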
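I. Overview
Okio was originally developed as a component of OkHttp for handling I/O streams. Its main features are:
- A built-in Timeout mechanism that applies to both read() and write();
- Source and Sink replace InputStream and OutputStream as the input and output stream types, which simplifies the API design;
- BufferedSource and BufferedSink define nearly all commonly used stream operations, with no need to distinguish between byte and character operations (in plain Java, character operations require wrapping a Stream in a Reader), so they are easy to use;
- Okio provides APIs that convert files, sockets, and other Java streams into a Source or Sink, which makes it easy to switch between traditional I/O streams and Okio streams.
These advantages let Okio work well with OkHttpClient and also serve as a standalone library for simplifying I/O.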
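II. ByteString
ByteString is a String-like type backed by a byte array. Like String, it is immutable as long as the underlying array is not modified from outside. Compared with String, ByteString is better suited to byte-level processing: it offers utility methods such as hex, base64, md5, and sha256, and it caches the UTF-8 string form to avoid repeated UTF-8 encoding and decoding, which makes it a good fit for network transfer.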
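To make the two ideas above concrete (an immutable byte array plus a cached UTF-8 string), here is a minimal sketch that uses only the JDK. MiniByteString and its methods are hypothetical names chosen for illustration; this is not Okio's actual ByteString code.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified stand-in for okio.ByteString; not Okio's actual code. */
final class MiniByteString {
  private static final char[] HEX_DIGITS = "0123456789abcdef".toCharArray();

  private final byte[] data; // never exposed, so the value stays immutable
  private String utf8;       // lazily cached decoded form

  private MiniByteString(byte[] data) {
    this.data = data;
  }

  /** Encodes a string as UTF-8 and remembers the original to skip later decoding. */
  static MiniByteString encodeUtf8(String s) {
    MiniByteString result = new MiniByteString(s.getBytes(StandardCharsets.UTF_8));
    result.utf8 = s;
    return result;
  }

  /** Copies the array so later changes by the caller cannot affect this value. */
  static MiniByteString of(byte... bytes) {
    return new MiniByteString(bytes.clone());
  }

  /** Decodes as UTF-8 on first use, then returns the cached string. */
  String utf8() {
    String result = utf8;
    if (result == null) {
      result = new String(data, StandardCharsets.UTF_8);
      utf8 = result;
    }
    return result;
  }

  /** Returns the bytes as lowercase hexadecimal. */
  String hex() {
    char[] out = new char[data.length * 2];
    int i = 0;
    for (byte b : data) {
      out[i++] = HEX_DIGITS[(b >> 4) & 0xf];
      out[i++] = HEX_DIGITS[b & 0xf];
    }
    return new String(out);
  }
}
```

The defensive copy in of() is what keeps the value immutable in this sketch: the caller cannot reach the internal array afterwards.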
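III. Segment
Segment is Okio's buffer storage unit. It is 2048 bytes in the version analyzed here (more recent releases use 8192), and its implementation resembles NIO's ByteBuffer, using pos and limit to address data. A Buffer stores its data in a circular doubly linked list of Segments, which makes transferring and copying data faster. In addition, SegmentPool caches a bounded number of free Segments to make allocation and reuse cheaper; SegmentPool itself is backed by a singly linked list of Segments.
Segment has two flags, shared and owner:
- shared: the underlying byte array is shared by several Segments. A shared Segment must not shift or overwrite its existing data, which keeps the data consistent, and it cannot be recycled into SegmentPool;
- owner: this Segment created the underlying byte array, so it is allowed to advance limit, that is, to append data to the array.
Segment mainly supports the following operations:
1. pop()
Removes this Segment from the doubly linked list and returns the next Segment (or null if it was the only one);
2. push(Segment segment)
Inserts segment after this Segment and returns the inserted segment;
3. split(int byteCount)
Creates a new Segment that shares the underlying byte array and inserts it before this Segment. The pos and limit of the two Segments are then adjusted so that the new Segment covers the first byteCount bytes and this Segment covers the rest: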
public Segment split(int byteCount) {
  if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
  Segment prefix = new Segment(this);
  prefix.limit = prefix.pos + byteCount;
  pos += byteCount;
  prev.push(prefix);
  return prefix;
}
4. compact()
Moves this Segment's data into the preceding Segment when there is room, then removes and recycles this Segment:
public void compact() {
  if (prev == this) throw new IllegalStateException();
  if (!prev.owner) return; // Cannot compact: prev isn't writable.
  int byteCount = limit - pos;
  int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
  if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
  writeTo(prev, byteCount);
  pop();
  SegmentPool.recycle(this);
}
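5. writeTo(Segment sink, int byteCount)
Transfers byteCount bytes from this Segment to sink, first shifting sink's data to the start of its array if that is needed to make room: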
public void writeTo(Segment sink, int byteCount) {
  if (!sink.owner) throw new IllegalArgumentException();
  if (sink.limit + byteCount > SIZE) {
    // We can't fit byteCount bytes at the sink's current position. Shift sink first.
    if (sink.shared) throw new IllegalArgumentException();
    if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
    System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
    sink.limit -= sink.pos;
    sink.pos = 0;
  }
  System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
  sink.limit += byteCount;
  pos += byteCount;
}
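pop() and push() are described above without code, so the following sketch shows how the linking could work on a circular doubly linked list. RingNode is a hypothetical class that models only the prev/next pointers, not the data, pos, or limit fields. It also links a fresh node to itself in the constructor, which is an assumption of this sketch and not how Segment is constructed.

```java
/** Hypothetical node that mirrors only the linking behaviour described for Segment. */
final class RingNode {
  final String name;
  RingNode prev;
  RingNode next;

  RingNode(String name) {
    this.name = name;
    // Assumption of this sketch: a single node forms a ring of one.
    this.prev = this;
    this.next = this;
  }

  /** Inserts node right after this one and returns the inserted node. */
  RingNode push(RingNode node) {
    node.prev = this;
    node.next = next;
    next.prev = node;
    next = node;
    return node;
  }

  /** Unlinks this node; returns its successor, or null if it was the only node. */
  RingNode pop() {
    RingNode result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }

  /** Walks the ring once, starting from this node, and concatenates the names. */
  String ring() {
    StringBuilder sb = new StringBuilder(name);
    for (RingNode n = next; n != this; n = n.next) sb.append(n.name);
    return sb.toString();
  }
}
```

Because the list is circular, the tail is always reachable as head.prev, which is why a Buffer built on this structure only needs to keep a reference to its head.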
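IV. Timeout
1. Basic Timeout
Timeout uses the following fields to bound how long an operation may take:
// Whether a deadline is set
private boolean hasDeadline;
// The deadline, an absolute time; the operation times out once this instant is reached
private long deadlineNanoTime;
// The timeout, a relative duration; the operation times out at (start time + timeout)
private long timeoutNanos;
Timeout throws the timeout exception through the following method, which checks for thread interruption and for the deadline:
public void throwIfReached() throws IOException {
  if (Thread.interrupted()) {
    throw new InterruptedIOException("thread interrupted");
  }
  if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
    throw new InterruptedIOException("deadline reached");
  }
}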
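Note that throwIfReached() tests the deadline with a subtraction (deadlineNanoTime - System.nanoTime() <= 0) instead of comparing the two values directly. System.nanoTime() values may wrap around, and only the difference between two readings is meaningful. The sketch below contrasts the two forms; DeadlineCheck and both method names are hypothetical helpers written for this illustration.

```java
/** Hypothetical helper that isolates the deadline test used in throwIfReached(). */
final class DeadlineCheck {
  private DeadlineCheck() {}

  /** Wrap-around-safe test: compares the difference of the two readings against zero. */
  static boolean reached(long deadlineNanoTime, long nowNanoTime) {
    return deadlineNanoTime - nowNanoTime <= 0;
  }

  /** Naive direct comparison, shown only for contrast; it breaks across an overflow. */
  static boolean reachedNaive(long deadlineNanoTime, long nowNanoTime) {
    return nowNanoTime >= deadlineNanoTime;
  }
}
```

For ordinary values the two methods agree. They differ only when the clock value has overflowed between the deadline being set and the check, in which case the subtraction form still gives the right answer.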
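2. Asynchronous AsyncTimeout
The asynchronous timeout works as follows:
- All AsyncTimeout instances are linked into a singly linked list sorted by ascending timeout instant;
- A Watchdog thread uses wait/notify to wake at the right moment, remove the expired AsyncTimeout from the list, and call its timedOut();
Because every timeout in the list shares the same watchdog, take particular care not to do slow work in timedOut(); otherwise the other timeouts in the list may fire late.
AsyncTimeout inserts itself into the list with enter(), and after the operation finishes it calls exit() to determine whether the timeout fired: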
public final void enter() {
  // Unrelated code omitted
  scheduleTimeout(this, timeoutNanos, hasDeadline);
}
private static synchronized void scheduleTimeout(
    AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
  // Start the watchdog thread when the first timeout is scheduled
  if (head == null) {
    head = new AsyncTimeout();
    new Watchdog().start();
  }
  long now = System.nanoTime();
  // Computation of node.timeoutAt from timeoutNanos and the deadline omitted
  // Insert the AsyncTimeout at the right position in the sorted list
  long remainingNanos = node.remainingNanos(now);
  for (AsyncTimeout prev = head; true; prev = prev.next) {
    if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
      node.next = prev.next;
      prev.next = node;
      if (prev == head) {
        // The inserted AsyncTimeout now expires soonest, so the watchdog's
        // current wait may be stale; wake it so that it recomputes the wait
        AsyncTimeout.class.notify();
      }
      break;
    }
  }
}
public final boolean exit() {
  return cancelScheduledTimeout(this);
}
private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
  // Remove the node from the list
  for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
    if (prev.next == node) {
      prev.next = node.next;
      node.next = null;
      return false;
    }
  }
  // If the node is not in the list, it has already timed out
  return true;
}
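The interplay of enter(), exit(), and the watchdog can be followed more easily in a single-threaded model. The sketch below is such a model and makes several simplifying assumptions: MiniTimeoutQueue, its Node class, and expireUpTo() are hypothetical, times are plain numbers compared directly, and the watchdog thread is replaced by an explicit method call.

```java
/** Hypothetical, single-threaded model of the sorted timeout list; not Okio's code. */
final class MiniTimeoutQueue {
  static final class Node {
    final String name;
    final long timeoutAt; // absolute time at which this node expires
    Node next;

    Node(String name, long timeoutAt) {
      this.name = name;
      this.timeoutAt = timeoutAt;
    }
  }

  private final Node head = new Node("head", 0L); // sentinel, never expires

  /** Inserts the node so the list stays sorted by ascending expiry time. */
  void schedule(Node node) {
    for (Node prev = head; true; prev = prev.next) {
      if (prev.next == null || node.timeoutAt < prev.next.timeoutAt) {
        node.next = prev.next;
        prev.next = node;
        return;
      }
    }
  }

  /** Returns true if the node is no longer queued, meaning it already timed out. */
  boolean cancel(Node node) {
    for (Node prev = head; prev != null; prev = prev.next) {
      if (prev.next == node) {
        prev.next = node.next;
        node.next = null;
        return false;
      }
    }
    return true;
  }

  /** Stands in for the watchdog: removes every node whose expiry time has passed. */
  int expireUpTo(long now) {
    int expired = 0;
    while (head.next != null && head.next.timeoutAt <= now) {
      Node node = head.next;
      head.next = node.next;
      node.next = null;
      expired++;
    }
    return expired;
  }

  /** Names of the queued nodes in expiry order. */
  String order() {
    StringBuilder sb = new StringBuilder();
    for (Node n = head.next; n != null; n = n.next) sb.append(n.name);
    return sb.toString();
  }
}
```

In this model, schedule() plays the role of enter() and cancel() the role of exit(): a true return value means the watchdog got to the node first.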
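V. Buffer
As mentioned above, Buffer stores its data in a circular doubly linked list of Segments. Copying and transferring data is done by reassigning Segments and adjusting limit and pos, which is very efficient. Buffer also implements both BufferedSource and BufferedSink, so it provides the vast majority of common byte and character operations.
To start, here is how the basic Source and Sink methods are implemented: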
@Override
public long read(Buffer sink, long byteCount) {
  if (sink == null) throw new IllegalArgumentException("sink == null");
  if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
  if (size == 0) return -1L;
  if (byteCount > size) byteCount = size;
  sink.write(this, byteCount);
  return byteCount;
}
@Override
public void write(Buffer source, long byteCount) {
  if (source == null) throw new IllegalArgumentException("source == null");
  if (source == this) throw new IllegalArgumentException("source == this");
  checkOffsetAndCount(source.size, 0, byteCount);
  while (byteCount > 0) {
    // Is a prefix of the source's head segment all that we need to move?
    if (byteCount < (source.head.limit - source.head.pos)) {
      Segment tail = head != null ? head.prev : null;
      if (tail != null && tail.owner
          && (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
        // Our existing segments are sufficient. Move bytes from source's head to our tail.
        source.head.writeTo(tail, (int) byteCount);
        source.size -= byteCount;
        size += byteCount;
        return;
      } else {
        // We're going to need another segment. Split the source's head
        // segment in two, then move the first of those two to this buffer.
        source.head = source.head.split((int) byteCount);
      }
    }
    // Remove the source's head segment and append it to our tail.
    Segment segmentToMove = source.head;
    long movedByteCount = segmentToMove.limit - segmentToMove.pos;
    source.head = segmentToMove.pop();
    if (head == null) {
      head = segmentToMove;
      head.next = head.prev = head;
    } else {
      Segment tail = head.prev;
      tail = tail.push(segmentToMove);
      tail.compact();
    }
    source.size -= movedByteCount;
    size += movedByteCount;
    byteCount -= movedByteCount;
  }
}
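The code above is the implementation in Buffer. write() is written to minimize CPU and memory use; reading the comments on write() in the source is recommended for a better understanding of the design. The code above shows the basic way a Segment-backed Buffer handles bytes, and the other byte and character operations are implemented along the same lines.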
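The core idea of write(), moving whole segments between buffers instead of copying their bytes, can be reduced to a few lines. The sketch below is a deliberately simplified model: ChunkBuffer is a hypothetical class that stores plain byte arrays in a java.util.ArrayDeque, and it leaves out splitting, compacting, and pooling entirely.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical buffer that stores whole chunks; a stand-in for Buffer plus Segment. */
final class ChunkBuffer {
  private final Deque<byte[]> chunks = new ArrayDeque<>();
  private long size;

  /** Appends a chunk by reference, without copying its contents. */
  void append(byte[] chunk) {
    chunks.addLast(chunk);
    size += chunk.length;
  }

  /** Moves every chunk from source to the tail of this buffer by reference. */
  void writeAll(ChunkBuffer source) {
    if (source == this) throw new IllegalArgumentException("source == this");
    while (!source.chunks.isEmpty()) {
      byte[] chunk = source.chunks.removeFirst();
      source.size -= chunk.length;
      append(chunk);
    }
  }

  long size() {
    return size;
  }

  /** Returns the most recently appended chunk, or null if the buffer is empty. */
  byte[] lastChunk() {
    return chunks.peekLast();
  }
}
```

After writeAll(), the sink holds the very same array instances that the source held before, so the cost of the transfer depends on the number of chunks and not on the number of bytes.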
The main interfaces are:
public interface BufferedSource extends Source {
  Buffer buffer();
  boolean exhausted() throws IOException;
  void require(long byteCount) throws IOException;
  boolean request(long byteCount) throws IOException;
  byte readByte() throws IOException;
  short readShort() throws IOException;
  short readShortLe() throws IOException;
  int readInt() throws IOException;
  int readIntLe() throws IOException;
  long readLong() throws IOException;
  long readLongLe() throws IOException;
  long readDecimalLong() throws IOException;
  long readHexadecimalUnsignedLong() throws IOException;
  void skip(long byteCount) throws IOException;
  ByteString readByteString() throws IOException;
  ByteString readByteString(long byteCount) throws IOException;
  byte[] readByteArray() throws IOException;
  byte[] readByteArray(long byteCount) throws IOException;
  int read(byte[] sink) throws IOException;
  void readFully(byte[] sink) throws IOException;
  int read(byte[] sink, int offset, int byteCount) throws IOException;
  void readFully(Buffer sink, long byteCount) throws IOException;
  long readAll(Sink sink) throws IOException;
  String readUtf8() throws IOException;
  String readUtf8(long byteCount) throws IOException;
  String readUtf8Line() throws IOException;
  String readUtf8LineStrict() throws IOException;
  int readUtf8CodePoint() throws IOException;
  String readString(Charset charset) throws IOException;
  String readString(long byteCount, Charset charset) throws IOException;
  long indexOf(byte b) throws IOException;
  long indexOf(byte b, long fromIndex) throws IOException;
  long indexOf(ByteString bytes) throws IOException;
  long indexOf(ByteString bytes, long fromIndex) throws IOException;
  long indexOfElement(ByteString targetBytes) throws IOException;
  long indexOfElement(ByteString targetBytes, long fromIndex) throws IOException;
  InputStream inputStream();
}
public interface BufferedSink extends Sink {
  Buffer buffer();
  BufferedSink write(ByteString byteString) throws IOException;
  BufferedSink write(byte[] source) throws IOException;
  BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;
  long writeAll(Source source) throws IOException;
  BufferedSink write(Source source, long byteCount) throws IOException;
  BufferedSink writeUtf8(String string) throws IOException;
  BufferedSink writeUtf8(String string, int beginIndex, int endIndex) throws IOException;
  BufferedSink writeUtf8CodePoint(int codePoint) throws IOException;
  BufferedSink writeString(String string, Charset charset) throws IOException;
  BufferedSink writeString(String string, int beginIndex, int endIndex, Charset charset)
      throws IOException;
  BufferedSink writeByte(int b) throws IOException;
  BufferedSink writeShort(int s) throws IOException;
  BufferedSink writeShortLe(int s) throws IOException;
  BufferedSink writeInt(int i) throws IOException;
  BufferedSink writeIntLe(int i) throws IOException;
  BufferedSink writeLong(long v) throws IOException;
  BufferedSink writeLongLe(long v) throws IOException;
  BufferedSink writeDecimalLong(long v) throws IOException;
  BufferedSink writeHexadecimalUnsignedLong(long v) throws IOException;
  BufferedSink emitCompleteSegments() throws IOException;
  BufferedSink emit() throws IOException;
  OutputStream outputStream();
}
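For the concrete implementations of these methods, see the source of Buffer.
VI. Okio
The Okio class mainly provides methods that wrap other data sources as a Source or Sink, and methods that wrap a Source or Sink as a BufferedSource or BufferedSink. Source is used as the main example below; the Sink side is symmetric:
1. Wrapping other data sources as a Source (or Sink)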
// Sockets get an AsyncTimeout, so the timeout is enforced asynchronously by the watchdog
public static Source source(final Socket socket) throws IOException {
  if (socket == null) throw new IllegalArgumentException("socket == null");
  AsyncTimeout timeout = timeout(socket);
  Source source = source(socket.getInputStream(), timeout);
  return timeout.source(source);
}
// Other streams (such as file streams) get a plain Timeout, which is checked
// synchronously via throwIfReached() before each chunk; shown here for the Sink side
public static Sink sink(final OutputStream out) {
  return sink(out, new Timeout());
}
private static Sink sink(final OutputStream out, final Timeout timeout) {
  if (out == null) throw new IllegalArgumentException("out == null");
  if (timeout == null) throw new IllegalArgumentException("timeout == null");
  return new Sink() {
    @Override public void write(Buffer source, long byteCount) throws IOException {
      checkOffsetAndCount(source.size, 0, byteCount);
      while (byteCount > 0) {
        timeout.throwIfReached();
        Segment head = source.head;
        int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
        out.write(head.data, head.pos, toCopy);
        head.pos += toCopy;
        byteCount -= toCopy;
        source.size -= toCopy;
        if (head.pos == head.limit) {
          source.head = head.pop();
          SegmentPool.recycle(head);
        }
      }
    }
    @Override public void flush() throws IOException {
      out.flush();
    }
    @Override public void close() throws IOException {
      out.close();
    }
    @Override public Timeout timeout() {
      return timeout;
    }
    @Override public String toString() {
      return "sink(" + out + ")";
    }
  };
}
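2. Wrapping a Source as a BufferedSource
public static BufferedSource buffer(Source source) {
  if (source == null) throw new IllegalArgumentException("source == null");
  return new RealBufferedSource(source);
}
RealBufferedSource is implemented with the decorator pattern: it pulls data from source into a Buffer, and the Buffer supplies the actual implementation of each operation.
That concludes the analysis of Okio's main implementation. Because its design closely resembles that of Java's I/O streams, Source and Sink can be wrapped in the same way as Java's wrapper streams to add behavior such as encryption and decryption or compression and decompression.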
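As a rough illustration of that wrapping idea, the sketch below applies the decorator pattern to a pared-down source interface. Everything here is hypothetical and JDK-only: MiniSource is not Okio's Source (which reads into a Buffer and exposes a Timeout), and the XOR transform is a toy stand-in for a real cipher or compressor.

```java
/** Hypothetical, pared-down analogue of Source: fills sink, returns the count or -1. */
interface MiniSource {
  int read(byte[] sink);
}

/** Serves bytes from an in-memory array. */
final class ArraySource implements MiniSource {
  private final byte[] data;
  private int pos;

  ArraySource(byte[] data) {
    this.data = data.clone();
  }

  @Override public int read(byte[] sink) {
    if (pos == data.length) return -1; // exhausted
    int n = Math.min(sink.length, data.length - pos);
    System.arraycopy(data, pos, sink, 0, n);
    pos += n;
    return n;
  }
}

/** Decorator that XORs every byte with a key; a toy transform, not real encryption. */
final class XorSource implements MiniSource {
  private final MiniSource delegate;
  private final byte key;

  XorSource(MiniSource delegate, byte key) {
    this.delegate = delegate;
    this.key = key;
  }

  @Override public int read(byte[] sink) {
    int n = delegate.read(sink);
    // When the delegate is exhausted n is -1 and the loop body never runs.
    for (int i = 0; i < n; i++) sink[i] ^= key;
    return n;
  }
}
```

Stacking two XorSource instances with the same key restores the original bytes, which shows that decorators compose freely as long as each one only depends on the interface of the source it wraps.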