共计 19451 个字符,预计需要花费 49 分钟才能阅读完成。
1. OkHttp简介
okhttp是一个第三方类库,用于android中请求网络。
这是一个开源项目,是安卓端最火热的轻量级框架,由移动支付Square公司贡献。用于替代因移除了HttpClient而导致没用的Volley。
目前更多人选择了Retrofit。
2. 源码解析
本文对OkHttp的探讨全部基于目前的最新版OkHttp:4.0.1,而这个版本作者已经使用kotlin对源码进行了重写,所以有些小伙伴可能阅读稍微有点问题,但是别担心,本文中所涉及的源码阅读起来基本上和Java一样,所以请不会kotlin的小伙伴还是耐心看下去,不太懂的语法就百度下,同时我也会对某些语法作注释
2.1 OkHttp请求流程
2.1.1 从请求处理开始分析
我们无论在使用OkHttp进行什么请求的时候都会创建OkHttpClient对象并调用他的newCall()方法,那我们就从这个方法看起:
1 2 3
override fun newCall (request: Request ) : Call {return RealCall.newRealCall(this , request, forWebSocket = false )}
可以看到返回了一个RealCall对象,所以也就意味着我们使用OkHttpClient对象调用的execute()操作实际上是RealCall的execute()操作,那我们就来看RealCall的execute()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
override fun execute () : Response { synchronized(this ) { check(!executed) { "Already Executed" } executed = true } transmitter.timeoutEnter() transmitter.callStart() try { client.dispatcher.executed(this ) return getResponseWithInterceptorChain() } finally { client.dispatcher.finished(this ) } }
这块又调用了client.dispatcher,然后找回去找到OkHttpClient的dispatcher对象,发现他就是Dispatcher类的一个对象,接着我们继续看Dispatcher类。
2.1.2 Dispatcher任务调度
进入Dispatcher类,我们可以看到如下成员变量定义:
注:kotlin中一个成员变量的@get和@set分别对应了Java中get和set方法,所以这块我没有完完全全复制粘贴到这,我只取了定义部分
1 2 3 4 5 6 7 8 9 10 11 12
var maxRequests = 64 var maxRequestsPerHost = 5 val executorService: ExecutorServiceprivate val readyAsyncCalls = ArrayDeque<AsyncCall>()private val runningAsyncCalls = ArrayDeque<AsyncCall>()private val runningSyncCalls = ArrayDeque<RealCall>()
接下来我们看看Dispatcher的构造方法
1 2 3 4 5 6
class Dispatcher constructor () {} constructor (executorService: ExecutorService) : this () { this .executorServiceOrNull = executorService }
我们可以看到他将传进来的executorService传给了executorServiceOrNull,那我们来看看executorServiceOrNull的定义:
1 2 3 4 5 6 7 8 9 10 11 12
private var executorServiceOrNull: ExecutorService? = null @get:Synchronized @get:JvmName ("executorService" ) val executorService: ExecutorService get () { if (executorServiceOrNull == null ) { executorServiceOrNull = ThreadPoolExecutor(0 , Int .MAX_VALUE, 60 , TimeUnit.SECONDS, SynchronousQueue(), threadFactory("OkHttp Dispatcher" , false )) } return executorServiceOrNull!! }
我们可以看到executorService的set方法,就是创建了一个线程池。再结合他有两个构造器就知道:如果没有给Dispatcher传入一个线程池他就会自己创建一个线程池。这个线程池适合执行大量且耗时较少的任务。
构造器我们看完了,我们就来看他的enqueue()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13
internal fun enqueue (call: AsyncCall ) { synchronized(this ) { readyAsyncCalls.add(call) if (!call.get ().forWebSocket) { val existingCall = findExistingCallWithHost(call.host()) if (existingCall != null )call.reuseCallsPerHostFrom(existingCall) } } promoteAndExecute() }
来一个请求就把他添加到就绪请求队列中去,然后就来判断forWebSocket这个属性。看到这个属性我还迷了下,有点搞不懂她是干嘛的,然后经过我的一番搜索后,发现原来OkHttp还可以进行WebSocket通信,而这个属性就是为WebSocket通信准备的。于是我就到RealCall里面找在哪儿定义了这个属性了,然后我就发现了在RealCall的newRealCall()方法这块,这个方法传入的参数中有一个Boolean值名字就叫forWebSocket。
1 2 3 4 5 6 7 8 9 10 11 12
companion object { fun newRealCall ( client: OkHttpClient , originalRequest: Request , forWebSocket: Boolean ) : RealCall { return RealCall(client, originalRequest, forWebSocket).apply { transmitter = Transmitter(client, this ) } } }
不知道大家有没有印象,咱们在上面说过,执行OkHttpClient.newCall()方法实际上是返回了一个RealCall对象,于是在那找到了这个的答案,forWebSocket=false 所以说这个if咱们不用管,直接看promoteAndExecute()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
private fun promoteAndExecute () : Boolean { assert(!Thread.holdsLock(this )) val executableCalls = mutableListOf<AsyncCall>() val isRunning: Boolean synchronized(this ) { val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() if (runningAsyncCalls.size >= this .maxRequests) break if (asyncCall.callsPerHost().get () >= this .maxRequestsPerHost) continue i.remove() asyncCall.callsPerHost().incrementAndGet() executableCalls.add(asyncCall) runningAsyncCalls.add(asyncCall) } isRunning = runningCallsCount() > 0 } for (i in 0 until executableCalls.size) { val asyncCall = executableCalls[i] asyncCall.executeOn(executorService) } return isRunning }
首先将已就绪队列遍历一遍,判断正在运行的数量是不是大于定义的最大请求数,如果大于的话直接退出循环;如果不大于则在判断这个请求的主机请求数是不是大于定义的每个主机最大请求数,如果大于就跳过这个请求换下一个请求;不大于就把它调入正在运行的请求队列里面,直到遍历完成。然后判断还有没有正在运行的请求,如果有就isRunning置true。接着再取出executableCalls里的每一个元素,然后执行executteOn()方法。我们继续来看AsyncCall的executeOn()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
fun executeOn (executorService: ExecutorService ) { assert(!Thread.holdsLock(client.dispatcher)) var success = false try { executorService.execute(this ) success = true } catch (e: RejectedExecutionException) { val ioException = InterruptedIOException("executor rejected" ) ioException.initCause(e) transmitter.noMoreExchanges(ioException) responseCallback.onFailure(this @RealCall , ioException) } finally { if (!success) { client.dispatcher.finished(this ) } } }
这段代码就是在执行线程池中的线程,如果成功执行就将success置true,如果不成功,则抛异常并返回给responseCallback的onFailure()方法。并且如果没有成功执行也就是success为false,那么在finally中就会执行client.dispatcher.finished()方法:
1 2 3 4
internal fun finished (call: AsyncCall ) { call.callsPerHost().decrementAndGet() finished(runningAsyncCalls, call) }
这个方法先将传入的AsyncCall的callsPerHost给减1,然后再调用了finished()方法,我们再来看这个finished()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13
private fun <T> finished (calls: Deque <T >, call: T ) { val idleCallback: Runnable? synchronized(this ) { if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!" ) idleCallback = this .idleCallback } val isRunning = promoteAndExecute() if (!isRunning && idleCallback != null ) { idleCallback.run() } }
他讲此次请求从runningAsyncCalls中移除,然后执行了promoteAndExecute()方法,咱们在上面说过这个方法,他的返回值就是判断当前这个运行队列中还有没有请求,如果还有就返回true,没有就false。接着一个if,判断isRunning和idleCallback的,那么如果当前这个请求还没有执行的话,就调用run()方法执行当前请求。这样每个请求都执行完毕了。 那我们再来看看他的run()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
override fun run () { threadName("OkHttp ${redactedUrl()} " ) { var signalledCallback = false transmitter.timeoutEnter() try { val response = getResponseWithInterceptorChain() signalledCallback = true responseCallback.onResponse(this @RealCall , response) } catch (e: IOException) { if (signalledCallback) { Platform.get ().log(INFO, "Callback failure for ${toLoggableString()} " , e) } else { responseCallback.onFailure(this @RealCall , e) } } finally { client.dispatcher.finished(this ) } } }
这块调用了一个getResponseWithInterceptorChain()方法,并返回了response,并将它返回给了responseCallback.onResponse()方法。如果失败了就将结果返回给responseCallback.onFailure()方法。最后调用client.dispatcher.finished()方法。
2.1.3 Interceptor拦截器
2.1.3.1 getResponseWithInterceptorChain()方法
首先我们看看getResponseWithInterceptorChain()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
@Throws(IOException::class) fun getResponseWithInterceptorChain () : Response { val interceptors = mutableListOf<Interceptor>() interceptors += client.interceptors interceptors += RetryAndFollowUpInterceptor(client) interceptors += BridgeInterceptor(client.cookieJar) interceptors += CacheInterceptor(client.cache) interceptors += ConnectInterceptor if (!forWebSocket) { interceptors += client.networkInterceptors } interceptors += CallServerInterceptor(forWebSocket) val chain = RealInterceptorChain(interceptors, transmitter, null , 0 , originalRequest, this , client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis) var calledNoMoreExchanges = false try { val response = chain.proceed(originalRequest) if (transmitter.isCanceled) { response.closeQuietly() throw IOException("Canceled" ) } return response } catch (e: IOException) { calledNoMoreExchanges = true throw transmitter.noMoreExchanges(e) as Throwable } finally { if (!calledNoMoreExchanges) { transmitter.noMoreExchanges(null ) } } }
首先就创建了一大堆的连接器并添加到interceptors集合中。然后创建了一个RealInterceptorChain对象,并调用了他的proceed()方法,接着主要目的就是讲proceed()返回的response给返回去。 那我们就来看看RealInterceptorChain的proceed()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
override fun proceed (request: Request ) : Response { return proceed(request, transmitter, exchange) } @Throws(IOException::class) fun proceed (request: Request , transmitter: Transmitter , exchange: Exchange ?) : Response { if (index >= interceptors.size) throw AssertionError() calls++ check(this .exchange == null || this .exchange.connection()!!.supportsUrl(request.url)) { "network interceptor ${interceptors[index - 1 ]} must retain the same host and port" } check(this .exchange == null || calls <= 1 ) { "network interceptor ${interceptors[index - 1 ]} must call proceed() exactly once" } val next = RealInterceptorChain(interceptors, transmitter, exchange, index + 1 , request, call, connectTimeout, readTimeout, writeTimeout) val interceptor = interceptors[index] @Suppress("USELESS_ELVIS" ) val response = interceptor.intercept(next) ?: throw NullPointerException( "interceptor $interceptor returned null" ) check(exchange == null || index + 1 >= interceptors.size || next.calls == 1 ) { "network interceptor $interceptor must call proceed() exactly once" } check(response.body != null ) { "interceptor $interceptor returned a response with no body" } return response }
首先就是一个index,index是RealInterceptorChain构造器中传入的参数,她是第四个参数,所以我们看getResponseWithInterceptorChain()方法中创建RealInterceptorChain对象时构造器的第四个传入的值为0。然后判断index的值是不是大于interceptors的大小,如果大于就抛异常,否则就继续一顿检查!!!然后再创建RealInterceptorChain对象,此时创建的对象传入的index为此时的index+1,然后再调用interceptor的intercept()方法,并返回response。interceptor的intercept()作用是当存在多个拦截器时都会在上面代码注释1处阻塞,并等待下一个拦截器的调用返回。
2.1.3.2 Interceptor源码
那现在我们再来讲几个重要的拦截器吧。OkHttp中Interceptor的实现类有:
ConnectInterceptor:连接拦截器。
CallServerInterceptor:请求服务器拦截器
CacheInterceptor:缓存拦截器
BridgeInterceptor:桥梁拦截器。
其中较为重要的就是ConnectInterceptor和CallServerInterceptor,那我们来看看这两个。
2.1.3.2.1 ConnectInterceptor
这个类主要用来实现网络请求连接。我们来看下他的intercept方法:
1 2 3 4 5 6 7 8 9 10 11 12
@Throws(IOException::class) override fun intercept (chain: Interceptor .Chain ) : Response { val realChain = chain as RealInterceptorChain val request = realChain.request() val transmitter = realChain.transmitter() val doExtensiveHealthChecks = request.method != "GET" val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks) return realChain.proceed(request, transmitter, exchange) }
这个方法先将传入的chain对象造型成了RealInterceptorChain的对象,这个类我们在上面提到过,然后调用他的response()和transmitter()方法,分别得到当前chain的response和transmitter。然后执行了request的method()方法,判断request的类型是不是GET,如果是doExtensiveHealthChecks就为false,否则为true,接着把doExtensiveHealthChecks传入transmitter的newExchange()方法中去,这个方法我们等会再说,然后再调用了proceed()方法,这个方法我们在上面说过。
2.1.3.2.2 CallServerInterceptor
这个类是网络请求的本质。它的intercept方法源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
@Throws(IOException::class) override fun intercept (chain: Interceptor .Chain ) : Response { val realChain = chain as RealInterceptorChain val exchange = realChain.exchange() val request = realChain.request() val requestBody = request.body val sentRequestMillis = System.currentTimeMillis() exchange.writeRequestHeaders(request) var responseHeadersStarted = false var responseBuilder: Response.Builder? = null if (HttpMethod.permitsRequestBody(request.method) && requestBody != null ) { if ("100-continue" .equals(request.header("Expect" ), ignoreCase = true )) { exchange.flushRequest() responseHeadersStarted = true exchange.responseHeadersStart() responseBuilder = exchange.readResponseHeaders(true ) } if (responseBuilder == null ) { if (requestBody.isDuplex()) { exchange.flushRequest() val bufferedRequestBody = exchange.createRequestBody(request, true ).buffer() requestBody.writeTo(bufferedRequestBody) } else { val bufferedRequestBody = exchange.createRequestBody(request, false ).buffer() requestBody.writeTo(bufferedRequestBody) bufferedRequestBody.close() } } else { exchange.noRequestBody() if (!exchange.connection()!!.isMultiplexed) { exchange.noNewExchangesOnConnection() } } } else { exchange.noRequestBody() } if (requestBody == null || !requestBody.isDuplex()) { exchange.finishRequest() } if (!responseHeadersStarted) { exchange.responseHeadersStart() } if (responseBuilder == null ) { responseBuilder = exchange.readResponseHeaders(false )!! } var response = responseBuilder .request(request) .handshake(exchange.connection()!!.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() var code = response.code if (code == 100 ) { response = exchange.readResponseHeaders(false )!! .request(request) .handshake(exchange.connection()!!.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() code = response.code } exchange.responseHeadersEnd(response) response = if (forWebSocket && code == 101 ) { response.newBuilder() .body(EMPTY_RESPONSE) .build() } else { response.newBuilder() .body(exchange.openResponseBody(response)) .build() } if ("close" .equals(response.request.header("Connection" ), ignoreCase = true ) || "close" .equals(response.header("Connection" ), ignoreCase = true )) { exchange.noNewExchangesOnConnection() } if ((code == 204 || code == 205 ) && response.body?.contentLength() ?: -1L > 0L ) { throw ProtocolException( "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()} " ) } return response }
具体过程可以看代码中的注释,它主要是向服务器发送请求数据和接受服务器返回的数据。
2.1.4 Transmiter
Transmitter类是OkHttp的应用层和网络层的一个桥梁类。
我们先来看看该类的初始化:
1 2 3 4 5 6 7 8 9 10 11 12 13
class Transmitter ( private val client: OkHttpClient, private val call: Call ) { private val connectionPool: RealConnectionPool = client.connectionPool.delegate private val eventListener: EventListener = client.eventListenerFactory.create(call) private val timeout = object : AsyncTimeout() { override fun timedOut () { cancel() } }.apply { timeout(client.callTimeoutMillis.toLong(), MILLISECONDS) }
Transmitter主要的一些成员变量就这些,首先构造器中传入了两个参数,一个OkHttpClient,一个Call。然后又创建了一个连接池connectionPool,还有一个监听器,我们可以通过扩展这个类来监听程序的HTTP的调用数量、大小和持续时间。
2.1.5 RealConnection
我们先看看他的一些属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
class RealConnection ( val connectionPool: RealConnectionPool, private val route: Route ) : Http2Connection.Listener(), Connection { private var rawSocket: Socket? = null private var socket: Socket? = null private var handshake: Handshake? = null private var protocol: Protocol? = null private var http2Connection: Http2Connection? = null private var source: BufferedSource? = null private var sink: BufferedSink? = null var noNewExchanges = false internal var routeFailureCount = 0 internal var successCount = 0 private var refusedStreamCount = 0 private var allocationLimit = 1
接下来我们看看他的connect()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
fun connect ( connectTimeout: Int , readTimeout: Int , writeTimeout: Int , pingIntervalMillis: Int , connectionRetryEnabled: Boolean , call: Call , eventListener: EventListener ) { check(protocol == null ) { "already connected" } var routeException: RouteException? = null val connectionSpecs = route.address.connectionSpecs val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs) if (route.address.sslSocketFactory == null ) { if (ConnectionSpec.CLEARTEXT !in connectionSpecs) { throw RouteException(UnknownServiceException( "CLEARTEXT communication not enabled for client" )) } val host = route.address.url.host if (!Platform.get ().isCleartextTrafficPermitted(host)) { throw RouteException(UnknownServiceException( "CLEARTEXT communication to $host not permitted by network security policy" )) } } else { if (Protocol.H2_PRIOR_KNOWLEDGE in route.address.protocols) { throw RouteException(UnknownServiceException( "H2_PRIOR_KNOWLEDGE cannot be used with HTTPS" )) } } while (true ) { try { if (route.requiresTunnel()) { connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener) if (rawSocket == null ) { break } } else { connectSocket(connectTimeout, readTimeout, call, eventListener) } establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener) eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol) break } catch (e: IOException) { socket?.closeQuietly() rawSocket?.closeQuietly() socket = null rawSocket = null source = null sink = null handshake = null protocol = null http2Connection = null eventListener.connectFailed(call, route.socketAddress, route.proxy, null , e) if (routeException == null ) { routeException = RouteException(e) } else { routeException.addConnectException(e) } if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) { throw routeException } } } if (route.requiresTunnel() && rawSocket == null ) { throw RouteException(ProtocolException( "Too many tunnel connections attempted: $MAX_TUNNEL_ATTEMPTS " )) } val http2Connection = this .http2Connection if (http2Connection != null ) { synchronized(connectionPool) { allocationLimit = http2Connection.maxConcurrentStreams() } } }
首先检查是否已经建立连接,如果已经建立就抛异常,没有的话就继续。接着就得到了ConnectionSpecs,然后根据他建立了一个connectionSpecSelector集合。接着判断是不是安全连接,也就是ssl连接,如果不是的话就判断了一些属性,先确定是不是明文然后再确定主机能不能接受明文操作。接着就开始连接,判断是不是要进行隧道通信,如果是就调用connectTunnel()建立隧道通信,如果不是就调用connectSocket()建立普通的通信。然后通过establishProtocol()建立协议。如果是HTTP/2就设置相关属性。
然后我们就来看看他具体如何实现的,先看看connectSocket()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
@Throws(IOException::class) private fun connectSocket ( connectTimeout: Int , readTimeout: Int , call: Call , eventListener: EventListener ) { val proxy = route.proxy val address = route.address val rawSocket = when (proxy.type()) { Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!! else -> Socket(proxy) } this .rawSocket = rawSocket eventListener.connectStart(call, route.socketAddress, proxy) rawSocket.soTimeout = readTimeout try { Platform.get ().connectSocket(rawSocket, route.socketAddress, connectTimeout) } catch (e: ConnectException) { throw ConnectException("Failed to connect to ${route.socketAddress} " ).apply { initCause(e) } } try { source = rawSocket.source().buffer() sink = rawSocket.sink().buffer() } catch (npe: NullPointerException) { if (npe.message == NPE_THROW_WITH_NULL) { throw IOException(npe) } } }
首先先判断连接类型,如果是直连或者HTTP连接就直连,否则的话走Socket代理,然后通过eventListener.connectStart()方法创建连接,再设定超时->完成连接->创建用于I/O的source和sink。
我们接着再来看connectTunnel()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
@Throws(IOException::class) private fun connectTunnel ( connectTimeout: Int , readTimeout: Int , writeTimeout: Int , call: Call , eventListener: EventListener ) { var tunnelRequest: Request = createTunnelRequest() val url = tunnelRequest.url for (i in 0 until MAX_TUNNEL_ATTEMPTS) { connectSocket(connectTimeout, readTimeout, call, eventListener) tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url) ?: break rawSocket?.closeQuietly() rawSocket = null sink = null source = null eventListener.connectEnd(call, route.socketAddress, route.proxy, null ) } }
大体就是先创建隧道请求,然后建立socket连接,再发送请求建立隧道。
3. 请求流程图
那我们最后来总结下
3.1 同步请求是如何操作的?
3.2 异步请求是如何操作的?