开源库之 OkHttp 源码分析

目录

技术答疑,成长进阶,可以加入我的知识星球:音视频领域专业问答的小圈子

分析一波 OkHttp 的源码实现。

简单使用

官方给出了使用例子,具体详情参考 官网

 1// 创建 OkHttp 请求客户端
 2OkHttpClient client = new OkHttpClient();
 3// 构建一个请求
 4Request request = new Request.Builder()
 5      .url(url)
 6      .build();
 7// 执行网络请求并返回结果      
 8Response response = client.newCall(request).execute();      
 9// 得到结果内容
10response.body().string();
JAVA

可以看到,使用过程还是很容易理解的:

首先创建请求客户端 -> 接着创建网络请求 -> 然后发出网络请求 -> 最后得到请求结果。

把网络请求过程抽象化之后就是上面的流水线步骤了,其中最重要的步骤就是发出网络请求了,在 OkHttp 中用到了责任链模式,对发出的网络请求会按照责任链表的顺序依次进行处理,比如请求重试、连接处理、缓存处理、日志处理等,这个地方再写一篇文章详细分析。

下面针对 OkHttp 的调用流程和结构作分析。

创建请求

首先是创建请求客户端 OkHttpClient 和网络请求 Request 。

对于这种要创建某某对象的,上来就是一个建造者模式,建造者模式可以说是在开源项目中最常见的设计模式了。

在 OkHttpClient 和 Request 内部都有一个 Builder 的内部类用来执行具体的构建操作,一般 Builder 类有很多方法,它们都是用来设置具体构建参数的,顺着哪些方法就可以找到都有哪些配置选项,这些选项也就是对外暴露的接口,这算是阅读源码的一个小技巧了。

OkHttpClient 的 Builder 内部配置选项:

 1      dispatcher = new Dispatcher();
 2      // 使用默认的选项
 3      protocols = DEFAULT_PROTOCOLS;
 4      connectionSpecs = DEFAULT_CONNECTION_SPECS;
 5      eventListenerFactory = EventListener.factory(EventListener.NONE);
 6      proxySelector = ProxySelector.getDefault();
 7      cookieJar = CookieJar.NO_COOKIES;
 8      socketFactory = SocketFactory.getDefault();
 9      hostnameVerifier = OkHostnameVerifier.INSTANCE;
10      certificatePinner = CertificatePinner.DEFAULT;
11      proxyAuthenticator = Authenticator.NONE;
12      authenticator = Authenticator.NONE;
13      connectionPool = new ConnectionPool();
14      dns = Dns.SYSTEM;
15      followSslRedirects = true;
16      followRedirects = true;
17      retryOnConnectionFailure = true;
18      connectTimeout = 10_000;
19      readTimeout = 10_000;
20      writeTimeout = 10_000;
21      pingInterval = 0;
JAVA

重点分析 Dispatcher 类用来管理和分发请求的,其他的如果没有指定的话都是默认选项,等用到时再具体分析。

在 Request 内部的 Builder 也有一些配置选项:

1    HttpUrl url;
2    String method;
3    Headers.Builder headers;
4    RequestBody body;
5    /** A mutable map of tags, or an immutable empty map if we don't have any. */
6    Map<Class<?>, Object> tags = Collections.emptyMap();
JAVA

urlbodymethod 这些都是 HTTP 请求相关的内容,用一个 Request 类将它们进行封装,到具体执行请求时,都会将它们取出来使用。

同步执行

1val response = client.newCall(request).execute()
KOTLIN

同步执行的代码如上,首先是 newCall 方法将 request 转换成 RealCall 对象,它实现了 Call 的接口。

Request 类只是封装了请求的信息,比如 GETPOST 方法之类的,但具体的执行通过 Call 这一层来实现了。

1  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
2    // Safely publish the Call instance to the EventListener.
3    RealCall call = new RealCall(client, originalRequest, forWebSocket);
4    call.eventListener = client.eventListenerFactory().create(call);
5    return call;
6  }
JAVA

