Java源码示例:okhttp3.internal.connection.RealConnection

示例1
@Override
public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    Connection connection = chain.connection();
    Response response = null;
    try {
        response = chain.proceed(request);
    } catch (IOException e) {
        if (connection instanceof RealConnection) {
            RealConnection realConnection = (RealConnection) connection;
            realConnection.noNewExchanges();
        }
        throw e;
    }
    return response;
}
 
示例2
/** Returns the number of idle connections in the pool. */
public synchronized int idleConnectionCount() {
  int total = 0;
  for (RealConnection connection : connections) {
    if (connection.allocations.isEmpty()) total++;
  }
  return total;
}
 
示例3
/**
 * Returns a recycled connection to {@code address}, or null if no such connection exists. The
 * route is null if the address has not yet been routed.
 */
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
  assert (Thread.holdsLock(this));
  for (RealConnection connection : connections) {
    if (connection.isEligible(address, route)) {
      streamAllocation.acquire(connection);
      return connection;
    }
  }
  return null;
}
 
示例4
/**
 * Replaces the connection held by {@code streamAllocation} with a shared connection if possible.
 * This recovers when multiple multiplexed connections are created concurrently.
 */
Socket deduplicate(Address address, StreamAllocation streamAllocation) {
  assert (Thread.holdsLock(this));
  for (RealConnection connection : connections) {
    if (connection.isEligible(address, null)
        && connection.isMultiplexed()
        && connection != streamAllocation.connection()) {
      return streamAllocation.releaseAndAcquire(connection);
    }
  }
  return null;
}
 
示例5
void put(RealConnection connection) {
  assert (Thread.holdsLock(this));
  if (!cleanupRunning) {
    cleanupRunning = true;
    executor.execute(cleanupRunnable);
  }
  connections.add(connection);
}
 
示例6
/**
 * Notify this pool that {@code connection} has become idle. Returns true if the connection has
 * been removed from the pool and should be closed.
 */
boolean connectionBecameIdle(RealConnection connection) {
  assert (Thread.holdsLock(this));
  if (connection.noNewStreams || maxIdleConnections == 0) {
    connections.remove(connection);
    return true;
  } else {
    notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
    return false;
  }
}
 
示例7
/**
 * Prunes any leaked allocations and then returns the number of remaining live allocations on
 * {@code connection}. Allocations are leaked if the connection is tracking them but the
 * application code has abandoned them. Leak detection is imprecise and relies on garbage
 * collection.
 */
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
  List<Reference<StreamAllocation>> references = connection.allocations;
  for (int i = 0; i < references.size(); ) {
    Reference<StreamAllocation> reference = references.get(i);

    if (reference.get() != null) {
      i++;
      continue;
    }

    // We've discovered a leaked allocation. This is an application bug.
    StreamAllocation.StreamAllocationReference streamAllocRef =
        (StreamAllocation.StreamAllocationReference) reference;
    String message = "A connection to " + connection.route().address().url()
        + " was leaked. Did you forget to close a response body?";
    Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

    references.remove(i);
    connection.noNewStreams = true;

    // If this was the last allocation, the connection is eligible for immediate eviction.
    if (references.isEmpty()) {
      connection.idleAtNanos = now - keepAliveDurationNs;
      return 0;
    }
  }

  return references.size();
}
 
示例8
public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
    HttpCodec httpCodec, RealConnection connection, int index, Request request) {
  this.interceptors = interceptors;
  this.connection = connection;
  this.streamAllocation = streamAllocation;
  this.httpCodec = httpCodec;
  this.index = index;
  this.request = request;
}
 
示例9
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
    RealConnection connection) throws IOException {
  if (index >= interceptors.size()) throw new AssertionError();

  calls++;

  // If we already have a stream, confirm that the incoming request will use it.
  if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must retain the same host and port");
  }

  // If we already have a stream, confirm that this is the only call to chain.proceed().
  if (this.httpCodec != null && calls > 1) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must call proceed() exactly once");
  }

  // Call the next interceptor in the chain.
  RealInterceptorChain next = new RealInterceptorChain(
      interceptors, streamAllocation, httpCodec, connection, index + 1, request);
  Interceptor interceptor = interceptors.get(index);
  Response response = interceptor.intercept(next);

  // Confirm that the next interceptor made its required call to chain.proceed().
  if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
    throw new IllegalStateException("network interceptor " + interceptor
        + " must call proceed() exactly once");
  }

  // Confirm that the intercepted response isn't null.
  if (response == null) {
    throw new NullPointerException("interceptor " + interceptor + " returned null");
  }

  return response;
}
 
示例10
/**
 * Returns the number of idle connections in the pool.
 */
public synchronized int idleConnectionCount() {
    int total = 0;
    for (RealConnection connection : connections) {
        if (connection.allocations.isEmpty()) total++;
    }
    return total;
}
 
示例11
/**
 * Returns a recycled connection to {@code address}, or null if no such connection exists. The
 * route is null if the address has not yet been routed.
 */
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
        if (connection.isEligible(address, route)) {
            streamAllocation.acquire(connection);
            return connection;
        }
    }
    return null;
}
 
