前言
查看Okhttp源码时,在Transmitter
类中发现了一个AsyncTimeout
对象。了解代码后得知,该类是用于做一些超时检测的操作。本文主要总结笔者对于AsyncTimeout
机制的研究。
本文基于okhttp 3.14.9
github地址:github.com/square/okht…
gradle依赖:implementation group: ‘com.squareup.okhttp3’, name: ‘okhttp’, version: ‘3.14.9’
AsyncTimeout
AsyncTimeout
类位于Okio
库,集成自Timeout
。其类中有如下注释:
/**
* This timeout uses a background thread to take action exactly when the timeout occurs. Use this to
* implement timeouts where they aren't supported natively, such as to sockets that are blocked on
* writing.
*
* <p>Subclasses should override {@link #timedOut} to take action when a timeout occurs. This method
* will be invoked by the shared watchdog thread so it should not do any long-running operations.
* Otherwise we risk starving other timeouts from being triggered.
*
* <p>Use {@link #sink} and {@link #source} to apply this timeout to a stream. The returned value
* will apply the timeout to each operation on the wrapped stream.
*
* <p>Callers should call {@link #enter} before doing work that is subject to timeouts, and {@link
* #exit} afterwards. The return value of {@link #exit} indicates whether a timeout was triggered.
* Note that the call to {@link #timedOut} is asynchronous, and may be called after {@link #exit}.
*/
public class AsyncTimeout extends Timeout {
复制代码
这里提供了几个有用的信息:
- 这是一个利用统一子线程检测超时的工具,主要针对的是一些原生不支持超时检测的类。
- 它提供了一个
timedOut()
方法,作为检测到超时的回调。 - 内部提供的
sink()
和source()
方法可以适配流的读写超时检测,这可以对应到网络请求的流读写,后面会讲到。 - 提供
enter()
和exit()
作为开始计时和结束计时的调用。也就是说开始执行计时的起点将会在enter()
发生。
Timeout
上述一直在说超时检测,那究竟超时的时间从何而来呢?先来看看Timeout
中有如下定义:
/**
* True if {@code deadlineNanoTime} is defined. There is no equivalent to null
* or 0 for {@link System#nanoTime}.
*/
private boolean hasDeadline;
private long deadlineNanoTime;
private long timeoutNanos;
复制代码
Timeout
中定义了:deadlineNanoTime
也就是deadline时间;timeoutNanos
是超时时长。具体到其子类AsyncTimeout
就是利用timeoutNanos
来计算超时的。
AsyncTimeout属性定义
再来看看AsyncTimeout
的一些属性定义,
private static final int TIMEOUT_WRITE_SIZE = 64 * 1024;
private static final long IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
private static final long IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS);
static @Nullable AsyncTimeout head;
private boolean inQueue;
private @Nullable AsyncTimeout next;
private long timeoutAt;
复制代码
- timeoutAt:记录超时的具体时间,这个的计算是通过开始计时的当前时间+上述的
timeoutNanos
。 - 上述代码出现了一个
head
和next
的定义,前面在AsyncTimeout
的注释中讲到,它会通过一个统一的子线程进行超时检测。而这个head
和next
的定义即一个链表的结构,用于将每个AsyncTimeout
对象形成一个队列,方便每次超时检测触发时的遍历。这个会在后面讲到。 - inQueue:即
AsyncTimeout
对象一旦加入到链表中,就会置为true。
AsyncTimeout在网络请求流程中的使用
先来看看AsyncTimeout
具体在网络请求流程中的运用。
-
在
Transmitter
中有一个自带的AsyncTimeout
类型属性,它的超时时间timeoutNanos
会在Transmitter
的构造方法中设置,设置的是OkHttpClient
初始化时自定义的callTimeout
。这里的超时检测的是整个请求的总时间。private final AsyncTimeout timeout = new AsyncTimeout() { @Override protected void timedOut() { cancel(); } }; 。。。 public Transmitter(OkHttpClient client, Call call) { this.client = client; this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool()); this.call = call; this.eventListener = client.eventListenerFactory().create(call); this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS); } 复制代码
callTimeout:整个请求过程的超时时间,通常不设置默认为0
-
在建立连接时会调用到
RealConnection.connectSocket()
,建立连接之后会创建两个Okio相关的BufferedSource
和BufferedSink
对象。// RealConnection.connectSocket() // RealConnection.java 275行 source = Okio.buffer(Okio.source(rawSocket)); sink = Okio.buffer(Okio.sink(rawSocket)); // Okio.java 221行 public static Source source(Socket socket) throws IOException { if (socket == null) throw new IllegalArgumentException("socket == null"); if (socket.getInputStream() == null) throw new IOException("socket's input stream == null"); AsyncTimeout timeout = timeout(socket); Source source = source(socket.getInputStream(), timeout); return timeout.source(source); } // Okio.java 115行 public static Sink sink(Socket socket) throws IOException { if (socket == null) throw new IllegalArgumentException("socket == null"); if (socket.getOutputStream() == null) throw new IOException("socket's output stream == null"); AsyncTimeout timeout = timeout(socket); Sink sink = sink(socket.getOutputStream(), timeout); return timeout.sink(sink); } // RealConnection.java 542行 ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException { if (http2Connection != null) { return new Http2ExchangeCodec(client, this, chain, http2Connection); } else { socket.setSoTimeout(chain.readTimeoutMillis()); source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS); sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS); return new Http1ExchangeCodec(client, this, source, sink); } } 复制代码
新建
BufferedSource
和BufferedSink
对象时都需要先新建一个AsyncTimeout
,在利用其新建BufferedSource
和BufferedSink
,这里的代码运用到了装饰器
的思想,继而将source
和sink
拥有timeout
的能力。后续在新建ExchangeCodec
时,会分别设置OkHttpClient
初始化时自定义的readTimeout
和writeTimeout
,对应读写的超时。readTimeout:读超时时间,默认10s。
writeTimeout:写超时时间,默认10s。
ps:因为socket自身具备连接超时的检测,故connectTimeout
不需要采用AsyncTimeout
的方案。
AsyncTimeout超时检测
加入队列,开始检测
// AsyncTimeout.java 72行
public final void enter() {
if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
long timeoutNanos = timeoutNanos();
boolean hasDeadline = hasDeadline();
if (timeoutNanos == 0 && !hasDeadline) {
return; // No timeout and no deadline? Don't bother with the queue.
}
inQueue = true;
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
复制代码
AsyncTimeout.enter()
方法如上所示,调用之后正式进入超时检测。重点关注最后的scheduleTimeout(this, timeoutNanos, hasDeadline);
这时一个static方法,还记得上面提到的AsyncTimeout
有一个静态成员变量head
吗?接下来就来看看这个方法。
// AsyncTimeout.java 83行
private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
// Start the watchdog thread and create the head node when the first timeout is scheduled.
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}
long now = System.nanoTime();
if (timeoutNanos != 0 && hasDeadline) {
// Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
// Math.min() is undefined for absolute values, but meaningful for relative ones.
node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
} else if (timeoutNanos != 0) {
node.timeoutAt = now + timeoutNanos;
} else if (hasDeadline) {
node.timeoutAt = node.deadlineNanoTime();
} else {
throw new AssertionError();
}
// Insert the node in sorted order.
long remainingNanos = node.remainingNanos(now);
for (AsyncTimeout prev = head; true; prev = prev.next) {
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
if (prev == head) {
AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
}
break;
}
}
}
复制代码
ps:node.remainingNanos(now);
会计算出当前时间与超时时间的时间间隔。
方法中主要做了3件事:
- 静态变量
head
若为空,则说明全局检测还未开启,需要开启检测线程Watchdog
。ps:head
实际上只是一个队列开端的标志,本身不属于一次超时的检测。 - 计算出加入到检测队列的当前节点的超时时间
timeoutAt
- 将全局的检测队列进行重排序,按照
timeoutAt
从小到大排序。保证后续Watchdog
的检测机制。因为是链表结构,只需要将下一个节点改变指向即可。具体的顺序可见下图(先用旅程图代替,当作时间轴理解即可):