然后是 Call 的 execute 方法。

 1  @Override public Response execute() throws IOException {
 2    // 保证线程安全
 3    synchronized (this) {
 4      if (executed) throw new IllegalStateException("Already Executed");
 5      executed = true;
 6    }
 7    captureCallStackTrace();
 8    eventListener.callStart(this);
 9    try {
10      // 把请求添加到队列中
11      client.dispatcher().executed(this);
12      // 具体执行网络请求
13      Response result = getResponseWithInterceptorChain();
14      if (result == null) throw new IOException("Canceled");
15      return result;
16    } catch (IOException e) {
17      // 请求失败的回调
18      eventListener.callFailed(this, e);
19      throw e;
20    } finally {
21      // 将请求从队列中移除
22      client.dispatcher().finished(this);
23    }
24  }
JAVA

首先是 synchronized 加锁保证线程安全,然后是通过 Dispatcher 类的 executed 方法将请求 RealCall 添加到请求的同步队列中去。

1private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
2
3synchronized void executed(RealCall call) {
4  runningSyncCalls.add(call);
5}
JAVA

runningSyncCalls 是一个 Deque 类型的队列。Deque 是一个双端队列,它具有队列和栈的特性,队列中的元素可以从两端弹出,而插入和删除操作只能在队列的两端进行。

executed 方法只是将请求添加到了队列中,具体的执行在 getResponseWithInterceptorChain 方法中,在这里就会将请求通过OkHttp 的各种拦截器,按照责任链模式进行调用,并得到请求返回的结果,用 Response 类封装。

当请求结束后,会调用 Dispatcher 类的 finished 方法。

 1  void finished(RealCall call) {
 2    finished(runningSyncCalls, call, false);
 3  }
 4  
 5  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
 6    int runningCallsCount;
 7    Runnable idleCallback;
 8    synchronized (this) {
 9      // 将请求从队列中移除
10      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
11      if (promoteCalls) promoteCalls();
12      runningCallsCount = runningCallsCount();
13      idleCallback = this.idleCallback;
14    }
15
16    if (runningCallsCount == 0 && idleCallback != null) {
17      idleCallback.run();
18    }
19  }  
JAVA

finished 方法主要是将请求 RealCall 从队列中 runningSyncCalls 移除。

这里还有个小知识点就是 trycatchfinally 三者的执行顺序,在 try 语句里面执行了 return 语句,但是返回的结果会保存在一个临时区域里面,然后执行 finally 语句,移除队列请求后再继续返回。

网络请求

下面重点看一下 getResponseWithInterceptorChain 方法。

 1  Response getResponseWithInterceptorChain() throws IOException {
 2    // Build a full stack of interceptors.
 3    List<Interceptor> interceptors = new ArrayList<>();
 4    interceptors.addAll(client.interceptors());
 5    // OkHttpClient 提供的默认拦截器
 6    interceptors.add(retryAndFollowUpInterceptor);
 7    interceptors.add(new BridgeInterceptor(client.cookieJar()));
 8    interceptors.add(new CacheInterceptor(client.internalCache()));
 9    interceptors.add(new ConnectInterceptor(client));
10    if (!forWebSocket) {
11      interceptors.addAll(client.networkInterceptors());
12    }
13    interceptors.add(new CallServerInterceptor(forWebSocket));
14    // 重点参数是 0 ,表示责任链的开始
15    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
16        originalRequest, this, eventListener, client.connectTimeoutMillis(),
17        client.readTimeoutMillis(), client.writeTimeoutMillis());
18
19    return chain.proceed(originalRequest);
20  }
JAVA

有一个拦截器 Interceptor 的集合,在创建 OkHttpClient 的时候可以通过 Builder 去添加我们自定义的拦截器,另外 OkHttp 也提供了几个默认的拦截器。

把拦截器添加到集合中,作为 RealInterceptorChain 类的参数,它实现了 Chain 接口,表示为一条。在这条上,每个拦截器都是它的一个节点,并且以链上任何一个拦截器为起点,又可以开始一条新的链。

