    OkHttpClient httpClient = new OkHttpClient().newBuilder()
            .followRedirects(false)
            .followSslRedirects(false)
            .addInterceptor(new GzipRequestInterceptor())
            .build();

    MediaType type = MediaType.parse("application/json; charset=utf-8");
    RequestBody requestBody = RequestBody.create(type, json);

    Request request = new Request.Builder()
            .url(StatApi.BASE_URL_STAT)
            .post(requestBody)
            .tag(StatApi.BASE_URL_STAT)
            .build();

    httpClient.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            System.out.println("call = [" + call + "], e = [" + e + "]");
        }

        @Override
        public void onResponse(Call call, Response response) throws IOException {
            System.out.println("call = [" + call + "], response = [" + response + "]");
            response.body().close(); // close the body so the connection is released
        }
    });


    private final OkHttpClient client;

    // Guarded by this.
    private boolean executed;
    volatile boolean canceled;

    /** The application's original request unadulterated by redirects or auth headers. */
    Request originalRequest; // the request entity
    HttpEngine engine;       // the core class that carries out the request


    /** Ready async calls in the order they'll be run. */
    private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); // async calls that have not started yet

    /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
    private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); // async calls that are running

    /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
    private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); // synchronous calls

When enqueue is used, an AsyncCall (a Runnable) is created and handed to the dispatcher, which runs it on its thread pool. With execute, the current RealCall is simply recorded and then run immediately on the calling thread.

    void enqueue(Callback responseCallback, boolean forWebSocket) {
        synchronized (this) {
            if (executed) throw new IllegalStateException("Already Executed");
            executed = true;
        }
        client.dispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
    }

    @Override
    public Response execute() throws IOException {
        synchronized (this) {
            if (executed) throw new IllegalStateException("Already Executed");
            executed = true;
        }
        try {
            client.dispatcher().executed(this);
            Response result = getResponseWithInterceptorChain(false);
            if (result == null) throw new IOException("Canceled");
            return result;
        } finally {
            client.dispatcher().finished(this);
        }
    }

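
The bookkeeping behind the ready and running queues can be sketched in plain Java. This is a minimal sketch, not OkHttp's Dispatcher: the class name `MiniDispatcher`, the `maxRunning` limit and the decision to only track calls (rather than submit them to a thread pool) are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, simplified dispatcher; it only tracks calls and never runs them. */
final class MiniDispatcher {
    private final int maxRunning; // assumed cap on concurrently running async calls
    private final Deque<Runnable> readyAsyncCalls = new ArrayDeque<>();
    private final Deque<Runnable> runningAsyncCalls = new ArrayDeque<>();

    MiniDispatcher(int maxRunning) {
        this.maxRunning = maxRunning;
    }

    /** Marks the call as running if there is capacity, otherwise parks it in the ready queue. */
    synchronized void enqueue(Runnable call) {
        if (runningAsyncCalls.size() < maxRunning) {
            runningAsyncCalls.add(call);
        } else {
            readyAsyncCalls.add(call);
        }
    }

    /** Removes a finished call, then promotes waiting calls in FIFO order. */
    synchronized void finished(Runnable call) {
        if (!runningAsyncCalls.remove(call)) {
            throw new IllegalStateException("Call wasn't running");
        }
        while (runningAsyncCalls.size() < maxRunning && !readyAsyncCalls.isEmpty()) {
            runningAsyncCalls.add(readyAsyncCalls.removeFirst());
        }
    }

    synchronized int runningCount() {
        return runningAsyncCalls.size();
    }

    synchronized int readyCount() {
        return readyAsyncCalls.size();
    }
}
```

With a limit of two, a third enqueued call waits in the ready queue until one of the running calls reports that it has finished.
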

    private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException {
        Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket);
        return chain.proceed(originalRequest);
    }


    @Override
    public Response proceed(Request request) throws IOException {
        // If there's another interceptor in the chain, call that.
        if (index < client.interceptors().size()) {
            Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket);
            Interceptor interceptor = client.interceptors().get(index);
            Response interceptedResponse = interceptor.intercept(chain);

            if (interceptedResponse == null) {
                throw new NullPointerException("application interceptor " + interceptor
                        + " returned null");
            }

            return interceptedResponse;
        }

        // No more interceptors. Do HTTP.
        return getResponse(request, forWebSocket);
    }

This uses the chain-of-responsibility pattern to walk through the interceptors we added when building the OkHttpClient, for example .addInterceptor(new GzipRequestInterceptor()).
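
To make the control flow easier to follow, here is a minimal sketch of the same chain-of-responsibility idea with the HTTP details removed. `MiniInterceptor` and `MiniChain` are hypothetical stand-ins for OkHttp's Interceptor and Chain, and plain strings stand in for Request and Response.

```java
import java.util.List;

/** Hypothetical stand-in for an interceptor; strings play the role of requests and responses. */
interface MiniInterceptor {
    String intercept(MiniChain chain);
}

/** Hypothetical stand-in for the chain: each instance knows which interceptor comes next. */
final class MiniChain {
    private final List<MiniInterceptor> interceptors;
    private final int index;
    private final String request;

    MiniChain(List<MiniInterceptor> interceptors, int index, String request) {
        this.interceptors = interceptors;
        this.index = index;
        this.request = request;
    }

    String request() {
        return request;
    }

    String proceed(String request) {
        // If there is another interceptor, hand it a chain that points one step further.
        if (index < interceptors.size()) {
            MiniChain next = new MiniChain(interceptors, index + 1, request);
            return interceptors.get(index).intercept(next);
        }
        // No more interceptors: this placeholder stands in for the real HTTP exchange.
        return "response(" + request + ")";
    }
}
```

Each interceptor may rewrite the request before calling `proceed` and may wrap the response afterwards, so requests flow through the list front to back and responses come back in the opposite order.
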

    public class GzipRequestInterceptor implements Interceptor {
        @Override
        public Response intercept(Chain chain) throws IOException {
            Request originalRequest = chain.request();
            if (originalRequest.body() == null || originalRequest.header("Content-Encoding") != null) {
                return chain.proceed(originalRequest);
            }

            Request compressedRequest = originalRequest.newBuilder()
                    .header("Content-Encoding", "gzip")
                    .method(originalRequest.method(), gzip(originalRequest.body()))
                    .build();
            return chain.proceed(compressedRequest);
        }

        private RequestBody gzip(final RequestBody body) {
            return new RequestBody() {
                @Override
                public MediaType contentType() {
                    return body.contentType();
                }

                @Override
                public long contentLength() {
                    return -1; // the compressed size is not known in advance
                }

                @Override
                public void writeTo(BufferedSink sink) throws IOException {
                    BufferedSink gzipSink = Okio.buffer(new GzipSink(sink));
                    body.writeTo(gzipSink);
                    gzipSink.close();
                }
            };
        }
    }

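
The reason contentLength() has to return -1 is easier to see outside OkHttp. The sketch below uses java.util.zip instead of Okio's GzipSink (a substitution made so the example runs with the JDK alone); `GzipSketch` is a hypothetical helper name. The compressed length is only known once the whole body has been written through the compressor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical helper showing a gzip round trip with the JDK's own streams. */
final class GzipSketch {
    /** Compresses the bytes; the result length is unknown until the stream is closed. */
    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        }
        return out.toByteArray();
    }

    /** Decompresses gzip data back into the original bytes. */
    static byte[] gunzip(byte[] compressed) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }
        return out.toByteArray();
    }
}
```
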

    engine.sendRequest();
    engine.readResponse();
    Response response = engine.getResponse();

engine.sendRequest() first calls networkRequest(userRequest), which fills in the request headers: it enables gzip compression, adds cookies, and so on.
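
The idea of filling in headers the caller did not set can be sketched with a plain map. This is not the library's code: `HeaderDefaults` is a hypothetical class, the Host and Connection defaults are assumptions chosen for illustration, cookies are left out, and header names are compared case-sensitively here, which real HTTP headers are not.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: add default headers only where the caller has not set one. */
final class HeaderDefaults {
    /** Returns a copy of the caller's headers with the assumed defaults filled in. */
    static Map<String, String> fill(Map<String, String> userHeaders, String host) {
        Map<String, String> result = new LinkedHashMap<>(userHeaders);
        result.putIfAbsent("Host", host);
        result.putIfAbsent("Connection", "Keep-Alive");
        // Ask for gzip only if the caller did not choose an encoding themselves.
        result.putIfAbsent("Accept-Encoding", "gzip");
        return result;
    }
}
```

The caller's map is never modified, and any header the caller already set wins over the default.
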

    InternalCache responseCache = Internal.instance.internalCache(client);
    Response cacheCandidate = responseCache != null
            ? responseCache.get(request)
            : null;
    long now = System.currentTimeMillis();

    cacheStrategy = new CacheStrategy.Factory(now, request, cacheCandidate).get();

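- networkRequest and cacheResponse are both null: the request set Cache-Control: only-if-cached, and the cache holds no matching entry.
- networkRequest is non-null, cacheResponse is null: there is no usable cache entry, so the network must be used.
- networkRequest is null, cacheResponse is non-null: the cache can be used directly and no network access is needed.
- networkRequest and cacheResponse are both non-null: the cache entry is stale and the request carries If-Modified-Since or If-None-Match, so the network must be consulted. If the server answers 304, the resource has not changed and the cached response is returned, which saves traffic (this logic lives in engine.readResponse()).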
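
The four combinations above amount to a small decision table. The sketch below encodes it directly; `CacheDecision` and its `Action` names are hypothetical labels introduced for illustration, and the two booleans stand for "networkRequest is non-null" and "cacheResponse is non-null".

```java
/** Hypothetical encoding of the four networkRequest / cacheResponse combinations. */
final class CacheDecision {
    enum Action {
        FAIL_504,        // only-if-cached was requested but nothing usable is cached
        USE_NETWORK,     // no usable cache entry
        USE_CACHE,       // cache entry is sufficient
        CONDITIONAL_GET  // stale entry, revalidate with the server
    }

    static Action decide(boolean hasNetworkRequest, boolean hasCacheResponse) {
        if (!hasNetworkRequest && !hasCacheResponse) return Action.FAIL_504;
        if (!hasNetworkRequest) return Action.USE_CACHE;
        if (!hasCacheResponse) return Action.USE_NETWORK;
        return Action.CONDITIONAL_GET;
    }
}
```
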

    networkRequest = cacheStrategy.networkRequest;
    cacheResponse = cacheStrategy.cacheResponse;

    if (responseCache != null) {
        // Update the counters: total requests, requests served by the network, requests served by the cache.
        responseCache.trackResponse(cacheStrategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
        closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    // This is the case where the request set Cache-Control: only-if-cached and no cache entry matched.
    if (networkRequest == null && cacheResponse == null) {
        userResponse = new Response.Builder()
                .request(userRequest)
                .priorResponse(stripBody(priorResponse))
                .protocol(Protocol.HTTP_1_1)
                .code(504)
                .message("Unsatisfiable Request (only-if-cached)")
                .body(EMPTY_BODY)
                .build();
        return;
    }

    // If we don't need the network, we're done.
    // networkRequest is null and cacheResponse is non-null: return the cached data directly.
    if (networkRequest == null) {
        userResponse = cacheResponse.newBuilder()
                .request(userRequest)
                .priorResponse(stripBody(priorResponse))
                .cacheResponse(stripBody(cacheResponse))
                .build();
        userResponse = unzip(userResponse);
        return;
    }


    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean success = false;
    try {
        httpStream = connect();
        httpStream.setHttpEngine(this);

        if (writeRequestHeadersEagerly()) {
            long contentLength = OkHeaders.contentLength(request);
            if (bufferRequestBody) {
                if (contentLength > Integer.MAX_VALUE) {
                    throw new IllegalStateException("Use setFixedLengthStreamingMode() or "
                            + "setChunkedStreamingMode() for requests larger than 2 GiB.");
                }

                if (contentLength != -1) {
                    // Buffer a request body of a known length.
                    httpStream.writeRequestHeaders(networkRequest);
                    requestBodyOut = new RetryableSink((int) contentLength);
                } else {
                    // Buffer a request body of an unknown length. Don't write request headers until the
                    // entire body is ready; otherwise we can't set the Content-Length header correctly.
                    requestBodyOut = new RetryableSink();
                }
            } else {
                httpStream.writeRequestHeaders(networkRequest);
                requestBodyOut = httpStream.createRequestBody(networkRequest, contentLength);
            }
        }
        success = true;
    } finally {
        // If we're crashing on I/O or otherwise, don't leak the cache body.
        if (!success && cacheCandidate != null) {
            closeQuietly(cacheCandidate.body());
        }
    }


    //...
    RealConnection allocatedConnection = this.connection;
    // A connection has already been allocated and is still usable.
    if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
    }

    // Attempt to get a connection from the pool.
    RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
    if (pooledConnection != null) { // the pool holds a reusable connection
        this.connection = pooledConnection;
        return pooledConnection;
    }

    //...
    RealConnection newConnection = new RealConnection(selectedRoute); // create a new connection
    acquire(newConnection);

    synchronized (connectionPool) {
        Internal.instance.put(connectionPool, newConnection);
        this.connection = newConnection;
        if (canceled) throw new IOException("Canceled");
    }

    newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
            connectionRetryEnabled);
    routeDatabase().connected(newConnection.route());

    return newConnection;

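
The lookup order in this snippet (already allocated, then pooled, then newly created) can be sketched without any networking. Everything here is hypothetical and simplified: `ConnectionFinder` and `Conn` are illustrative names, the pool is a plain map holding at most one connection per address, and no socket is ever opened.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the three-step lookup: allocated, pooled, then new. */
final class ConnectionFinder {
    /** Hypothetical connection: records only its address and whether it accepts new streams. */
    static final class Conn {
        final String address;
        boolean noNewStreams;

        Conn(String address) {
            this.address = address;
        }
    }

    private final String address;
    private final Map<String, Conn> pool; // assumed shared pool, one connection per address
    private Conn current;

    ConnectionFinder(String address, Map<String, Conn> pool) {
        this.address = address;
        this.pool = pool;
    }

    Conn find() {
        // 1. Reuse the connection already allocated here, if it is still usable.
        if (current != null && !current.noNewStreams) {
            return current;
        }
        synchronized (pool) {
            // 2. Otherwise try to take a usable connection from the shared pool.
            Conn pooled = pool.get(address);
            if (pooled != null && !pooled.noNewStreams) {
                current = pooled;
                return pooled;
            }
            // 3. Otherwise create a new connection and publish it to the pool.
            Conn fresh = new Conn(address);
            pool.put(address, fresh);
            current = fresh;
            return fresh;
        }
    }
}
```

Two finders that share a pool and an address end up on the same connection until it is flagged as closed to new streams, at which point the next lookup creates a replacement.
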

    if (writeRequestHeadersEagerly()) {
        long contentLength = OkHeaders.contentLength(request);
        if (bufferRequestBody) {
            if (contentLength > Integer.MAX_VALUE) {
                throw new IllegalStateException("Use setFixedLengthStreamingMode() or "
                        + "setChunkedStreamingMode() for requests larger than 2 GiB.");
            }

            if (contentLength != -1) {
                // Buffer a request body of a known length.
                httpStream.writeRequestHeaders(networkRequest);
                requestBodyOut = new RetryableSink((int) contentLength);
            } else {
                // Buffer a request body of an unknown length. Don't write request headers until the
                // entire body is ready; otherwise we can't set the Content-Length header correctly.
                requestBodyOut = new RetryableSink();
            }
        } else {
            httpStream.writeRequestHeaders(networkRequest);
            requestBodyOut = httpStream.createRequestBody(networkRequest, contentLength);
        }
    }


    // ...
    networkResponse = readNetworkResponse();     // read the response; HTTP/1.x and HTTP/2 are handled separately
    receiveHeaders(networkResponse.headers());   // mainly saves cookies

Next:

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
        // The server returned 304 or Last-Modified has not changed: return the cached content.
        if (validate(cacheResponse, networkResponse)) {
            userResponse = cacheResponse.newBuilder()
                    .request(userRequest)
                    .priorResponse(stripBody(priorResponse))
                    .headers(combine(cacheResponse.headers(), networkResponse.headers()))
                    .cacheResponse(stripBody(cacheResponse))
                    .networkResponse(stripBody(networkResponse))
                    .build();
            networkResponse.body().close(); // release the connection
            releaseStreamAllocation();      // release the connection

            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            InternalCache responseCache = Internal.instance.internalCache(client);
            responseCache.trackConditionalCacheHit();
            responseCache.update(cacheResponse, stripBody(userResponse)); // update the cache
            userResponse = unzip(userResponse);
            return;
        } else {
            closeQuietly(cacheResponse.body());
        }
    }

    userResponse = networkResponse.newBuilder() // the ordinary network response
            .request(userRequest)
            .priorResponse(stripBody(priorResponse))
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();

    if (hasBody(userResponse)) {
        maybeCache();
        // Update the cache and build the Response that is returned.
        userResponse = unzip(cacheWritingResponse(storeRequest, userResponse));
    }

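
The check behind validate(cacheResponse, networkResponse) is not shown in the snippet, so the following is only a sketch of what such a check might look like, based on the comment above it. `ConditionalGet` is a hypothetical class, timestamps are passed as epoch milliseconds (null when the header is absent), and the strict "older than" comparison on Last-Modified is an assumption rather than something the snippet confirms.

```java
/** Hypothetical sketch of deciding whether a cached response may be served after revalidation. */
final class ConditionalGet {
    static final int HTTP_NOT_MODIFIED = 304;

    /**
     * @param networkCode status code of the network response
     * @param cachedLastModified Last-Modified of the cached response in epoch millis, or null
     * @param networkLastModified Last-Modified of the network response in epoch millis, or null
     */
    static boolean cacheStillValid(int networkCode, Long cachedLastModified, Long networkLastModified) {
        // A 304 means the server confirmed the cached copy is current.
        if (networkCode == HTTP_NOT_MODIFIED) {
            return true;
        }
        // Assumption: also keep the cached copy when the server's copy is older than ours.
        return cachedLastModified != null
                && networkLastModified != null
                && networkLastModified < cachedLastModified;
    }
}
```
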