示例12
/**
 * Replaces the connection held by {@code streamAllocation} with a shared connection if possible.
 * This recovers when multiple multiplexed connections are created concurrently.
 */
Socket deduplicate(Address address, StreamAllocation streamAllocation) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
        if (connection.isEligible(address, null)
                && connection.isMultiplexed()
                && connection != streamAllocation.connection()) {
            return streamAllocation.releaseAndAcquire(connection);
        }
    }
    return null;
}
 
示例13
void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
        cleanupRunning = true;
        executor.execute(cleanupRunnable);
    }
    connections.add(connection);
}
 
示例14
/**
 * Notify this pool that {@code connection} has become idle. Returns true if the connection has
 * been removed from the pool and should be closed.
 */
boolean connectionBecameIdle(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (connection.noNewStreams || maxIdleConnections == 0) {
        connections.remove(connection);
        return true;
    } else {
        notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
        return false;
    }
}
 
示例15
/**
 * Prunes any leaked allocations and then returns the number of remaining live allocations on
 * {@code connection}. Allocations are leaked if the connection is tracking them but the
 * application code has abandoned them. Leak detection is imprecise and relies on garbage
 * collection.
 */
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
        Reference<StreamAllocation> reference = references.get(i);

        if (reference.get() != null) {
            i++;
            continue;
        }

        // We've discovered a leaked allocation. This is an application bug.
        StreamAllocation.StreamAllocationReference streamAllocRef =
                (StreamAllocation.StreamAllocationReference) reference;
        String message = "A connection to " + connection.route().address().url()
                + " was leaked. Did you forget to close a response body?";
        Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

        references.remove(i);
        connection.noNewStreams = true;

        // If this was the last allocation, the connection is eligible for immediate eviction.
        if (references.isEmpty()) {
            connection.idleAtNanos = now - keepAliveDurationNs;
            return 0;
        }
    }

    return references.size();
}
 
示例16
public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
    HttpCodec httpCodec, RealConnection connection, int index, Request request) {
  this.interceptors = interceptors;
  this.connection = connection;
  this.streamAllocation = streamAllocation;
  this.httpCodec = httpCodec;
  this.index = index;
  this.request = request;
}
 
示例17
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
    RealConnection connection) throws IOException {
  if (index >= interceptors.size()) throw new AssertionError();

  calls++;

  // If we already have a stream, confirm that the incoming request will use it.
  // 如果我们已经有了一个流,确认传入的请求将会使用它。
  if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must retain the same host and port");
  }

  // If we already have a stream, confirm that this is the only call to chain.proceed().
  if (this.httpCodec != null && calls > 1) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must call proceed() exactly once");
  }

  // Call the next interceptor in the chain.
  RealInterceptorChain next = new RealInterceptorChain(
      interceptors, streamAllocation, httpCodec, connection, index + 1, request);
  Interceptor interceptor = interceptors.get(index);
  Response response = interceptor.intercept(next);

  // Confirm that the next interceptor made its required call to chain.proceed().
  if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
    throw new IllegalStateException("network interceptor " + interceptor
        + " must call proceed() exactly once");
  }

  // Confirm that the intercepted response isn't null.
  if (response == null) {
    throw new NullPointerException("interceptor " + interceptor + " returned null");
  }

  return response;
}
 
示例18
@Override public boolean connectionBecameIdle(
    ConnectionPool pool, RealConnection connection) {
  return pool.connectionBecameIdle(connection);
}
 
示例19
@Override public RealConnection get(ConnectionPool pool, Address address,
    StreamAllocation streamAllocation, Route route) {
  return pool.get(address, streamAllocation, route);
}
 
示例20
@Override public void put(ConnectionPool pool, RealConnection connection) {
  pool.put(connection);
}
 
示例21
/**
 * Performs maintenance on this pool, evicting the connection that has been idle the longest if
 * either it has exceeded the keep alive limit or the idle connections limit.
 *
 * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
 * -1 if no further cleanups are required.
 */
long cleanup(long now) {
  int inUseConnectionCount = 0;
  int idleConnectionCount = 0;
  RealConnection longestIdleConnection = null;
  long longestIdleDurationNs = Long.MIN_VALUE;

  // Find either a connection to evict, or the time that the next eviction is due.
  synchronized (this) {
    for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
      RealConnection connection = i.next();

      // If the connection is in use, keep searching.
      if (pruneAndGetAllocationCount(connection, now) > 0) {
        inUseConnectionCount++;
        continue;
      }

      idleConnectionCount++;

      // If the connection is ready to be evicted, we're done.
      long idleDurationNs = now - connection.idleAtNanos;
      if (idleDurationNs > longestIdleDurationNs) {
        longestIdleDurationNs = idleDurationNs;
        longestIdleConnection = connection;
      }
    }

    if (longestIdleDurationNs >= this.keepAliveDurationNs
        || idleConnectionCount > this.maxIdleConnections) {
      // We've found a connection to evict. Remove it from the list, then close it below (outside
      // of the synchronized block).
      connections.remove(longestIdleConnection);
    } else if (idleConnectionCount > 0) {
      // A connection will be ready to evict soon.
      return keepAliveDurationNs - longestIdleDurationNs;
    } else if (inUseConnectionCount > 0) {
      // All connections are in use. It'll be at least the keep alive duration 'til we run again.
      return keepAliveDurationNs;
    } else {
      // No connections, idle or in use.
      cleanupRunning = false;
      return -1;
    }
  }

  closeQuietly(longestIdleConnection.socket());

  // Cleanup again immediately.
  return 0;
}
 