具体来看看 proceed 方法:

 1  @Override public Response proceed(Request request) throws IOException {
 2    return proceed(request, streamAllocation, httpCodec, connection);
 3  }
 4  
 5  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
 6      RealConnection connection) throws IOException {
 7    if (index >= interceptors.size()) throw new AssertionError();
 8
 9    calls++;
10    // 删除一些判断和抛出异常的方法
11    
12    // Call the next interceptor in the chain.
13    // 以拦截器队列中的下一个拦截器为起点,构建新的请求链
14    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
15        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
16        writeTimeout);
17    // 取出当前请求链中的第一个
18    Interceptor interceptor = interceptors.get(index);
19    // 执行当前拦截器的功能,并且开始子链的请求流程
20    Response response = interceptor.intercept(next);
21
22    // 删除一些判断和抛出异常的方法
23    return response;
24  }
JAVA

整个拦截器的集合 interceptors 是不会变的,而 index 就对应从拦截器集合中取出拦截器的索引,为 0 表示取出第一个来作为整条 的起点,从而构建一条请求链。

当一个拦截器执行具体功能时,也就是 intercept 方法,会把下一个作为参数传递过去,这样就又会以这条 为起始继续下一步的执行。

比如,看 HttpLoggingInterceptor 内部的实现:

1    Request request = chain.request();
2    if (level == Level.NONE) {
3      return chain.proceed(request);
4    }
JAVA

如果要打印的日志即为为 NONE,也就是不打印,直接就开始 的下一个执行了。

简单地看,这里就是一个递归的调用流程。

异步执行

1client.newCall(request).enqueue(object : okhttp3.Callback {
2
3    override fun onFailure(call: okhttp3.Call?, e: IOException?) {
4    }
5    
6    override fun onResponse(call: okhttp3.Call?, response: okhttp3.Response?) {
7    }
8})
KOTLIN

异步请求的代码如上,和同步请求的区别在于后面是 enqueue 方法。

 1  @Override public void enqueue(Callback responseCallback) {
 2    synchronized (this) {
 3      if (executed) throw new IllegalStateException("Already Executed");
 4      executed = true;
 5    }
 6    captureCallStackTrace();
 7    eventListener.callStart(this);
 8    // 通过 Dispatcher 管理请求
 9    // 
10    client.dispatcher().enqueue(new AsyncCall(responseCallback));
11  }
JAVA

enqueue 需要传入一个 Callback 的回调接口,用来处理请求成功和失败。另外,在 enqueue 的内部还是通过 Dispatcher 进行请求的管理。

这里传入的参数是 AsyncCall 类,不再是 RealCall 类了,AsyncCall 类继承自 NamedRunnable 类,NamedRunnable 实现了 Runnable 接口。

NamedRunnable 意思就是有名字的线程,会在线程执行时临时改变线程的名字,执行结束后再改回来。

 1  @Override public final void run() {
 2    // 保存当前线程的名字
 3    String oldName = Thread.currentThread().getName();
 4    // 设置新的名字
 5    Thread.currentThread().setName(name);
 6    try {
 7      execute();
 8    } finally {
 9      // 执行后再改为原来的名字
10      Thread.currentThread().setName(oldName);
11    }
12  }
JAVA

另外,AsyncCall 实现了 NamedRunnable 的抽象方法 execute ,该方法也是网络请求的具体执行部分,问题在于这些异步请求是如何分发和管理的,还是回到 Dispatcher 类中来。

1  synchronized void enqueue(AsyncCall call) {
2    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
3      runningAsyncCalls.add(call);
4      executorService().execute(call);
5    } else {
6      readyAsyncCalls.add(call);
7    }
8  }
JAVA

首先是判断异步请求队列 runningAsyncCalls 的个数是否超过最大请求数了,另外是判断同一主机的网络请求个数是否超过限制。

如果都不超过,就将请求添加到异步请求队列中,并在线程池中去执行。

如果超过了,就把请求添加到准备就绪的队列 readyAsyncCalls 中,等待后续再去执行。

executorService() 方法的执行就是初始化线程池的。

1  public synchronized ExecutorService executorService() {
2    if (executorService == null) {
3      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
4          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
5    }
6    return executorService;
7  }
JAVA

ThreadPoolExecutor 用来提供一个线程池,它的方法原型如下:

