您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關Android框架之OkHttp3源碼的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章後可以有所收穫。
gradle依賴
implementation 'com.squareup.okhttp3:okhttp:3.11.0'
implementation 'com.squareup.okio:okio:1.15.0'
/** *這里拿get請(qǐng)求來 * 異步的get請(qǐng)求 */ public void okhttpAsyn() { //設(shè)置超時(shí)的時(shí)間 OkHttpClient.Builder builder = new OkHttpClient.Builder() .connectTimeout(15, TimeUnit.SECONDS) .writeTimeout(20, TimeUnit.SECONDS) .readTimeout(20, TimeUnit.SECONDS) ; OkHttpClient okHttpClient = builder.build(); Request request = new Request.Builder() .get() //設(shè)置請(qǐng)求模式 .url("https://www.baidu.com/") .build(); Call call = okHttpClient.newCall(request); call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { Log.d("MainActivity", "-----------onFailure-----------"); } @Override public void onResponse(Call call, Response response) throws IOException { Log.d("MainActivity", "----onResponse----" + response.body().toString()); runOnUiThread(new Runnable() { @Override public void run() { Toast.makeText(MainActivity.this, "請(qǐng)求成功", Toast.LENGTH_LONG).show(); } }); } }); }
從OkHttp的基本使用中,我們看到,通過okHttpClient.newCall()方法,拿到這個call對象,我們看看newCall是怎麼走的。
/** * Prepares the {@code request} to be executed at some point in the future. */ @Override public Call newCall(Request request) { return RealCall.newRealCall(this, request, false /* for web socket */); } static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { // Safely publish the Call instance to the EventListener. RealCall call = new RealCall(client, originalRequest, forWebSocket); call.eventListener = client.eventListenerFactory().create(call); return call; }
從這裡的源碼知道,okHttpClient.newCall()實際上返回的是RealCall對象,而call.enqueue()實際上調用的是RealCall中的enqueue()方法,我們看看enqueue()方法怎麼走。
/**
 * Hands this call to the client's dispatcher for asynchronous execution.
 *
 * @param responseCallback callback wrapped in an {@code AsyncCall} and passed to the dispatcher
 * @throws IllegalStateException if this call was already marked as executed
 */
@Override
public void enqueue(Callback responseCallback) {
  // A call may only be started once; the flag is checked and set under the call's own lock.
  synchronized (this) {
    if (executed) {
      throw new IllegalStateException("Already Executed");
    }
    executed = true;
  }

  captureCallStackTrace();
  eventListener.callStart(this);

  Dispatcher dispatcher = client.dispatcher();
  dispatcher.enqueue(new AsyncCall(responseCallback));
}
可以看到client.dispatcher().enqueue(new AsyncCall(responseCallback));這句代碼,也就是說,最終所有的請求都是由dispatcher來完成,我們看看dispatcher。
/* * Copyright (C) 2013 Square, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package okhttp3; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.Deque; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import okhttp3.RealCall.AsyncCall; import okhttp3.internal.Util; /** * Policy on when async requests are executed. * * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number * of calls concurrently. 
*/ public final class Dispatcher { //最大請(qǐng)求的并發(fā)數(shù) private int maxRequests = 64; //每個(gè)主機(jī)最大請(qǐng)求數(shù) private int maxRequestsPerHost = 5; private @Nullable Runnable idleCallback; /** 消費(fèi)線程池 */ private @Nullable ExecutorService executorService; /** 準(zhǔn)備運(yùn)行的異步請(qǐng)求隊(duì)列 */ private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** 正在運(yùn)行的異步請(qǐng)求隊(duì)列 */ private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); /** 正在運(yùn)行的同步請(qǐng)求隊(duì)列 */ private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); /** 構(gòu)造方法 */ public Dispatcher(ExecutorService executorService) { this.executorService = executorService; } public Dispatcher() { } public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; } /** * *設(shè)置并發(fā)執(zhí)行最大的請(qǐng)求數(shù)量 * <p>If more than {@code maxRequests} requests are in flight when this is invoked, those requests * will remain in flight. */ public synchronized void setMaxRequests(int maxRequests) { if (maxRequests < 1) { throw new IllegalArgumentException("max < 1: " + maxRequests); } this.maxRequests = maxRequests; promoteCalls(); } //獲取到最大請(qǐng)求的數(shù)量 public synchronized int getMaxRequests() { return maxRequests; } /** * 設(shè)置每個(gè)主機(jī)并發(fā)執(zhí)行的請(qǐng)求的最大數(shù)量 * <p>If more than {@code maxRequestsPerHost} requests are in flight when this is invoked, those * requests will remain in flight. * * <p>WebSocket connections to hosts <b>do not</b> count against this limit. 
*/ public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) { if (maxRequestsPerHost < 1) { throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost); } this.maxRequestsPerHost = maxRequestsPerHost; promoteCalls(); } //獲取每個(gè)主機(jī)最大并發(fā)數(shù)量 public synchronized int getMaxRequestsPerHost() { return maxRequestsPerHost; } /** * Set a callback to be invoked each time the dispatcher becomes idle (when the number of running * calls returns to zero). * * <p>Note: The time at which a {@linkplain Call call} is considered idle is different depending * on whether it was run {@linkplain Call#enqueue(Callback) asynchronously} or * {@linkplain Call#execute() synchronously}. Asynchronous calls become idle after the * {@link Callback#onResponse onResponse} or {@link Callback#onFailure onFailure} callback has * returned. Synchronous calls become idle once {@link Call#execute() execute()} returns. This * means that if you are doing synchronous calls the network layer will not truly be idle until * every returned {@link Response} has been closed. */ public synchronized void setIdleCallback(@Nullable Runnable idleCallback) { this.idleCallback = idleCallback; } synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } } /** * Cancel all calls currently enqueued or executing. Includes calls executed both {@linkplain * Call#execute() synchronously} and {@linkplain Call#enqueue asynchronously}. */ public synchronized void cancelAll() { for (AsyncCall call : readyAsyncCalls) { call.get().cancel(); } for (AsyncCall call : runningAsyncCalls) { call.get().cancel(); } for (RealCall call : runningSyncCalls) { call.cancel(); } } private void promoteCalls() { if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. 
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } } //----------------省略若干代碼----------------------- }
我們來找到這段代碼
/**
 * Starts {@code call} on the executor when both the global and the per-host limits allow it;
 * otherwise queues it in {@code readyAsyncCalls}.
 */
