OkHttp介绍
OkHttp是目前Android上最常用的网络请求框架,由Square开源。从Android 4.4开始,Google开始用OKHttp替换源代码中HttpURLConnection的底层实现。同时,流行的Retrofit框架的底层实现也使用了OKHttp。
源代码门户
优势:
支持Http1、Http2、Quic、WebSocket连接池,复用底层TCP(socket),减少数据流量,自动缓解重复失败的网络请求。自动重试主机方向。
OkHttp使用流程图
用户在使用OkHttp发起请求时,至少具有三个角色:OkHttpClient、Request、Call。要创建OkHttpClient和Request,可以使用它提供的Builder(构建器模式)。 Call是准备执行的请求,将请求传递给OkHttpClient后返回。
构建器模式:将复杂构建与其表示分离,允许相同的构建过程创建不同的表示。实例化OKHttpClient 和Request 时需要设置的属性太多,而且开发者的请求组合一直在变化,因此使用构建器模式使得配置后很难关心该类的内部细节。为了它。构建器帮助演示对象逐步初始化
大家应该记住这个流程图,对请求逻辑有一个大概的印象。
OkHttp基本使用
//1.创建客户端
OkHttpClient 客户端=new OkHttpClient().newBuilder()
.cookieJar(CookieJar.NO_COOKIES)
.callTimeout(10000, TimeUnit.MILLISECONDS)
。建造();
//2. 创建请求
请求request=new Request.Builder()
.url(\’http://10.34.12.156:68080/admin-api\’)
.addHeader(\’内容类型\’, \’application/json\’)
。获得();
。建造();
//3. 构造调用对象
调用call=client.newCall(request);
//4.1 调用调用对象的同步请求方法
Response response=call.execute(); //返回的响应参数存储在响应对象中
//4.2 调用调用对象的异步请求方法
call.enqueue(new Callback() {
@覆盖
公共无效onFailure(@NonNull Call调用,@NonNull IOException e){
Log.d(TAG, \’onFailure: \’);//失败回调
}
@覆盖
公共无效onResponse(@NonNull调用call,@NonNull响应response){
Log.d(TAG, \’onResponse: \’); //成功回调
}
});
第一步是使用构建器模式创建一个OkHttpClient 对象。
第二步,创建Request请求,添加URL、请求头等。这里也使用了构建器模式。
第三步,创建Call对象并发起网络请求。这也是一个需要具体分析的领域。
这里,我们通过客户端对象的newCakk方法获取Call对象。那么newCall方法是什么呢?
查看源码,我们看到newCall方法返回一个Call对象,该对象是通过RealCall方法返回的。此时,我们需要回过头来看看Call 到底是怎么回事。部分Call源码如下。
接口调用: 可克隆{
/** 返回发起此调用的原始请求。 */
有趣的请求(): 请求
/**
* 立即调用请求并阻塞,直到可以处理响应或发生错误。
*
* 调用者必须关闭[Response]以避免资源泄漏。这导致
* 底层[ResponseBody]。
*
*“”
* //确保响应(和底层响应主体)已关闭
*尝试(响应响应=client.newCall(请求).execute()){
*.
* }
*“”
*
* 调用者可以使用响应的[Response.body] 方法读取响应正文。
* 资源泄漏的调用者必须关闭[ResponseBody]或响应。
*
* 注意传输层成功(接收HTTP响应码、headers、body)不是成功。
* 始终指示应用层成功: `response` 可能仍指示不令人满意的HTTP
*响应代码,如404 或500。
*
* @throws IOException 如果请求由于取消或连接而无法执行
* 问题或超时。交换过程中可能会出现网络故障。
* 远程服务器在失败发生之前接受了请求。
* 如果调用已经执行,则抛出IllegalStateException。
*/
@Throws(IOException: 类)
funexecute(): 响应
/**
* 安排请求在将来的某个时间运行。
*
* [dispatcher][OkHttpClient.dispatcher] 通常定义请求何时执行:
* 除非当前正在运行其他请求,否则立即执行。
*
* 该客户端稍后将通过HTTP 响应或失败回调`responseCallback`。
* 例外。
*
* 如果调用已经执行,则抛出IllegalStateException。
*/
有趣的排队(responseCallback:回调)
/** 如果可能的话取消请求。 */已经完成的请求无法取消。
有趣的取消()
/**
* 如果此调用是[已执行][已执行]或[已入队][已入队],则返回true。
* 多次执行调用会导致错误。
*/
fun isExecuted(): 布尔值
fun isCanceled(): 布尔值
/**
* 返回call: 的DNS 解析、连接和请求写入的超时。
* 正文,服务器处理,以及调用是否需要重定向或读取响应正文。
* 所有重试必须在一个超时时间内完成。
*
* 使用[OkHttpClient.Builder.callTimeout] 设置客户端的默认超时。
*/
有趣的超时(): 超时
/**
* 创建一个与此调用相同的新调用。该调用也可以排队或执行。
* 已经完成了。
*/
公共覆盖fun clone(): 调用
有趣的界面工厂{
好玩的newCall(request: request): call
}
}
很容易看出Call是一个接口,提供了很多方法来实现。
在Call接口中,Factory接口定义了方法。
好玩的newCall(request: request): call
该方法将Request对象作为参数并返回Call对象。具体的实现是由实现Factory接口的类提供的。 OkHttClient也实现了这个接口。
因此,这就是为什么客户端可以调用newCall方法并返回一个Call对象。我们回到主题吧。前面的client.call最终通过实现newCall方法获得了Call对象,该方法通过RealCall方法返回了Call对象。所以你需要看看RealCall 做了什么。
RealCall 源代码的一部分如下所示。可以看到RealCall实际上实现了Call接口。所以RealCall实际上可以返回一个Call对象。同时,我们还实现了execute(等)的抽象方法。同步请求)和排队(异步请求)。
类RealCall(
val client: OKHttpClient,
/** 应用程序的原始请求,没有重定向或授权标头*/
valoriginalRequest: 请求,
val forWebSocket: 布尔值
) : 电话{
超越乐趣。执行():响应{
check(execused.compareAndSet(false, true)) { \’已经执行\’ }
超时.enter()
调用开始()
尝试{
client.dispatcher.execute(this)
返回getResponseWithInterceptorChain()
} 最后{
client.dispatcher.finished(这个)
}
}
覆盖乐趣入队(responseCallback:回调){
check(execused.compareAndSet(false, true)) { \’已经执行\’ }
调用开始()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
@Throws(IOException: 类)
内心乐趣getResponseWithInterceptorChain(): 响应{
//构建完整的拦截器堆栈。
val 拦截器=mutableListOfInterceptor()
拦截器+=client.interceptors
拦截器+=RetryAndFollowUpInterceptor(客户端)
拦截器+=BridgeInterceptor(client.cookieJar)
拦截器+=CacheInterceptor(client.cache)
拦截器+=ConnectInterceptor
如果(!forWebSocket){
拦截器+=client.networkInterceptors
}
Interceptor +=CallServerInterceptor(对于WebSocket)
val链=RealInterceptorChain(
电话=这个,
拦截器=拦截器,
索引=0,
替换=空,
请求=原始请求,
connectTimeoutMillis=client.connectTimeoutMillis,
readTimeoutMillis=client.readTimeoutMillis,
writeTimeoutMillis=客户端.writeTimeoutMillis
)
NoMoreExchanges 称为var=false
尝试{
val 响应=chain.proceed(originalRequest)
如果(已取消()){
响应.closeQuietly()
抛出IOException(\’取消\’)
}
返回响应
} catch(e: IOException) {
称为NoMoreExchanges=true
将noMoreExchanges(e) 作为Throwable 抛出
} 最后{
如果(!namedNoMoreExchanges){
没有更多交换(空)
}
}
}
}
val call=client.newCall(请求)
现在,我们也来分析一下我们是如何获取call对象的。现在我们来分析一下同步和异步请求是如何执行的。这里,我们首先分析一下异步请求。
异步请求
//4.2 调用调用对象的异步请求方法
call.enqueue(对象: 回调{
重写fun onFailure(call: Call, e: IOException) {
Log.d(\’a\’, \’onFailure:\’) //失败回调
}
覆盖fun onResponse(call: 呼叫, response: 响应) {
Log.d(\’b\’, \’onResponse:\’) //成功回调
}
})
正如你在前面的源代码中看到的,RealCall对象也实现了Call接口并重写了内部方法。那么enqueue方法到底是如何执行的呢?
覆盖fun enqueue(responseCallback: 回调) {
check(execused.compareAndSet(false, true)) { \’已经执行\’ }
调用开始()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
首先,使用检查来确定enqueue 方法是否已执行过一次。因为call允许一个请求只能调用一次,使用后不能再次调用,所以我们使用原子性来判断操作并检查enqueue方法是否已经执行。这里之前Java版本的OkHttp源码并没有使用原子操作,而是使用变量来判断是否有请求,所以这里有一个实际的优化。
然后我发现这个方法叫做callStart。该方法有网络连接开始时、DNS域名查询开始时等被重写的方法。我们在这里不提供任何批判性分析。如果您有兴趣,无论源代码如何,请参考下面的源代码。
这是重点: client.dispatcher.enqueue(AsyncCall(responseCallback)) 现在让我们看看AsyncCall 是什么。
AsyncCall是RealCall的内部类,它实现了Runnable接口,主要用于在线程池上执行run()方法。
内部内部类AsyncCall(
私有val responseCallback: 回调
) : 可执行文件{
@Volatile var callPerHost=AtomicInteger(0)
私人套装
有趣的重用CallsPerHostFrom(other: AsyncCall) {
//callPerHost 表示同一主机的连接数
//连接数必须小于5
this.callsPerHost=other.callsPerHost
}
val host: 字符串
get()=原始请求.url.host
val request: 请求
get()=原始请求
val call: RealCall
get()=this@RealCall
//将asyncCall添加到线程池执行的方法
有趣的executeOn(executorService: ExecutorService){
client.dispatcher.assertThreadDoesntHoldLock()
var 成功=false
尝试{
//线程池执行当前AsyncCall对象的run方法
执行器服务.execute(this)
成功=真
} catch (e: RejectedExecutionException) {
val ioException=InterruptedIOException(\’执行者被拒绝\’)
ioException.initCause(e)
没有更多交换(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} 最后{
如果(!成功){
//整理
//其实内部调用的是promoteAndExecute()
client.dispatcher.finished(这个)
}
}
}
重写fun run() {
threadName(\’OkHttp ${redactedUrl()}\’) {
var signaledCallback=false
超时.enter()
尝试{
//getResponseWithInterceptorChain() 获取请求的响应结果
val 响应=getResponseWithInterceptorChain()
信号回调=true
//如果请求成功则回调
responseCallback.onResponse(this@RealCall, 响应)
} catch(e: IOException) {
if (signalledCallback) {
Platform.get().log(\’${toLoggableString()}回调失败\’, Platform.INFO, e)
} 除此之外{
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: 可抛出) {
取消()
if (!signalledCallback) {
val CanceledException=IOException(\’因$t 而取消\’)
Exception.addSuppressed(t) 已取消
//请求失败时回调
responseCallback.onFailure(this@RealCall, cancelException)
}
扔
} 最后{
//做收尾工作
//相比于同步请求终止方式,这个更重要
client.dispatcher.finished(这个)
}
}
}
}
可以看到,AsyncCall实现了Runnable方法,并实现了run方法。 Run 方法中的getRespo
nseWithInterceptorChain就是我们后续的拦截器的调用了。
现在我们再去分析client.dispatcher.enqueue(AsyncCall(responseCallback)),我们可以看看dispatcher的源码,这块源码里面的注释已经解释的很清楚了。主要逻辑就是,对AsyncCall对象进行挑选,将它们放入runningAsyncCalls中。
Dispatcher分发器主要功能:内部维护队列与线程池,完成请求调配。
//异步请求同时存在的最大请求
private int maxRequests = 64;
//异步请求同一域名同时存在的最大请求
private int maxRequestsPerHost = 5;
//闲置任务(没有请求时可执行一些任务,由使用者设置)
private @Nullable Runnable idleCallback;
//异步请求使用的线程池
private @Nullable ExecutorService executorService;
//异步请求等待执行队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//异步请求正在执行队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//同步请求正在执行队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
// Dispatcher.kt
// 准备执行的异步请求队列
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
// 正在执行的异步请求队列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
// 加当前asyncCall加到准备执行的异步请求队列中
readyAsyncCalls.add(call)
if (!call.call.forWebSocket) {
// 这里是得到连接同一个 host 的请求数
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// dispatcher进行分发call任务的方法
promoteAndExecute()
}
// 关键方法,dispatcher进行任务分发的方法
// 进行收尾工作时,也是调用的它
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
// 需要开始执行的任务集合
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
// 迭代等待执行异步请求
while (i.hasNext()) {
val asyncCall = i.next()
// 正在执行异步请求的总任务数不能大于64个
// 否则直接退出这个循环,不再将请求加到异步请求队列中
if (runningAsyncCalls.size >= this.maxRequests) break
// 同一个host的请求数不能大于5
// 否则直接跳过此call对象的添加,去遍历下一个asyncCall对象
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue
i.remove()
// 如果拿到了符合条件的asyncCall对象,就将其callPerHost值加1
// callPerHost代表了连接同一个host的数量
asyncCall.callsPerHost.incrementAndGet()
// 加到需要开始执行的任务集合中
executableCalls.add(asyncCall)
// 将当前call加到正在执行的异步队列当中
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
// 遍历每一个集合中的asyncCall对象
// 将其添加到线程池中,执行它的run方法
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
现在我们通过 Dispatcher 将 AsyncCall 对象通过挑选,加到了线程池中。挑选的限制有两个:
1.当前执行的总请求数要小于64个。
2.对于连接的同一个host请求,要保证数量小于5。
现在,我们再回头看看将 AsyncCall 对象加到线程池后的一些细节吧!
// Dispatcher.kt
// 将asyncCall添加到线程池中去执行的方法
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
// 这里是之前自定义了创建了一个ExecutorService
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException(\”executor rejected\”)
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
// 这里也是会执行收尾工作
client.dispatcher.finished(this)
}
}
}
@get:JvmName(\”executorService\”) val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
// !!这里的corePoolSize是 0
// !!阻塞队列是 SynchronousQueue
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory(\”$okHttpName Dispatcher\”, false))
}
return executorServiceOrNull!!
}
我们先来看 executeOn() 方法,它的主要工作就是执行添加到线程池的 AsyncCall 对象的 run() 方法,去进行网络请求。其次我们目光移动到 finally 语句块,会发现每次执行完 run() 方法后,即完成网络请求后,都会去执行这个 finished() 方法。前面讲到过,内部其实是再次调用了 promoteAndExecute() 方法。那这是为什么呢?
还记得到我们从准备执行的异步队列中挑选一些 AsyncCall 对象拿到线程池中执行吗?如果记得,那你是否还记得我们是有挑选条件的,正因如此,可能在准备执行的异步请求队列中会有一些 AsyncCall 对象不满足条件仍然留在队列里!那我们难道最后就不执行这些网络请求了吗?当然不是!原来每完成一次网络请求就会再次触发 Dispatcher 去分发 AsyncCall 对象!原来如此。
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
可以看到,这里会对我们准备队列里面的call进行筛选,此时如果满足条件,我们的call就会被筛选出来到运行队列去执行。
然后我们再来看看这里用到的线程池是一个什么样的线程池。在上面我贴出来的代码中可以看到,这个线程池的 corePoolSize 是 0,BlockingQueue 是 SynchronousQueue,这样构建出来的线程池有什么特殊之处吗?
熟悉线程池的同学都应该知道,当任务数超过了 corePoolSize 就会将其加到阻塞队列当中。也就是说这些任务不会立马执行,而我们的网络请求可不想被阻塞着。
因此这里的 corePoolSize 就设置成了 0。BlockingQueue 设置成 SynchronousQueue 也是类似道理,SynchronousQueue 是不储存元素的,只要提交的任务数小于最大线程数就会立刻新起线程去执行任务。
好了,我们的异步请求暂时先讲到这块,我们最终获得response是在我们RealCall对象重写的run方法里面去拿到的,还记得它的run方法里面的
val response = getResponseWithInterceptorChain()
这块就是拦截器的内容了,这里先不说,我们先继续把同步的情况说完。
同步请求
// 调用call对象的同步请求方法
val response = call.execute()
继续追溯源码,查看execute方法。
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { \”Already Executed\” }
timeout.enter()
callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
这块的check、callStart基本都是和异步请求一样的逻辑。由于是同步队列,无需考虑其他的,将任务加到队列里就好。
override fun execute(): Response {
// 一个call对象只能执行一次execute方法
// 这里用CAS思想进行比较,可以提高效率
check(executed.compareAndSet(false, true)) { \”Already Executed\” }
timeout.enter()
// 这里主要是个监听器,表示开始进行网络请求了
callStart()
// 重点关注这块
try {
// 通过分发器进行任务分发
// 其实这里还体现不出分发器的效果,仅仅是将当前请求加入到一个同步队列当中
client.dispatcher.executed(this)
// 通过 getResponseWithInterceptorChain() 获得相应结果
return getResponseWithInterceptorChain()
} finally {
// 完成一些收尾工作,在同步请求中,几乎没什么用
client.dispatcher.finished(this)
}
}
总结一下整个 okhttp 网络请求的整个过程。
首先通过我们通过 构造者 的方式构建好了 OkHttpClient 和 Request 对象,然后调用 OkHttpClient 对象的 newCall() 方法得到一个 RealCall 对象,最后再调用其 execute() 或者 enqueue() 方法进行同步或者异步请求。
然后如果是同步请求,Dispatacher 分发器去只是简单的将其加入到正在执行的同步请求队列中做一个标记,如果是异步请求就会根据 两个条件 去筛选合适的请求,并将其发送给一个特定的线程池中去进行网络请求,最后通过 getResponseWithInterceptorChain() 得到最终结果。
你回过头再看看,是不是和上面的使用流程图基本上一样的呢?
拦截器
getResponseWithInterceptorChain是怎么拿到我们最终的结果呢?这里我们先学一下相关的前置知识。
首先,我们需要了解什么是责任链模式。
对象行为型模式,为请求创建了一个接收者对象的链,在处理请求的时候执行过滤(各司其职)。 责任链上的处理者负责处理请求,客户只需要将请求发送到责任链即可,无须关心请求的处理细节和请求的传递,所以职责链将请求的发送者和请求的处理者解耦了
如果你需要点外卖,你只需要去美团下单就好了,不需要做其他事情,美味就来了,干净又卫生。
这里先我们介绍一下我们在网络请求中涉及的五大拦截器。
RetryAndFollowUpInterceptor:请求失败自动重试,如果 DNS 设置了多个ip地址会自动重试其余ip地址。BridgeInterceptor:会补全我们请求中的请求头,例如Host,Cookie,Accept-Encoding等。CacheInterceptor:会选择性的将响应结果进行保存,以便下次直接读取,不再需要再向服务器索要数据。ConnectInterceptor:建立连接并得到对应的socket;管理连接池,从中存取连接,以便达到连接复用的目的。CallServerInterceptor:与服务器建立连接,具体进行网络请求,并将结果逐层返回的地方。
我们的拦截器就是使用了这是模式,由五大拦截器一层层分发下去,最后得到结果再一层层返回上来。
现在,我们开始正式分析, getResponseWithInterceptorChain。
// 五大拦截器的起始入口
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// 用一个集合保存所有的拦截器
val interceptors = mutableListOf<Interceptor>()
// 这个interceptor就是我们自己可以加的第一个拦截器
// 因为位于所有拦截器的第一个,与我们的应用直接相连
// 因此这个拦截器又被称为 Application Interceptor
interceptors += client.interceptors
// 重试重定向拦截器
interceptors += RetryAndFollowUpInterceptor(client)
// 桥接拦截器
interceptors += BridgeInterceptor(client.cookieJar)
// 缓存拦截器
interceptors += CacheInterceptor(client.cache)
// 连接拦截器
interceptors += ConnectInterceptor
if (!forWebSocket) {
// 这个interceptor也是我们自己可以加的一个拦截器
// 因为位于真正请求返回结果的拦截器前面,可以拿到服务器返回的最原始的结果
// 因此这个拦截器又被称为 Network Interceptor
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
// 构建RealInterceptorChain对象,我们正是通过此对象将请求逐层往下传递的
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
// 调用RealInterceptorChain的proceed()方法,将请求向下一个连接器传递
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException(\”Canceled\”)
}
// 放回响应结果
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
从这个方法中我们大概可以总结出,它将所有的拦截器包括用户自定义的拦截器全部通过一个集合保存了下来,然后构建出了 RealInterceptorChain 对象,并调用其 proceed() 方法开始了拦截器逐层分发工作。
那么它是怎么做到逐层分发的呢?其实很简单,每一个拦截器中都会通过 proceed() 方法再构建一个 RealInterceptorChain 对象,然后调用 intercpt去执行下个拦截器中的任务,如此循环,最终走到最后一个拦截器后退出。
// RealInterceptorChain.kt —–> 实现了 Chain 接口
override fun proceed(request: Request): Response {
// 检查是否走完了所有的拦截器,是则退出
check(index < interceptors.size
…
// 这个方法就是再次构建了 RealInterceptorChain 对象 ==> next
// 去执行下个拦截器中的任务
val next = copy(index = index + 1, request = request)// 这个方法内部就一行代码 new RealInterceptorChain()
val interceptor = interceptors[index]
@Suppress(\”USELESS_ELVIS\”)
// 通过调用intercept(next)去执行下一个拦截器中的任务
val response = interceptor.intercept(next) ?: throw NullPointerException(
\”interceptor $interceptor returned null\”)
…
// 将结果放回到上一个拦截器中
return response
}
以上我们搞清楚了拦截器是如何一步一步往下传递任务,并逐层往上返回结果的,现在我们来具体看看每个拦截器都做了什么事情。
RetryAndFollowUpInterceptor拦截器
// RetryAndFollowUpInterceptor.kt
// 所有拦截求都实现了 Interceptor 接口
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
// 这是个死循环,意思就是如果请求失败就需要一直重试,直到主动退出循环(followUpCount>20)
while (true) {
// ExchangeFinder: 获取连接 —> ConnectInterceptor中使用
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
// 响应结果
var response: Response
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException(\”Canceled\”)
}
try {
// 调用下一个拦截器,即 BridgeInterceptor
// 整个请求可能会失败,需要捕获然后重试重定向,因此有一个try catch
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) { //1.进行重试
// 1.1 路线异常,检查是否需要重试
if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
// 失败继续重试
continue
} catch (e: IOException) {
// 1.2 IO异常 HTTP2才会有ConnectionShutdownException 代表连接中断
//如果是因为IO异常,那么requestSendStarted=true (若是HTTP2的连接中断异常仍然为false)
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
// 失败继续重试
continue
}
// priorResponse:上一次请求的响应
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
// 2.根据返回的response进行重定向,构建新的Request对象
val followUp = followUpRequest(response, exchange)
// 2.1 followUp为空,代表没有重定向,直接返回结果response
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
// 2.2 followUp不为空,但是body中设置了只能请求一次(默认),返回重定向后的结果response
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
// 重试次数大于20次,抛出异常
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException(\”Too many follow-up requests: $followUpCount\”)
}
// 将之前重定向后的Request对象赋值给request进行重新请求
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
简单来说,RetryAndFollowUpInterceptor拦截器帮我们干了两件事。第一是重试,第二是重定向。
✅ 我们先来看看什么情况下它会进行重试。
// RetryAndFollowUpInterceptor.kt
// 这个方法就是来判断当前请求是否需要重试的
private fun recover(
e: IOException,
call: RealCall,
userRequest: Request,
requestSendStarted: Boolean
): Boolean {
// 构建OkHttpClient时配置不重试,则返回false
if (!client.retryOnConnectionFailure) return false
// 返回false
// 1、如果是IO异常(非http2中断异常)表示请求可能发出
// 2、如果请求体只能被使用一次(默认为false)
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
// 返回false
// 协议异常、IO中断异常(除Socket读写超时之外),ssl认证异常
if (!isRecoverable(e, requestSendStarted)) return false
// 无更多的路线,返回false
if (!call.retryAfterFailure()) return false
// 以上情况都不是,则返回true 表示可以重试
return true
}
✅ 再来看看如何判断是否需要重定向的。
@Throws(IOException::class)
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
val route = exchange?.connection?.route()
val responseCode = userResponse.code
val method = userResponse.request.method
when (responseCode) {
// 407 代理需要授权,通过proxyAuthenticator获得request,向其中添加请求头 Proxy-Authorization
// 然后构建一个新的request对象返回出去,准备再次请求
HTTP_PROXY_AUTH -> {
val selectedProxy = route!!.proxy
if (selectedProxy.type() != Proxy.Type.HTTP) {
throw ProtocolException(\”Received HTTP_PROXY_AUTH (407) code while not using proxy\”)
}
return client.proxyAuthenticator.authenticate(route, userResponse)
}
// 401 服务器请求需授权,通过authenticator获得到了Request,添加Authorization请求头
// 然后构建一个新的request对象返回出去,准备再次请求
HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)
// 返回的响应码是3xx,这就准备进行重定向,构建新的Request对象
HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
return buildRedirectRequest(userResponse, method)
}
// 408 请求超时
HTTP_CLIENT_TIMEOUT -> {
// 用户设置是否可以进行重试(默认允许)
if (!client.retryOnConnectionFailure) {
return null
}
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot()) {
return null
}
val priorResponse = userResponse.priorResponse
// 如果上次也是因为408导致重试,这次请求又返回的408,则不会再去重试了,直接返回null
if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {
return null
}
// 服务器返回的 Retry-After:0 或者未响应Retry-After就不会再次去请求
if (retryAfter(userResponse, 0) > 0) {
return null
}
// 返回当前的request对象,准备再次请求
return userResponse.request
}
// 503 服务不可用
HTTP_UNAVAILABLE -> {
val priorResponse = userResponse.priorResponse
// 和408相似,如果两次都是503导致请求重试,那么这次就不会再重试了,直接返回null
if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {
return null
}
// 服务端返回的有Retry-After: 0,则立即重试
if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
return userResponse.request
}
return null
}
// 421 当前客户端的IP地址连接到服务器的数量超过了服务器允许的范围
HTTP_MISDIRECTED_REQUEST -> {
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot()) {
return null
}
if (exchange == null || !exchange.isCoalescedConnection) {
return null
}
// 使用另一个连接对象发起请求
exchange.connection.noCoalescedConnections()
return userResponse.request
}
else -> return null
}
}
BridgeInterceptor拦截器
接下来,来到第二个拦截器 BridgeInterceptor。这个拦截器前面说过,主要就是用来补全请求头的,除此之外就是如果响应头中有Content-Encoding: gzip,则会用 GzipSource 进行解析。
// BridgeInterceptor.kt
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
// 这里没有什么多说的,就是补全请求头
val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header(\”Content-Type\”, contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header(\”Content-Length\”, contentLength.toString())
requestBuilder.removeHeader(\”Transfer-Encoding\”)
} else {
requestBuilder.header(\”Transfer-Encoding\”, \”chunked\”)
requestBuilder.removeHeader(\”Content-Length\”)
}
}
if (userRequest.header(\”Host\”) == null) {
requestBuilder.header(\”Host\”, userRequest.url.toHostHeader())
}
if (userRequest.header(\”Connection\”) == null) {
requestBuilder.header(\”Connection\”, \”Keep-Alive\”)
}
var transparentGzip = false
if (userRequest.header(\”Accept-Encoding\”) == null && userRequest.header(\”Range\”) == null) {
transparentGzip = true
requestBuilder.header(\”Accept-Encoding\”, \”gzip\”)
}
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header(\”Cookie\”, cookieHeader(cookies))
}
if (userRequest.header(\”User-Agent\”) == null) {
requestBuilder.header(\”User-Agent\”, userAgent)
}
// 去调用下一个拦截器,并得到响应结果
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
// 根据响应头中的 Content-Encoding,判断是否需要gzip解析
if (transparentGzip &&
\”gzip\”.equals(networkResponse.header(\”Content-Encoding\”), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll(\”Content-Encoding\”)
.removeAll(\”Content-Length\”)
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header(\”Content-Type\”)
// 进行gzip解析
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
}
桥接拦截器其实工作内容也很简单,在请求之前,向我们的请求头中添加必要的参数,然后拿到请求的响应后,根据响应头中的参数去判断是否需要 gzip 解析,如果需要则用 GzipSource 去解析就好了。
CacheInterceptor拦截器
在讲 CacheInterceptor 拦截器之前,我们先来了解一下 HTTP的缓存规则。 我们按照其行为将其分为两大类:强缓存和协商缓存。(这块可以去看小林Coding里面的,很详细)
1️⃣强缓存:浏览器并不会将请求发送给服务器。强缓存是利用 http 的返回头中的 Expires 或者 Cache-Control 两个字段来控制的,用来表示资源的缓存时间。
2️⃣协商缓存:浏览器会将请求发送至服务器。服务器根据 http 头信息中的 Last-Modify/If-Modify-Since 或 Etag/If-None-Match 来判断是否命中协商缓存。如果命中,则 http 返回码为 304 ,客户端从本地缓存中加载资源。
// CacheInterceptor.kt
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
// 代表需要发起请求
val networkRequest = strategy.networkRequest
// 代表直接使用本地缓存
val cacheResponse = strategy.cacheResponse
…
// networkRequest 和 cacheResponse 都是null
// 说明服务器要求使用缓存,但是本地没有缓存,直接失败
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message(\”Unsatisfiable Request (only-if-cached)\”)
.body(EMPTY_RESPONSE)// 构建一个空的response返回过去
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
}
// networkRequest为null cacheResponse不为null
// 说明使用强缓存,成功
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
…
var networkResponse: Response? = null
try {
// 走到这里说明需要请求一次,判断是协商缓存还是重新去服务器上获取资源
// 因此 调用下一个拦截器去继续请求
networkResponse = chain.proceed(networkRequest)
} finally {
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// 协商缓存
// 如果我们本地有缓存,并且服务器放回给我们304响应码,直接使用本地的缓存
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
return response
…
} else {
cacheResponse.body?.closeQuietly()
}
}
// 走到这,说明响应码是200 表示我们需要用这次新请求的资源
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// 将本次最新得到的响应存到cache中去
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
}
}
…
}
// 将这次新请求的资源返回给上一层拦截器
return response
}
⚡ 总结一下缓存拦截器处理缓存的流程:首先得到 RealInterceptorChain 对象,然后通过它再得到两个很重要的对象:networkRequest 和 cacheResponse 。networkRequest 代表去发起一个网络请求, cacheResponse 代表使用本地缓存。通过这两个对象是否为 null来判断此次请求是使用直接缓存,还是去请求新的资源,还是去使用协商缓存。最后就是会更新缓存,把每次新请求的资源都重新保存至 cache 中。
ConnectInterceptor拦截器
连接拦截器主要就是做建立连接和连接复用的工作,它会从连接池中取出符合条件的连接,以免重复创建,从而提升请求效率。
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
// 获取连接 Exchange:数据交换(封装了连接)
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
// 继续调用下一个拦截器去请求数据
return connectedChain.proceed(realChain.request)
}
}
可以看到,这个连接拦截器中的代码比较少,主要的逻辑都在 initExchange() 方法当中,这个方法的作用就是拿到一个连接对象建立连接,这里我们可以看看initExchange的源码。
通过RealCall的initExchange(chain)创建一个Exchange对象,其中会打开与目标服务器的链接, 并调用 Chain.proceed()方法进入下一个拦截器。 initExchange()方法中会先通过 ExchangeFinder 尝试去 RealConnectionPool 中寻找已存在的连接,未找到则会重新创建一个RealConnection 并开始连接, 然后将其存入RealConnectionPool,现在已经准备好了RealConnection 对象,然后通过请求协议创建不同的ExchangeCodec 并返回,返回的ExchangeCodec正是创建Exchange对象的一个参数。 ConnectInterceptor的代码很简单,主要的功能就是初始化RealCall的Exchange。这个Exchange的功能就是基于RealConnection+ExchangeCodec进行数据交换。
/**
* 初始化并获取一个新的或池化的连接,用于执行即将到来的请求和响应。
*/
internal fun initExchange(chain: RealInterceptorChain): Exchange {
synchronized(this) {
// 确保仍然期待更多的交换,且资源未被释放
check(expectMoreExchanges) { \”已释放\” }
// 确保响应体未打开
check(!responseBodyOpen)
// 确保请求体未打开
check(!requestBodyOpen)
}
// 获取 ExchangeFinder 实例
val exchangeFinder = this.exchangeFinder!!
// 使用 ExchangeFinder 查找适合的编解码器
val codec = exchangeFinder.find(client, chain)
// 创建 Exchange 实例,包括当前对象、事件监听器、ExchangeFinder 和编解码器
val result = Exchange(this, eventListener, exchangeFinder, codec)
// 将创建的 Exchange 实例设置为拦截器范围的 Exchange 和主 Exchange
this.interceptorScopedExchange = result
this.exchange = result
synchronized(this) {
// 标记请求体和响应体为已打开状态
this.requestBodyOpen = true
this.responseBodyOpen = true
}
// 如果请求已取消,则抛出 IOException
if (canceled) throw IOException(\”请求已取消\”)
// 返回初始化后的 Exchange 实例
return result
}
CallServerInterceptor拦截器
// CallServerInterceptor.kt
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
// 将请求头写入缓存中
exchange.writeRequestHeaders(request)
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// 大容量请求体会带有 Expect: 100-continue 字段,服务器识别同意后,才能继续发送请求给服务端
if (\”100-continue\”.equals(request.header(\”Expect\”), ignoreCase = true)) {
// 与服务器进行请求
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// 大部分情况都是走这里,通过IO流把响应结果写入response中
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
// 没有响应 Expect:100 continue则阻止此连接得到复用,并且会关闭相关的socket
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
…
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// 返回100 表示接收大请求体请求 继续发送请求体
// 得到response后返回结果
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
// 构建响应体
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
response = if (forWebSocket && code == 101) {
// 如果状态码是101并且是webSocket就返回空的response
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
return response
}
在 okhttp 中,面对比较大的请求体时,会先去询问服务器是否接收此请求体,如果服务器接收并返回响应码 200,则 okhttp 继续发送请求体,否则就直接返回给客户端。如果服务器忽略此请求,则不会响应,最后客户端会超时抛出异常。
当然,如果你也可以自定义拦截器,在创建Client对象的时候,提供了对应的方法的。
拦截器这块的内容我目前感觉掌握的一般,这块的内容基本上参考掘金的一篇文章:传送门
好啦,以上就是OkHttp的核心代码剖析,欢迎大家和我一起在评论区探讨OkHttp的相关内容,笔者后续也会更新一些其他热门框架的源码剖析,喜欢的话可以留个关注,我还有很多技术文章or分享文章会不定时分享。
#以上关于OkHttp框架源码深度剖析【Android热门框架分析第一弹】的相关内容来源网络仅供参考,相关信息请以官方公告为准!
原创文章,作者:CSDN,如若转载,请注明出处:https://www.sudun.com/ask/91734.html