1    public ThreadPoolExecutor(int corePoolSize,             // 核心线程池的大小
2                              int maximumPoolSize,        // 线程池的最大大小
3                              long keepAliveTime,         // 线程池空闲时,线程的存活时间
4                              TimeUnit unit,             // 存活时间的时间单位
5                              BlockingQueue<Runnable> workQueue, // 存放线程任务的队列
6                              ThreadFactory threadFactory,      // 创建线程的工厂
7                              RejectedExecutionHandler handler) {     // 任务拒绝策略
8            // 省略
9    }
JAVA

使用线程池的好处是显而易见的,统一管理网络请求,减少线程频繁创建、销毁带来的开销。

通过线程池的 execute 方法去执行请求,具体就是 Runnable 接口中的 run 方法。

 1    // 异步请求中的方法
 2    @Override protected void execute() {
 3      boolean signalledCallback = false;
 4      try {
 5        // 具体执行的请求还是在这里
 6        Response response = getResponseWithInterceptorChain();
 7        if (retryAndFollowUpInterceptor.isCanceled()) {
 8          signalledCallback = true;
 9          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
10        } else {
11          signalledCallback = true;
12          responseCallback.onResponse(RealCall.this, response);
13        }
14      } catch (IOException e) {
15        if (signalledCallback) {
16          // Do not signal the callback twice!
17          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
18        } else {
19          // 请求失败的回调
20          eventListener.callFailed(RealCall.this, e);
21          responseCallback.onFailure(RealCall.this, e);
22        }
23      } finally {
24        client.dispatcher().finished(this);
25      }
26    }
JAVA

异步请求执行的线程变了,肯定就得有相应的回调通知,另外,具体是网络请求部分还是在 getResponseWithInterceptorChain 方法中。

finally 中还是会将异步的请求从队列中移除,不同的是内部最后一个参数为 false 。

1  void finished(AsyncCall call) {
2    // 与同步请求不同的是,最后一个参数为 True, 同步请求为 false
3    finished(runningAsyncCalls, call, true);
4  }
JAVA

因为 True,所以 promoteCalls 就会执行。

 1  // 异步请求才会执行该方法
 2  private void promoteCalls() {
 3    // 优先把 readyAsyncCalls 队列中的请求执行完
 4    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
 5    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
 6
 7    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
 8      AsyncCall call = i.next();
 9
10      if (runningCallsForHost(call) < maxRequestsPerHost) {
11        i.remove();
12        runningAsyncCalls.add(call);
13        executorService().execute(call);
14      }
15
16      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
17    }
18  }
JAVA

之前提到,如果异步请求队列已经满了,就会把请求放到准备就绪的队列 readyAsyncCalls 中去,那么 promoteCalls 方法就是来处理 readyAsyncCalls 队列中的请求的。

每次从队列中移除异步请求后 size 都会减一,然后再判断队列的数量是否还是超过了最大请求数量,如果是就返回,表示优先把异步队列中的请求消耗完,如果不是,表示有空位,那就从准备就绪的队列中取出请求来执行。

readyAsyncCalls 队列中的请求放到 runningAsyncCalls 队列中去,再通过线程池去执行,并且还是会判断 runningAsyncCalls 的数量是否超过了最大请求,超过了就返回,表示 OkHttp 中整个异步请求的数量不能超过 maxRequests 表示的数量。

就这样实现了 OkHttp 的异步请求管理流程,并且还有一个类似请求排队的机制,

小结

OkHttp 是用来执行网络请求,包括同步和异步的请求。

对于同步请求,直接就执行了,同步阻塞直到返回取得请求结果。

对于异步请求,对请求进行管理,限制请求最大数量,如果超出数量就排队候选,每次执行完一次异步请求,有空位就去处理排队的请求。

对于具体的请求过程,通过责任链的模式,把请求分成多个过程,递归地对请求进行处理。

欢迎关注微信公众号:音视频开发进阶

Licensed under CC BY-NC-SA 4.0
粤ICP备20067247号
使用 Hugo 构建    主题 StackedJimmy 设计,Jacob 修改