OkHttp Source Code Analysis

OkHttp is a very powerful HTTP client library with clear advantages over HttpClient, Volley, and HttpURLConnection: Android 6.0 removed HttpClient, since Android 4.4 HttpURLConnection has been implemented on top of OkHttp under the hood, and Volley is no longer actively maintained, which makes OkHttp the most widely used networking library today. Until now I had only used it; in this post let's look at how OkHttp actually works internally.

Here is a typical POST request:

OkHttpClient httpClient = new OkHttpClient().newBuilder()
        .followRedirects(false)
        .followSslRedirects(false)
        .addInterceptor(new GzipRequestInterceptor())
        .build();
MediaType type = MediaType.parse("application/json; charset=utf-8");
RequestBody requestBody = RequestBody.create(type, json);
Request request = new Request.Builder()
        .url(StatApi.BASE_URL_STAT)
        .post(requestBody)
        .tag(StatApi.BASE_URL_STAT)
        .build();
httpClient.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        System.out.println("call = [" + call + "], e = [" + e + "]");
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
        System.out.println("call = [" + call + "], response = [" + response + "]");
    }
});

First we create an OkHttpClient with the Builder pattern. Through the OkHttpClient we can issue requests and obtain responses, and while building it we can configure the client:
.cookieJar() for cookie management, .sslSocketFactory() for certificate/TLS handling, .connectTimeout() for timeouts, .addInterceptor() for interceptors (GZIP compression of request bodies, logging, etc.), .cache() for response caching, and so on.
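
As a rough illustration of these builder options (a sketch, not code from the post; the timeout values are placeholders, and GzipRequestInterceptor is the interceptor shown further down), a configured client might look like this:

import java.util.concurrent.TimeUnit;
import okhttp3.CookieJar;
import okhttp3.OkHttpClient;

public final class ClientFactory {
    // A minimal sketch of the builder options mentioned above; the values are illustrative only.
    static OkHttpClient create() {
        return new OkHttpClient().newBuilder()
                .connectTimeout(10, TimeUnit.SECONDS)         // connect timeout
                .readTimeout(30, TimeUnit.SECONDS)            // read timeout
                .cookieJar(CookieJar.NO_COOKIES)              // or a custom CookieJar (see later)
                .addInterceptor(new GzipRequestInterceptor()) // application interceptor (defined below)
                .build();
    }
}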

Next we create a Request object; httpClient.newCall(request) then creates a RealCall, which has the following member variables:

private final OkHttpClient client;
// Guarded by this.
private boolean executed;
volatile boolean canceled;
/** The application's original request unadulterated by redirects or auth headers. */
Request originalRequest; // the request entity
HttpEngine engine;       // the core class that actually performs the request

RealCall offers two ways to issue a request: enqueue() and execute(). The difference is that enqueue() is asynchronous, while execute() is synchronous and blocks the calling thread.
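
For comparison with the asynchronous enqueue() call in the opening example, a blocking execute() call looks roughly like this (a sketch; on Android it must run off the main thread):

import java.io.IOException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public final class SyncCallDemo {
    // Synchronous variant: execute() blocks the calling thread until the Response arrives.
    static String fetch(OkHttpClient client, String url) throws IOException {
        Request request = new Request.Builder().url(url).build();
        Response response = client.newCall(request).execute(); // blocks here
        return response.body().string();                       // string() also closes the body
    }
}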

Both of these methods go through the Dispatcher class, so let's look at it first. The Dispatcher holds a thread pool and by default allows at most 64 asynchronous requests to run concurrently (and at most 5 per host); anything beyond that is queued until a slot frees up.
Pay attention to the following three deques; they are important, because they record every synchronous or asynchronous call we issue.

/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();   // async calls not yet running
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); // async calls currently running
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();   // synchronous calls
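
Both limits can be tuned by handing the client a customized Dispatcher; a sketch with arbitrary example numbers:

import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;

public final class DispatcherConfig {
    // The 64-request ceiling (and the 5-per-host default) are adjustable on the Dispatcher.
    static OkHttpClient create() {
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.setMaxRequests(128);       // overall cap on concurrent async calls
        dispatcher.setMaxRequestsPerHost(10); // cap per host
        return new OkHttpClient().newBuilder().dispatcher(dispatcher).build();
    }
}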

When enqueue() is used, an AsyncCall (which is a Runnable) is created and handed to the dispatcher's thread pool; execute() instead records the current RealCall and runs it immediately on the calling thread:

void enqueue(Callback responseCallback, boolean forWebSocket) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  client.dispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
}

@Override public Response execute() throws IOException {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  try {
    client.dispatcher().executed(this);
    Response result = getResponseWithInterceptorChain(false);
    if (result == null) throw new IOException("Canceled");
    return result;
  } finally {
    client.dispatcher().finished(this);
  }
}

Following both paths further, we find that synchronous and asynchronous execution ultimately call the same getResponseWithInterceptorChain() method to obtain a Response. The synchronous path returns the Response directly; the asynchronous path delivers the result through the Callback, invoking onResponse() on success or onFailure() on error.
Here is getResponseWithInterceptorChain():

private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException {
  Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket);
  return chain.proceed(originalRequest);
}

This introduces another class, ApplicationInterceptorChain, which runs the application interceptors over the Request in a uniform way. Its proceed() method is worth a look:

@Override public Response proceed(Request request) throws IOException {
  // If there's another interceptor in the chain, call that.
  if (index < client.interceptors().size()) {
    Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket);
    Interceptor interceptor = client.interceptors().get(index);
    Response interceptedResponse = interceptor.intercept(chain);

    if (interceptedResponse == null) {
      throw new NullPointerException("application interceptor " + interceptor
          + " returned null");
    }

    return interceptedResponse;
  }

  // No more interceptors. Do HTTP.
  return getResponse(request, forWebSocket);
}

This is the chain-of-responsibility pattern: the interceptors we added when building the OkHttpClient are invoked one after another. For example, .addInterceptor(new GzipRequestInterceptor()) plugs in the following interceptor,

public class GzipRequestInterceptor implements Interceptor {
  @Override public Response intercept(Chain chain) throws IOException {
    Request originalRequest = chain.request();
    if (originalRequest.body() == null || originalRequest.header("Content-Encoding") != null) {
      return chain.proceed(originalRequest);
    }

    Request compressedRequest = originalRequest.newBuilder()
        .header("Content-Encoding", "gzip")
        .method(originalRequest.method(), gzip(originalRequest.body()))
        .build();
    return chain.proceed(compressedRequest);
  }

  private RequestBody gzip(final RequestBody body) {
    return new RequestBody() {
      @Override public MediaType contentType() {
        return body.contentType();
      }

      @Override public long contentLength() {
        return -1; // the compressed size is not known in advance
      }

      @Override public void writeTo(BufferedSink sink) throws IOException {
        BufferedSink gzipSink = Okio.buffer(new GzipSink(sink));
        body.writeTo(gzipSink);
        gzipSink.close();
      }
    };
  }
}

which transparently GZIP-compresses the request body before it is sent.

Once all application interceptors have run, the chain ends by calling getResponse().
Inside that method an HttpEngine object is created; this is the core class through which the request is sent and the response is read.
The overall flow boils down to:

engine.sendRequest();
engine.readResponse();
Response response = engine.getResponse();

engine.sendRequest() first calls networkRequest(userRequest), which fills in the request headers, including the Accept-Encoding: gzip header for transparent GZIP, cookies loaded from the CookieJar, and so on.
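
The cookie filling here comes from whatever CookieJar was set on the client (the default jar stores nothing). A naive in-memory jar, as a sketch only (real code needs expiry/path handling and persistence), could look like this, and would be plugged in with .cookieJar(new InMemoryCookieJar()) on the builder:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import okhttp3.Cookie;
import okhttp3.CookieJar;
import okhttp3.HttpUrl;

// A minimal in-memory CookieJar keyed by host; illustrative only.
public final class InMemoryCookieJar implements CookieJar {
    private final Map<String, List<Cookie>> store = new HashMap<>();

    @Override public synchronized void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
        store.put(url.host(), new ArrayList<>(cookies));
    }

    @Override public synchronized List<Cookie> loadForRequest(HttpUrl url) {
        List<Cookie> cookies = store.get(url.host());
        return cookies != null ? cookies : new ArrayList<Cookie>();
    }
}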

Next comes cache handling. Internal.instance is initialized when OkHttpClient is initialized; it exists mainly to give these internals convenient access to the cache and the connection pool.

InternalCache responseCache = Internal.instance.internalCache(client);
Response cacheCandidate = responseCache != null
    ? responseCache.get(request)
    : null;

long now = System.currentTimeMillis();

responseCache is a proxy for the Cache class; it exposes the read/write/update/remove operations, while the actual caching is implemented inside Cache, which is backed by DiskLruCache. I won't go into DiskLruCache here; it is easy enough to read on its own.
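
Note that this disk cache only exists if the caller installs one; a sketch of wiring it up (the directory and size below are placeholders):

import java.io.File;
import okhttp3.Cache;
import okhttp3.OkHttpClient;

public final class CachedClient {
    // Attach a DiskLruCache-backed okhttp3.Cache to the client; size and directory are illustrative.
    static OkHttpClient create(File cacheDir) {
        Cache cache = new Cache(new File(cacheDir, "okhttp"), 10L * 1024 * 1024); // 10 MiB
        return new OkHttpClient().newBuilder().cache(cache).build();
    }
}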

The cached candidate is fetched first, and then its validity is evaluated:

cacheStrategy = new CacheStrategy.Factory(now, request, cacheCandidate).get();

The Factory performs a long series of checks to decide whether the cached response is still usable. They all follow the HTTP caching rules (Expires, Cache-Control, Last-Modified/If-Modified-Since, ETag/If-None-Match, and so on), so there is nothing OkHttp-specific about them.

The resulting networkRequest and cacheResponse fields are what matter: together they determine whether the network has to be hit (a request-level CacheControl sketch follows the list below):

  1. Both are null: the Request carried Cache-Control: only-if-cached but no matching entry exists in the cache, so the request cannot be satisfied.

  2. networkRequest is non-null and cacheResponse is null: there is no usable cache entry, so the network must be accessed.

  3. networkRequest is null and cacheResponse is non-null: the cached response is still valid and is returned directly; no network access is needed.

  4. Both are non-null: the cache entry is stale and carries validators, so a conditional request with If-Modified-Since or If-None-Match is sent. If the server answers 304, the resource has not changed and the cached body is returned, saving bandwidth (this logic lives in engine.readResponse()).
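
Case 1 can be triggered deliberately from application code with a request-level CacheControl; a sketch (the URLs are up to the caller):

import okhttp3.CacheControl;
import okhttp3.Request;

public final class CacheControlDemo {
    // FORCE_CACHE sends "only-if-cached", so OkHttp answers 504 if no usable cache entry exists.
    static Request cacheOnly(String url) {
        return new Request.Builder()
                .url(url)
                .cacheControl(CacheControl.FORCE_CACHE)
                .build();
    }

    // FORCE_NETWORK ("no-cache") skips the cache and always hits the network.
    static Request networkOnly(String url) {
        return new Request.Builder()
                .url(url)
                .cacheControl(CacheControl.FORCE_NETWORK)
                .build();
    }
}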

networkRequest = cacheStrategy.networkRequest;
cacheResponse = cacheStrategy.cacheResponse;

if (responseCache != null) {
  responseCache.trackResponse(cacheStrategy); // update hit statistics: network count vs. cache count
}

if (cacheCandidate != null && cacheResponse == null) {
  closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}

// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) { // Cache-Control: only-if-cached was set but nothing usable is cached
  userResponse = new Response.Builder()
      .request(userRequest)
      .priorResponse(stripBody(priorResponse))
      .protocol(Protocol.HTTP_1_1)
      .code(504)
      .message("Unsatisfiable Request (only-if-cached)")
      .body(EMPTY_BODY)
      .build();
  return;
}

// If we don't need the network, we're done.
if (networkRequest == null) { // networkRequest is null, cacheResponse is not: return the cached data directly
  userResponse = cacheResponse.newBuilder()
      .request(userRequest)
      .priorResponse(stripBody(priorResponse))
      .cacheResponse(stripBody(cacheResponse))
      .build();
  userResponse = unzip(userResponse);
  return;
}

That wraps up the caching logic; now it is time for the actual network request.

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean success = false;
try {
  httpStream = connect();
  httpStream.setHttpEngine(this);

  if (writeRequestHeadersEagerly()) {
    long contentLength = OkHeaders.contentLength(request);
    if (bufferRequestBody) {
      if (contentLength > Integer.MAX_VALUE) {
        throw new IllegalStateException("Use setFixedLengthStreamingMode() or "
            + "setChunkedStreamingMode() for requests larger than 2 GiB.");
      }

      if (contentLength != -1) {
        // Buffer a request body of a known length.
        httpStream.writeRequestHeaders(networkRequest);
        requestBodyOut = new RetryableSink((int) contentLength);
      } else {
        // Buffer a request body of an unknown length. Don't write request headers until the
        // entire body is ready; otherwise we can't set the Content-Length header correctly.
        requestBodyOut = new RetryableSink();
      }
    } else {
      httpStream.writeRequestHeaders(networkRequest);
      requestBodyOut = httpStream.createRequestBody(networkRequest, contentLength);
    }
  }
  success = true;
} finally {
  // If we're crashing on I/O or otherwise, don't leak the cache body.
  if (!success && cacheCandidate != null) {
    closeQuietly(cacheCandidate.body());
  }
}

Look at connect() first. Following it all the way down leads to StreamAllocation.findConnection(); the core of that method is:

//...
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) { // an existing connection is still usable
  return allocatedConnection;
}

// Attempt to get a connection from the pool.
RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this); // reuse a pooled connection if one fits
if (pooledConnection != null) {
  this.connection = pooledConnection;
  return pooledConnection;
}

//...
RealConnection newConnection = new RealConnection(selectedRoute); // otherwise create a new connection
acquire(newConnection);

synchronized (connectionPool) {
  Internal.instance.put(connectionPool, newConnection);
  this.connection = newConnection;
  if (canceled) throw new IOException("Canceled");
}

newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
    connectionRetryEnabled);
routeDatabase().connected(newConnection.route());

return newConnection;

Stepping into newConnection.connect(), we see that a plain Socket or an SSLSocket is created from the address. Following further, the call ends up in newConnection.connectSocket(),
which is where the socket connection is actually established. If the negotiated protocol is HTTP/2 or SPDY/3, a FramedConnection object is also created. This object is important: for HTTP/2 and SPDY/3, all request sending and response reading goes through it.

Http1xStream and Http2xStream are the stream implementations for HTTP/1.x and HTTP/2 respectively.

At this point the connection has been established.
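
Connection reuse and the protocols offered during negotiation can both be tuned when building the client; a sketch with illustrative values (not from the original post):

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;

public final class ConnectionConfig {
    // Illustrative values: keep up to 5 idle connections alive for 5 minutes,
    // and offer HTTP/2 with an HTTP/1.1 fallback during protocol negotiation.
    static OkHttpClient create() {
        return new OkHttpClient().newBuilder()
                .connectionPool(new ConnectionPool(5, 5, TimeUnit.MINUTES))
                .protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1))
                .build();
    }
}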

Back in HttpEngine.sendRequest(),
let's see what happens after connect():

if (writeRequestHeadersEagerly()) {
  long contentLength = OkHeaders.contentLength(request);
  if (bufferRequestBody) {
    if (contentLength > Integer.MAX_VALUE) {
      throw new IllegalStateException("Use setFixedLengthStreamingMode() or "
          + "setChunkedStreamingMode() for requests larger than 2 GiB.");
    }

    if (contentLength != -1) {
      // Buffer a request body of a known length.
      httpStream.writeRequestHeaders(networkRequest);
      requestBodyOut = new RetryableSink((int) contentLength);
    } else {
      // Buffer a request body of an unknown length. Don't write request headers until the
      // entire body is ready; otherwise we can't set the Content-Length header correctly.
      requestBodyOut = new RetryableSink();
    }
  } else {
    httpStream.writeRequestHeaders(networkRequest);
    requestBodyOut = httpStream.createRequestBody(networkRequest, contentLength);
  }
}

So if bufferRequestBody is false, or bufferRequestBody is true and the Content-Length is known (not -1), the request headers are written to the socket right away.
Back in RealCall.getResponse(), the next step is engine.readResponse(). It runs through a series of checks: if the headers have not been written yet they are written now, and the same goes for the request body.
Then it executes:

...
networkResponse = readNetworkResponse();   // read the raw response; HTTP/1.x and HTTP/2 are handled separately
receiveHeaders(networkResponse.headers()); // mainly saves cookies through the CookieJar

// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
  if (validate(cacheResponse, networkResponse)) { // server answered 304 / nothing changed: return the cached content
    userResponse = cacheResponse.newBuilder()
        .request(userRequest)
        .priorResponse(stripBody(priorResponse))
        .headers(combine(cacheResponse.headers(), networkResponse.headers()))
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();
    networkResponse.body().close(); // release the network body
    releaseStreamAllocation();      // release the connection

    // Update the cache after combining headers but before stripping the
    // Content-Encoding header (as performed by initContentStream()).
    InternalCache responseCache = Internal.instance.internalCache(client);
    responseCache.trackConditionalCacheHit();
    responseCache.update(cacheResponse, stripBody(userResponse)); // refresh the cache entry
    userResponse = unzip(userResponse);
    return;
  } else {
    closeQuietly(cacheResponse.body());
  }
}

userResponse = networkResponse.newBuilder() // normal (non-conditional) response
    .request(userRequest)
    .priorResponse(stripBody(priorResponse))
    .cacheResponse(stripBody(cacheResponse))
    .networkResponse(stripBody(networkResponse))
    .build();

if (hasBody(userResponse)) {
  maybeCache();
  userResponse = unzip(cacheWritingResponse(storeRequest, userResponse)); // write to the cache while building the returned Response
}

Back in RealCall.getResponse() once more: after engine.readResponse() there is some exception handling, i.e. the retry mechanism. Finally, if the request completes normally, the Response is returned.
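
The retry and redirect behavior touched on here can also be controlled by the caller; a sketch mirroring the followRedirects/followSslRedirects flags from the opening example:

import okhttp3.OkHttpClient;

public final class RetryConfig {
    // Disable silent connection-failure retries and redirect following (all enabled by default).
    static OkHttpClient create() {
        return new OkHttpClient().newBuilder()
                .retryOnConnectionFailure(false)
                .followRedirects(false)
                .followSslRedirects(false)
                .build();
    }
}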

And that is one complete request from start to finish. It was a long read, but digging through the source is well worth it: it deepens our understanding of both the request pipeline and the HTTP protocol itself.
