OkHttp是一个非常强大的网络请求库,相比HttpClient、Volley和HttpUrlConnection有很大的优势,Android6.0以后废除了HttpClient,4.4以后HttpUrlConnection底层也是用OkHttp实现的,而Volley已经停止维护了,OkHttp是目前使用最广泛的网络请求库。以往只是停留在会用的阶段,今天来看下OkHttp内部具体是如何实现的。
下面是一段标准的post请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| OkHttpClient httpClient = new OkHttpClient().newBuilder() .followRedirects(false) .followSslRedirects(false) .addInterceptor(new GzipRequestInterceptor()) .build(); MediaType type = MediaType.parse("application/json; charset=utf-8"); RequestBody requestBody = RequestBody.create(type, json); Request request = new Request.Builder() .url(StatApi.BASE_URL_STAT) .post(requestBody) .tag(StatApi.BASE_URL_STAT) .build(); httpClient.newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { System.out.println("call = [" + call + "], e = [" + e + "]"); } @Override public void onResponse(Call call, Response response) throws IOException { System.out.println("call = [" + call + "], response = [" + response + "]"); } });
|
首先我们用Builder模式创建OkHttpClient对象,利用OkHttpClient,我们可以发起request请求和获得response结果,在OkHttpClient创建过程中我们可以配置参数,
例如.cookieJar()进行Cookie管理,.sslSocketFactory()进行证书认证管理,.connectTimeout()进行超时管理,.addInterceptor()进行拦截器管理(实现GZIP压缩等),.cache()进行缓存等.
创建Request对象,然后httpClient.newCall(request)会创建RealCall对象,RealCall有如下几个成员变量:
1 2 3 4 5 6 7 8 9
| private final OkHttpClient client; // Guarded by this. private boolean executed; volatile boolean canceled; /** The application's original request unadulterated by redirects or auth headers. */ Request originalRequest;//请求实体类 HttpEngine engine;//核心请求类
|
RealCall中有enqueue和execute两种方式请求,区别在于enqueue是异步请求,execute是同步请求,会阻塞线程.
这两个方法之中均出现了Dispatcher类,这里说下Dispatcher类,这个类维护着一个线程池,最多允许同时存在64个请求数,超出的部分会被缓存起来.
注意这三个双向队列,很重要,用来记录我们所有发起的同步或者异步的请求.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| /** Ready async calls in the order they'll be run. */ private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();//记录还未运行的异步请求 /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();//记录正在运行的异步请求 /** Running synchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();//记录同步请求 当使用enqueue时,会创建AsyncCall(AsyncCall是一个Runnable对象)对象然后加入线程池.而execute时.则是将当前的RealCall记录下来并立刻执行. void enqueue(Callback responseCallback, boolean forWebSocket) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } client.dispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket)); } @Override public Response execute() throws IOException { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } try { client.dispatcher().executed(this); Response result = getResponseWithInterceptorChain(false); if (result == null) throw new IOException("Canceled"); return result; } finally { client.dispatcher().finished(this); } }
|
继续跟下去,发现不管是同步还是异步的执行,最终都会调用getResponseWithInterceptorChain()方法,从而获得Response对象,同步调用会直接返回Response,异步的话由于有回调方法,所以会调用Callback的onResponse或onFailure方法.
看下getResponseWithInterceptorChain()方法:
1 2 3 4
| private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException { Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket); return chain.proceed(originalRequest); }
|
这里发现又多了个ApplicationInterceptorChain类.这个类用来对Request进行统一处理,可以看下它的proceed方法.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Override public Response proceed(Request request) throws IOException { // If there's another interceptor in the chain, call that. if (index < client.interceptors().size()) { Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket); Interceptor interceptor = client.interceptors().get(index); Response interceptedResponse = interceptor.intercept(chain); if (interceptedResponse == null) { throw new NullPointerException("application interceptor " + interceptor + " returned null"); } return interceptedResponse; } // No more interceptors. Do HTTP. return getResponse(request, forWebSocket); }
|
这里采用责任链模式将我们之前创建OkHttpClient时添加的Intercepter进行遍历处理,例如.addInterceptor(new GzipRequestInterceptor())
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class GzipRequestInterceptor implements Interceptor { @Override public Response intercept(Chain chain) throws IOException { Request originalRequest = chain.request(); if (originalRequest.body() == null || originalRequest.header("Content-Encoding") != null) { return chain.proceed(originalRequest); } Request compressedRequest = originalRequest.newBuilder() .header("Content-Encoding", "gzip") .method(originalRequest.method(), gzip(originalRequest.body())) .build(); return chain.proceed(compressedRequest); } private RequestBody gzip(final RequestBody body) { return new RequestBody() { @Override public MediaType contentType() { return body.contentType(); } @Override public long contentLength() { return -1; // 无法知道压缩后的数据大小 } @Override public void writeTo(BufferedSink sink) throws IOException { BufferedSink gzipSink = Okio.buffer(new GzipSink(sink)); body.writeTo(gzipSink); gzipSink.close(); } }; } }
|
实现GZIP压缩等.
Intercepter遍历执行之后会调用getResponse方法.
在这个方法中我们会发现最终创建了一个HttpEngine对象,这个类是核心类,发送请求和获得返回数据都是通过这个类来进行.
可以看到大致流程为:
1 2 3
| engine.sendRequest(); engine.readResponse(); Response response = engine.getResponse();
|
engine.sendRequest()方法中首先调用了networkRequest(userRequest),在这个方法中会进行Request header的填充,包括设置GZIP压缩,Cookie填充等等.
接下来就是缓存处理了,Internal.instance在OkHttpClient初始化的时候被初始化,主要用来方便缓存和连接池的管理.
1 2 3 4 5
| InternalCache responseCache = Internal.instance.internalCache(client); Response cacheCandidate = responseCache != null ? responseCache.get(request) : null; long now = System.currentTimeMillis();
|
responseCache为Cache类的代理,对外提供了增删改查的接口,真正的缓存实现是在Cache类中的,可以发现,Cache是用DiskLruCache进行缓存的.DisLruCache的实现就不再多说了,很容易看懂.
这里先取出缓存,接下来进行缓存有效性的判断:
1
| cacheStrategy = new CacheStrategy.Factory(now, request, cacheCandidate).get();
|
这里进行了一大堆的判断,目的是判断缓存是否可用,详细判断查看Http协议,都是些协议规定的东西,没什么特别的.
networkRequest和cacheResponse两个字段很重要,通过这两个字段来判断是否需要请求网络:
- 都为空,证明Request设置了Cache-Control: only-if-cached,并且缓存中找不到对应的缓存记录。
networkRequest不为空,cacheResponse为空 没有可用的缓存,需要访问网络
networkRequest为空,cacheResponse不为空 直接访问缓存即可,不需要访问网络
networkRequest和cacheResponse都不为空 缓存过时,而且设置了If-Modified-Since或者If-None-Match字段,这个时候需要访问网络,如果服务器返回304,则表明服务器没有改动,返回缓存即可,节省流量(这块的逻辑在engine.readResponse()中).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| networkRequest = cacheStrategy.networkRequest; cacheResponse = cacheStrategy.cacheResponse; if (responseCache != null) { responseCache.trackResponse(cacheStrategy);//更新请求次数,通过网络请求的次数,通过缓存请求的次数 } if (cacheCandidate != null && cacheResponse == null) { closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it. } // If we're forbidden from using the network and the cache is insufficient, fail. if (networkRequest == null && cacheResponse == null) {//当Request设置了Cache-Control: only-if-cached,并且缓存中找不到对应的缓存记录时... userResponse = new Response.Builder() .request(userRequest) .priorResponse(stripBody(priorResponse)) .protocol(Protocol.HTTP_1_1) .code(504) .message("Unsatisfiable Request (only-if-cached)") .body(EMPTY_BODY) .build(); return; } // If we don't need the network, we're done. if (networkRequest == null) {//networkRequest为空,cacheResponse不为空,则直接返回缓存数据 userResponse = cacheResponse.newBuilder() .request(userRequest) .priorResponse(stripBody(priorResponse)) .cacheResponse(stripBody(cacheResponse)) .build(); userResponse = unzip(userResponse); return; }
|
缓存的分析到此结束,接下来就到了真正的网络请求的时候了.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| // We need the network to satisfy this request. Possibly for validating a conditional GET. boolean success = false; try { httpStream = connect(); httpStream.setHttpEngine(this); if (writeRequestHeadersEagerly()) { long contentLength = OkHeaders.contentLength(request); if (bufferRequestBody) { if (contentLength > Integer.MAX_VALUE) { throw new IllegalStateException("Use setFixedLengthStreamingMode() or " + "setChunkedStreamingMode() for requests larger than 2 GiB."); } if (contentLength != -1) { // Buffer a request body of a known length. httpStream.writeRequestHeaders(networkRequest); requestBodyOut = new RetryableSink((int) contentLength); } else { // Buffer a request body of an unknown length. Don't write request headers until the // entire body is ready; otherwise we can't set the Content-Length header correctly. requestBodyOut = new RetryableSink(); } } else { httpStream.writeRequestHeaders(networkRequest); requestBodyOut = httpStream.createRequestBody(networkRequest, contentLength); } } success = true; } finally { // If we're crashing on I/O or otherwise, don't leak the cache body. if (!success && cacheCandidate != null) { closeQuietly(cacheCandidate.body()); } }
|
先看下connect()方法,这个方法一直跟进去会走到StreamAllocation.findConnection()中去,核心代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| //... RealConnection allocatedConnection = this.connection; if (allocatedConnection != null && !allocatedConnection.noNewStreams) {//如果本身已经存在了一个连接,并且连接可用 return allocatedConnection; } // Attempt to get a connection from the pool. RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);//如果连接池中有可复用的连接 if (pooledConnection != null) { this.connection = pooledConnection; return pooledConnection; } //... RealConnection newConnection = new RealConnection(selectedRoute);//新建一个连接 acquire(newConnection); synchronized (connectionPool) { Internal.instance.put(connectionPool, newConnection); this.connection = newConnection; if (canceled) throw new IOException("Canceled"); } newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(), connectionRetryEnabled); routeDatabase().connected(newConnection.route()); return newConnection;
|
继续进入newConnection.connect()方法,发现在这里通过address创建了普通的Socket或是SSLSocket对象.继续跟进去会发现,最终会调用newConnection.connectSocket()方法,
这个方法中真正打通了Socket连接,而且如果采用的协议为Http2.x或者Spdy3的话还会创建一个FramedConnection对象,这个对象很重要,基于Http2.x和Spdy3请求的发送和响应的获取都是通过这个对象来进行.
Http1xStream和Http2xStream分别对应Http1.x和Http2.x.
至此,连接已经建立成功.
回到HttpEngine.sendRequest()方法中.
看看connect之后又做了什么:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| if (writeRequestHeadersEagerly()) { long contentLength = OkHeaders.contentLength(request); if (bufferRequestBody) { if (contentLength > Integer.MAX_VALUE) { throw new IllegalStateException("Use setFixedLengthStreamingMode() or " + "setChunkedStreamingMode() for requests larger than 2 GiB."); } if (contentLength != -1) { // Buffer a request body of a known length. httpStream.writeRequestHeaders(networkRequest); requestBodyOut = new RetryableSink((int) contentLength); } else { // Buffer a request body of an unknown length. Don't write request headers until the // entire body is ready; otherwise we can't set the Content-Length header correctly. requestBodyOut = new RetryableSink(); } } else { httpStream.writeRequestHeaders(networkRequest); requestBodyOut = httpStream.createRequestBody(networkRequest, contentLength); } }
|
可以看到,这里如果bufferRequestBody为false或者bufferRequestBody为true并且Content-Length字段不是-1的话,就先socket通道中写入Headers.
回到RealCall.getResponse方法中,接下来执行了engine.readResponse()这里做了一系列判断,如果没有写入Headers就写入Headers,如果没有写入body就写入body.
接下来会执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| ... networkResponse = readNetworkResponse();//读取Response,Http1.x和Http2.x分别处理 receiveHeaders(networkResponse.headers());//这里主要是Cookie保存 接下来: // If we have a cache response too, then we're doing a conditional get. if (cacheResponse != null) { if (validate(cacheResponse, networkResponse)) {//服务器返回304或者Last-Modified没有发生改变,直接返回缓存内容 userResponse = cacheResponse.newBuilder() .request(userRequest) .priorResponse(stripBody(priorResponse)) .headers(combine(cacheResponse.headers(), networkResponse.headers())) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); networkResponse.body().close();//释放连接 releaseStreamAllocation();//释放连接 // Update the cache after combining headers but before stripping the // Content-Encoding header (as performed by initContentStream()). InternalCache responseCache = Internal.instance.internalCache(client); responseCache.trackConditionalCacheHit(); responseCache.update(cacheResponse, stripBody(userResponse));//更新缓存 userResponse = unzip(userResponse); return; } else { closeQuietly(cacheResponse.body()); } } userResponse = networkResponse.newBuilder()//正常请求返回 .request(userRequest) .priorResponse(stripBody(priorResponse)) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); if (hasBody(userResponse)) { maybeCache(); userResponse = unzip(cacheWritingResponse(storeRequest, userResponse));//更新缓存并构造Response返回 }
|
再次回到RealCall.getResponse方法中,继续往下看,在engine.readResponse()之后,会做一些异常处理,即重传机制.最后若请求正常返回,则返回Response.
至此,一次完整的请求结束,看的真是累,不得不说,看源码实现还是有很多好处的,可以加深我们对网络请求过程的认识和对Http协议的理解。