示例22
@Override public void cancel() {
  RealConnection connection = streamAllocation.connection();
  if (connection != null) connection.cancel();
}
 
示例23
@Override public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  HttpCodec httpCodec = realChain.httpStream();
  StreamAllocation streamAllocation = realChain.streamAllocation();
  RealConnection connection = (RealConnection) realChain.connection();
  Request request = realChain.request();

  long sentRequestMillis = System.currentTimeMillis();
  httpCodec.writeRequestHeaders(request);

  Response.Builder responseBuilder = null;
  if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
    // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
    // Continue" response before transmitting the request body. If we don't get that, return what
    // we did get (such as a 4xx response) without ever transmitting the request body.
    if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
      httpCodec.flushRequest();
      responseBuilder = httpCodec.readResponseHeaders(true);
    }

    if (responseBuilder == null) {
      // Write the request body if the "Expect: 100-continue" expectation was met.
      Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
      BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
      request.body().writeTo(bufferedRequestBody);
      bufferedRequestBody.close();
    } else if (!connection.isMultiplexed()) {
      // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
      // being reused. Otherwise we're still obligated to transmit the request body to leave the
      // connection in a consistent state.
      streamAllocation.noNewStreams();
    }
  }

  httpCodec.finishRequest();

  if (responseBuilder == null) {
    responseBuilder = httpCodec.readResponseHeaders(false);
  }

  Response response = responseBuilder
      .request(request)
      .handshake(streamAllocation.connection().handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build();

  int code = response.code();
  if (forWebSocket && code == 101) {
    // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
    response = response.newBuilder()
        .body(Util.EMPTY_RESPONSE)
        .build();
  } else {
    response = response.newBuilder()
        .body(httpCodec.openResponseBody(response))
        .build();
  }

  if ("close".equalsIgnoreCase(response.request().header("Connection"))
      || "close".equalsIgnoreCase(response.header("Connection"))) {
    streamAllocation.noNewStreams();
  }

  if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
    throw new ProtocolException(
        "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
  }

  return response;
}
 
示例24
public abstract RealConnection get(ConnectionPool pool, Address address,
StreamAllocation streamAllocation, Route route);
 
示例25
@Override
public boolean connectionBecameIdle(
        ConnectionPool pool, RealConnection connection) {
    return pool.connectionBecameIdle(connection);
}
 
示例26
@Override
public RealConnection get(ConnectionPool pool, Address address,
                          StreamAllocation streamAllocation, Route route) {
    return pool.get(address, streamAllocation, route);
}
 
示例27
@Override
public void put(ConnectionPool pool, RealConnection connection) {
    pool.put(connection);
}
 
示例28
/**
 * Performs maintenance on this pool, evicting the connection that has been idle the longest if
 * either it has exceeded the keep alive limit or the idle connections limit.
 * <p>
 * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
 * -1 if no further cleanups are required.
 */
long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
        for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            RealConnection connection = i.next();

            // If the connection is in use, keep searching.
            if (pruneAndGetAllocationCount(connection, now) > 0) {
                inUseConnectionCount++;
                continue;
            }

            idleConnectionCount++;

            // If the connection is ready to be evicted, we're done.
            long idleDurationNs = now - connection.idleAtNanos;
            if (idleDurationNs > longestIdleDurationNs) {
                longestIdleDurationNs = idleDurationNs;
                longestIdleConnection = connection;
            }
        }

        if (longestIdleDurationNs >= this.keepAliveDurationNs
                || idleConnectionCount > this.maxIdleConnections) {
            // We've found a connection to evict. Remove it from the list, then close it below (outside
            // of the synchronized block).
            connections.remove(longestIdleConnection);
        } else if (idleConnectionCount > 0) {
            // A connection will be ready to evict soon.
            return keepAliveDurationNs - longestIdleDurationNs;
        } else if (inUseConnectionCount > 0) {
            // All connections are in use. It'll be at least the keep alive duration 'til we run again.
            return keepAliveDurationNs;
        } else {
            // No connections, idle or in use.
            cleanupRunning = false;
            return -1;
        }
    }

    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    return 0;
}
 
示例29
@Override public void cancel() {
  RealConnection connection = streamAllocation.connection();
  if (connection != null) connection.cancel();
}
 
示例30
public abstract RealConnection get(ConnectionPool pool, Address address,
StreamAllocation streamAllocation, Route route);