synchronized void enqueue(AsyncCall call) {
  boolean atCapacity = runningAsyncCalls.size() >= maxRequests
      || runningCallsForHost(call) >= maxRequestsPerHost;
  if (atCapacity) {
    // No free slot: park the call until it is promoted.
    readyAsyncCalls.add(call);
    return;
  }
  runningAsyncCalls.add(call);
  executorService().execute(call);
}
當正在運行的異步請求隊列中的數量小於64,並且對同一主機正在運行的請求數小於5時,則把請求加入到runningAsyncCalls中並在線程池中執行,否則就加入到readyAsyncCalls中進行緩存等待。而runningAsyncCalls這個請求隊列存放的就是AsyncCall對象,而這個AsyncCall就是RealCall的內部類,也就是說executorService().execute(call);實際上走的是RealCall的內部類AsyncCall中的execute()方法。
@Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { eventListener.callFailed(RealCall.this, e); responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } }
這部分的代碼,相信很多人都能夠看得明白,無非就是一些成功、失敗的回調。這段代碼最重要的是Response response = getResponseWithInterceptorChain();和client.dispatcher().finished(this);我們先來看看client.dispatcher().finished(this);這句代碼是怎麼執行的。
/** Used by {@code AsyncCall#run} to signal completion. */ void finished(AsyncCall call) { finished(runningAsyncCalls, call, true); } /** Used by {@code Call#execute} to signal completion. */ void finished(RealCall call) { finished(runningSyncCalls, call, false); } private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) { int runningCallsCount; Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); if (promoteCalls) promoteCalls(); runningCallsCount = runningCallsCount(); idleCallback = this.idleCallback; } if (runningCallsCount == 0 && idleCallback != null) { idleCallback.run(); } } private void promoteCalls() { if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } }
由於client.dispatcher().finished(this);這句代碼是放到finally中執行的,所以無論什麼情況,都會執行上面的promoteCalls()方法,而從promoteCalls()方法中可以看出,它通過遍歷readyAsyncCalls來獲取下一個請求,從而執行下一個網絡請求。
回過頭來,我們看看這一句代碼Response response = getResponseWithInterceptorChain(); 通過getResponseWithInterceptorChain();來獲取到response,然後回調返回。很明顯getResponseWithInterceptorChain()這句代碼裡面進行了網絡請求。我們看看是怎麼執行的。
Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0, originalRequest, this, eventListener, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); return chain.proceed(originalRequest); } }
從上面代碼可以知道,緩存、網絡請求都封裝成攔截器的形式。攔截器主要用來觀察、修改以及可能短路發出的請求和返回的響應。最後return chain.proceed,而chain是通過new RealInterceptorChain來獲取到的,我們來看看RealInterceptorChain對象,然後找到proceed()方法。
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException { if (index >= interceptors.size()) throw new AssertionError(); calls++; // If we already have a stream, confirm that the incoming request will use it. if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) { throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must retain the same host and port"); } // If we already have a stream, confirm that this is the only call to chain.proceed(). if (this.httpCodec != null && calls > 1) { throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must call proceed() exactly once"); } // 調(diào)用下一個(gè)攔截器 RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec, connection, index + 1, request, call, eventListener, connectTimeout, readTimeout, writeTimeout); Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next); //調(diào)用攔截器中的intercept()方法 // Confirm that the next interceptor made its required call to chain.proceed(). if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) { throw new IllegalStateException("network interceptor " + interceptor + " must call proceed() exactly once"); } // Confirm that the intercepted response isn't null. if (response == null) { throw new NullPointerException("interceptor " + interceptor + " returned null"); } if (response.body() == null) { throw new IllegalStateException( "interceptor " + interceptor + " returned a response with no body"); } return response; }
從上面的代碼可以看出來,chain.proceed主要是將集合中的攔截器遍歷出來,然後調用每一個攔截器中的intercept()方法,獲取到response結果後返回。
我們看看CacheInterceptor這個類,找到intercept()方法。
@Override public Response intercept(Chain chain) throws IOException { Response cacheCandidate = cache != null ? cache.get(chain.request()) : null; long now = System.currentTimeMillis(); //創(chuàng)建CacheStrategy.Factory對(duì)象,進(jìn)行緩存配置 CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get(); //網(wǎng)絡(luò)請(qǐng)求 Request networkRequest = strategy.networkRequest; //緩存響應(yīng) Response cacheResponse = strategy.cacheResponse; if (cache != null) { //記錄當(dāng)前請(qǐng)求是網(wǎng)絡(luò)發(fā)起還是緩存發(fā)起 cache.trackResponse(strategy); } if (cacheCandidate != null && cacheResponse == null) { closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it. } // 不進(jìn)行網(wǎng)絡(luò)請(qǐng)求并且緩存不存在或者過期則返回504錯(cuò)誤 if (networkRequest == null && cacheResponse == null) { return new Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(504) .message("Unsatisfiable Request (only-if-cached)") .body(Util.EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); } // 不進(jìn)行網(wǎng)絡(luò)請(qǐng)求,而且緩存可以使用,直接返回緩存 if (networkRequest == null) { return cacheResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build(); } //進(jìn)行網(wǎng)絡(luò)請(qǐng)求 Response networkResponse = null; try { networkResponse = chain.proceed(networkRequest); } finally { // If we're crashing on I/O or otherwise, don't leak the cache body. if (networkResponse == null && cacheCandidate != null) { closeQuietly(cacheCandidate.body()); } } //---------省略若干代碼------------- return response; }
上面我做了很多注釋,基本的流程是有緩存就取緩存裡面的,沒有緩存就請求網絡。我們來看看網絡請求的類CallServerInterceptor。
/* * Copyright (C) 2016 Square, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package okhttp3.internal.http; import java.io.IOException; import java.net.ProtocolException; import okhttp3.Interceptor; import okhttp3.Request; import okhttp3.Response; import okhttp3.internal.Util; import okhttp3.internal.connection.RealConnection; import okhttp3.internal.connection.StreamAllocation; import okio.Buffer; import okio.BufferedSink; import okio.ForwardingSink; import okio.Okio; import okio.Sink; /** This is the last interceptor in the chain. It makes a network call to the server. 
*/ public final class CallServerInterceptor implements Interceptor { private final boolean forWebSocket; public CallServerInterceptor(boolean forWebSocket) { this.forWebSocket = forWebSocket; } @Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; HttpCodec httpCodec = realChain.httpStream(); StreamAllocation streamAllocation = realChain.streamAllocation(); RealConnection connection = (RealConnection) realChain.connection(); Request request = realChain.request(); long sentRequestMillis = System.currentTimeMillis(); realChain.eventListener().requestHeadersStart(realChain.call()); httpCodec.writeRequestHeaders(request); realChain.eventListener().requestHeadersEnd(realChain.call(), request); Response.Builder responseBuilder = null; if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 // Continue" response before transmitting the request body. If we don't get that, return // what we did get (such as a 4xx response) without ever transmitting the request body. if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { httpCodec.flushRequest(); realChain.eventListener().responseHeadersStart(realChain.call()); responseBuilder = httpCodec.readResponseHeaders(true); } if (responseBuilder == null) { // Write the request body if the "Expect: 100-continue" expectation was met. 
realChain.eventListener().requestBodyStart(realChain.call()); long contentLength = request.body().contentLength(); CountingSink requestBodyOut = new CountingSink(httpCodec.createRequestBody(request, contentLength)); BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); realChain.eventListener() .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount); } else if (!connection.isMultiplexed()) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection // from being reused. Otherwise we're still obligated to transmit the request body to // leave the connection in a consistent state. streamAllocation.noNewStreams(); } } httpCodec.finishRequest(); if (responseBuilder == null) { realChain.eventListener().responseHeadersStart(realChain.call()); responseBuilder = httpCodec.readResponseHeaders(false); } Response response = responseBuilder .request(request) .handshake(streamAllocation.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); int code = response.code(); if (code == 100) { // server sent a 100-continue even though we did not request one. // try again to read the actual response responseBuilder = httpCodec.readResponseHeaders(false); response = responseBuilder .request(request) .handshake(streamAllocation.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); code = response.code(); } realChain.eventListener() .responseHeadersEnd(realChain.call(), response); if (forWebSocket && code == 101) { // Connection is upgrading, but we need to ensure interceptors see a non-null response body. 
response = response.newBuilder() .body(Util.EMPTY_RESPONSE) .build(); } else { response = response.newBuilder() .body(httpCodec.openResponseBody(response)) .build(); } if ("close".equalsIgnoreCase(response.request().header("Connection")) || "close".equalsIgnoreCase(response.header("Connection"))) { streamAllocation.noNewStreams(); } if ((code == 204 || code == 205) && response.body().contentLength() > 0) { throw new ProtocolException( "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength()); } return response; } }
關於“Android框架之OkHttp3源碼的示例分析”這篇文章就分享到這裡了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯繫站長郵箱:is@yisu.com進行舉報,並提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。