/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.AddressChangeHostResolver;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.DefaultHostResolver;
import org.apache.kafka.clients.HostResolver;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.LeastLoadedNode;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
import org.apache.kafka.common.message.PushTelemetryResponseData;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.PushTelemetryRequest;
import org.apache.kafka.common.requests.PushTelemetryResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class NetworkClientTest {
    // Request timeout in ms; the client factories below pass the same literal (1000) positionally.
    protected final int defaultRequestTimeoutMs = 1000;
    // Mock clock shared by the selector and by every client this class creates.
    protected final MockTime time = new MockTime();
    // Mock selector the clients are built on; reset before each test in setup().
    protected final MockSelector selector = new MockSelector(this.time);
    // First node returned by TestUtils.singletonCluster(); the default destination used by the tests.
    protected final Node node = (Node)TestUtils.singletonCluster().nodes().iterator().next();
    // Reconnect backoff values in ms (presumably base and ceiling — the factories pass 10000L and a
    // caller-supplied max in adjacent constructor positions; TODO confirm against NetworkClient).
    protected final long reconnectBackoffMsTest = 10000L;
    protected final long reconnectBackoffMaxMsTest = 100000L;
    // Connection setup timeout values in ms; the factories pass the same literals (5000L, 127000L).
    protected final long connectionSetupTimeoutMsTest = 5000L;
    protected final long connectionSetupTimeoutMaxMsTest = 127000L;
    // NOTE(review): not referenced in the visible part of the file — presumably used by backoff tests further down.
    private final int reconnectBackoffExpBase = 2;
    private final double reconnectBackoffJitter = 0.2;
    // Metadata updater that knows only about the single test node.
    private final TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(Collections.singletonList(this.node));
    // Clients under test. These initialisers call instance factory methods, so they rely on the
    // fields above (time, selector, metadataUpdater) having been initialised first.
    private final NetworkClient client = this.createNetworkClient(100000L);
    private final NetworkClient clientWithNoExponentialBackoff = this.createNetworkClient(10000L);
    private final NetworkClient clientWithStaticNodes = this.createNetworkClientWithStaticNodes();
    private final NetworkClient clientWithNoVersionDiscovery = this.createNetworkClientWithNoVersionDiscovery();
    // NOTE(review): assigned outside the visible portion of the file — presumably address fixtures for
    // host-resolution tests; confirm.
    private static ArrayList<InetAddress> initialAddresses;
    private static ArrayList<InetAddress> newAddresses;
    /**
     * Builds a {@link NetworkClient} on top of the shared mock selector, metadata updater and clock.
     *
     * @param reconnectBackoffMaxMs value forwarded to the constructor right after the 10000 ms backoff
     * @return a new client with id {@code "mock"} and the version-discovery flag set to {@code true}
     */
    private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
        return new NetworkClient(
                (Selectable) this.selector,
                (MetadataUpdater) this.metadataUpdater,
                "mock",
                Integer.MAX_VALUE,      // in-flight request limit per connection
                10000L,                 // same value as reconnectBackoffMsTest
                reconnectBackoffMaxMs,
                65536,                  // presumably socket send buffer size — TODO confirm
                65536,                  // presumably socket receive buffer size — TODO confirm
                1000,                   // same value as defaultRequestTimeoutMs
                5000L,                  // same value as connectionSetupTimeoutMsTest
                127000L,                // same value as connectionSetupTimeoutMaxMsTest
                (Time) this.time,
                true,                   // broker version discovery flag
                new ApiVersions(),
                new LogContext(),
                MetadataRecoveryStrategy.NONE);
    }

    /**
     * Builds a {@link NetworkClient} like the default one but with a caller-chosen in-flight limit.
     *
     * @param maxInFlightRequestsPerConnection in-flight request limit passed to the constructor
     * @param reconnectBackoffMaxMs value forwarded to the constructor right after the 10000 ms backoff
     * @return a new client with id {@code "mock"} and the version-discovery flag set to {@code true}
     */
    private NetworkClient createNetworkClientWithMaxInFlightRequestsPerConnection(int maxInFlightRequestsPerConnection, long reconnectBackoffMaxMs) {
        return new NetworkClient(
                (Selectable) this.selector,
                (MetadataUpdater) this.metadataUpdater,
                "mock",
                maxInFlightRequestsPerConnection,
                10000L,                 // same value as reconnectBackoffMsTest
                reconnectBackoffMaxMs,
                65536,                  // presumably socket send buffer size — TODO confirm
                65536,                  // presumably socket receive buffer size — TODO confirm
                1000,                   // same value as defaultRequestTimeoutMs
                5000L,                  // same value as connectionSetupTimeoutMsTest
                127000L,                // same value as connectionSetupTimeoutMaxMsTest
                (Time) this.time,
                true,                   // broker version discovery flag
                new ApiVersions(),
                new LogContext(),
                MetadataRecoveryStrategy.NONE);
    }

    /**
     * Builds a {@link NetworkClient} whose metadata updater knows about several nodes.
     *
     * @param reconnectBackoffMaxMs value forwarded to the constructor right after the 10000 ms backoff
     * @param connectionSetupTimeoutMsTest connection setup timeout (ms); shadows the field of the same name
     * @param nodeNumber number of nodes in the cluster created via {@code TestUtils.clusterWith}
     * @return a new client backed by its own {@code TestMetadataUpdater} (not the shared field)
     */
    private NetworkClient createNetworkClientWithMultipleNodes(long reconnectBackoffMaxMs, long connectionSetupTimeoutMsTest, int nodeNumber) {
        // Parameterised as List<Node> instead of a raw List so no unchecked conversion is needed.
        List<Node> nodes = TestUtils.clusterWith(nodeNumber).nodes();
        // Local updater deliberately shadows the single-node field.
        TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(nodes);
        return new NetworkClient(
                (Selectable) this.selector,
                (MetadataUpdater) metadataUpdater,
                "mock",
                Integer.MAX_VALUE,      // in-flight request limit per connection
                10000L,                 // same value as the reconnectBackoffMsTest field
                reconnectBackoffMaxMs,
                65536,                  // presumably socket send buffer size — TODO confirm
                65536,                  // presumably socket receive buffer size — TODO confirm
                1000,                   // same value as the defaultRequestTimeoutMs field
                connectionSetupTimeoutMsTest,
                127000L,                // same value as the connectionSetupTimeoutMaxMsTest field
                (Time) this.time,
                true,                   // broker version discovery flag
                new ApiVersions(),
                new LogContext(),
                MetadataRecoveryStrategy.NONE);
    }

    /**
     * Builds the {@code "mock-static"} {@link NetworkClient}: same shared selector, updater and clock as
     * the default client, but with both reconnect backoff arguments set to zero.
     *
     * @return a new client with the version-discovery flag set to {@code true}
     */
    private NetworkClient createNetworkClientWithStaticNodes() {
        return new NetworkClient(
                (Selectable) this.selector,
                (MetadataUpdater) this.metadataUpdater,
                "mock-static",
                Integer.MAX_VALUE,      // in-flight request limit per connection
                0L,                     // reconnect backoff argument
                0L,                     // reconnect backoff max argument
                65536,                  // presumably socket send buffer size — TODO confirm
                65536,                  // presumably socket receive buffer size — TODO confirm
                1000,                   // same value as defaultRequestTimeoutMs
                5000L,                  // same value as connectionSetupTimeoutMsTest
                127000L,                // same value as connectionSetupTimeoutMaxMsTest
                (Time) this.time,
                true,                   // broker version discovery flag
                new ApiVersions(),
                new LogContext(),
                MetadataRecoveryStrategy.NONE);
    }

    /**
     * Builds a {@link NetworkClient} driven by a real {@link Metadata} instance, with the
     * version-discovery flag set to {@code false}.
     *
     * @param metadata metadata object handed to the client in place of the test updater
     * @return a new client with id {@code "mock"}
     */
    private NetworkClient createNetworkClientWithNoVersionDiscovery(Metadata metadata) {
        return new NetworkClient(
                (Selectable) this.selector,
                metadata,
                "mock",
                Integer.MAX_VALUE,      // in-flight request limit per connection
                10000L,                 // same value as reconnectBackoffMsTest
                0L,                     // reconnect backoff max argument
                65536,                  // presumably socket send buffer size — TODO confirm
                65536,                  // presumably socket receive buffer size — TODO confirm
                1000,                   // same value as defaultRequestTimeoutMs
                5000L,                  // same value as connectionSetupTimeoutMsTest
                127000L,                // same value as connectionSetupTimeoutMaxMsTest
                (Time) this.time,
                false,                  // broker version discovery flag
                new ApiVersions(),
                new LogContext(),
                MetadataRecoveryStrategy.NONE);
    }

    /**
     * Builds a {@link NetworkClient} on the shared selector, updater and clock with the
     * version-discovery flag set to {@code false}.
     *
     * @return a new client with id {@code "mock"}
     */
    private NetworkClient createNetworkClientWithNoVersionDiscovery() {
        return new NetworkClient(
                (Selectable) this.selector,
                (MetadataUpdater) this.metadataUpdater,
                "mock",
                Integer.MAX_VALUE,      // in-flight request limit per connection
                10000L,                 // same value as reconnectBackoffMsTest
                100000L,                // same value as reconnectBackoffMaxMsTest
                65536,                  // presumably socket send buffer size — TODO confirm
                65536,                  // presumably socket receive buffer size — TODO confirm
                1000,                   // same value as defaultRequestTimeoutMs
                5000L,                  // same value as connectionSetupTimeoutMsTest
                127000L,                // same value as connectionSetupTimeoutMaxMsTest
                (Time) this.time,
                false,                  // broker version discovery flag
                new ApiVersions(),
                new LogContext(),
                MetadataRecoveryStrategy.NONE);
    }

    /** Resets the shared mock selector before every test. */
    @BeforeEach
    public void setup() {
        selector.reset();
    }

    /**
     * Sending a request to destination {@code "5"}, for which {@code ready} was never called,
     * must throw {@link IllegalStateException}.
     */
    @Test
    public void testSendToUnreadyNode() {
        MetadataRequest.Builder metadataRequest = new MetadataRequest.Builder(Collections.singletonList("test"), true);
        long now = time.milliseconds();
        ClientRequest unsent = client.newClientRequest("5", metadataRequest, now, false);
        Assertions.assertThrows(IllegalStateException.class, () -> client.send(unsent, now));
    }

    /** Runs the shared request/response round trip against the default client. */
    @Test
    public void testSimpleRequestResponse() {
        checkSimpleRequestResponse(client);
    }

    /** Runs the shared request/response round trip against the {@code "mock-static"} client. */
    @Test
    public void testSimpleRequestResponseWithStaticNodes() {
        checkSimpleRequestResponse(clientWithStaticNodes);
    }

    /** Runs the shared request/response round trip against the client built without version discovery. */
    @Test
    public void testSimpleRequestResponseWithNoBrokerDiscovery() {
        checkSimpleRequestResponse(clientWithNoVersionDiscovery);
    }

    /** {@code ready} must return {@code false} for a node whose host is {@code "badhost"}. */
    @Test
    public void testDnsLookupFailure() {
        Node unresolvable = new Node(1234, "badhost", 1234);
        boolean ready = client.ready(unresolvable, time.milliseconds());
        Assertions.assertFalse(ready);
    }

    /**
     * Closing a node's connection must drop its in-flight requests and leave the node not ready.
     */
    @Test
    public void testClose() {
        String nodeId = node.idString();

        // Bring the connection up and confirm the client reports it ready.
        client.ready(node, time.milliseconds());
        awaitReady(client, node);
        client.poll(1L, time.milliseconds());
        Assertions.assertTrue(client.isReady(node, time.milliseconds()), "The client should be ready");

        // Send one produce request with no topic data so that exactly one request is in flight.
        ProduceRequestData emptyProduce = new ProduceRequestData()
                .setTopicData(new ProduceRequestData.TopicProduceDataCollection())
                .setAcks((short) 1)
                .setTimeoutMs(1000);
        ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(emptyProduce);
        ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true);
        client.send(request, time.milliseconds());
        Assertions.assertEquals(1, client.inFlightRequestCount(nodeId), "There should be 1 in-flight request after send");
        Assertions.assertTrue(client.hasInFlightRequests(nodeId));
        Assertions.assertTrue(client.hasInFlightRequests());

        // After close, nothing is in flight and the node is no longer ready.
        client.close(nodeId);
        Assertions.assertEquals(0, client.inFlightRequestCount(nodeId), "There should be no in-flight request after close");
        Assertions.assertFalse(client.hasInFlightRequests(nodeId));
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertFalse(client.isReady(node, 0L), "Connection should not be ready after close");
    }

    /**
     * An internal metadata request built for version 3 must record an
     * {@link UnsupportedVersionException} on the metadata updater.
     */
    @Test
    public void testUnsupportedVersionDuringInternalMetadataRequest() {
        List<String> topics = Collections.singletonList("topic_1");
        // The version argument is a short; an int literal is not implicitly narrowed in a
        // constructor call, so the cast is required.
        MetadataRequest.Builder builder = new MetadataRequest.Builder(topics, false, (short) 3);
        client.sendInternalMetadataRequest(builder, node.idString(), time.milliseconds());
        Assertions.assertEquals(UnsupportedVersionException.class, metadataUpdater.getAndClearFailure().getClass());
    }

    /**
     * Sends one produce request through {@code networkClient}, feeds back a matching response via the
     * mock selector, and asserts that the completion handler ran with a correlated response.
     *
     * @param networkClient client to exercise; it is made ready for the test node first
     */
    private void checkSimpleRequestResponse(NetworkClient networkClient) {
        awaitReady(networkClient, node);
        String nodeId = node.idString();
        short requestVersion = ApiKeys.PRODUCE.latestVersion();
        ProduceRequestData requestData = new ProduceRequestData().setAcks((short) 1).setTimeoutMs(1000);
        ProduceRequest.Builder builder = new ProduceRequest.Builder(requestVersion, requestVersion, requestData);
        TestCallbackHandler handler = new TestCallbackHandler();
        ClientRequest request = networkClient.newClientRequest(nodeId, builder, time.milliseconds(), true, 1000, (RequestCompletionHandler) handler);

        // Send and poll once: the request stays in flight until a response arrives.
        networkClient.send(request, time.milliseconds());
        networkClient.poll(1L, time.milliseconds());
        Assertions.assertEquals(1, networkClient.inFlightRequestCount());

        // Queue a response carrying the request's correlation id, then poll it in.
        ProduceResponse produceResponse = new ProduceResponse(new ProduceResponseData());
        ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(produceResponse, requestVersion, request.correlationId());
        selector.completeReceive(new NetworkReceive(nodeId, buffer));
        // Parameterised type instead of a raw List.
        List<ClientResponse> responses = networkClient.poll(1L, time.milliseconds());

        Assertions.assertEquals(1, responses.size());
        Assertions.assertTrue(handler.executed, "The handler should have executed.");
        Assertions.assertTrue(handler.response.hasResponse(), "Should have a response body.");
        Assertions.assertEquals(request.correlationId(), handler.response.requestHeader().correlationId(), "Should be correlated to the original request");
    }

    /**
     * Serialises {@code response} with a header and registers it on the mock selector as a delayed
     * receive from the test node.
     *
     * @param correlationId correlation id written into the response header
     * @param version version used to serialise the response
     * @param response ApiVersions response to deliver
     */
    private void delayedApiVersionsResponse(int correlationId, short version, ApiVersionsResponse response) {
        String nodeId = node.idString();
        ByteBuffer serialized = RequestTestUtils.serializeResponseWithHeader(response, version, correlationId);
        NetworkReceive receive = new NetworkReceive(nodeId, serialized);
        selector.delayedReceive(new DelayedReceive(nodeId, receive));
    }

    /**
     * Queues {@code response} as a delayed receive with correlation id 0, serialised at the max
     * version the response itself advertises for the ApiVersions API.
     *
     * @param response ApiVersions response to deliver
     */
    private void setExpectedApiVersionsResponse(ApiVersionsResponse response) {
        short advertisedMaxVersion = response.apiVersion(ApiKeys.API_VERSIONS.id).maxVersion();
        delayedApiVersionsResponse(0, advertisedMaxVersion, response);
    }

    /**
     * Polls {@code client} until it reports {@code node} as ready, then clears the mock selector.
     * When the client discovers broker versions, a default ApiVersions response is queued first.
     *
     * @param client client to drive
     * @param node node to wait for
     */
    private void awaitReady(NetworkClient client, Node node) {
        boolean discoversVersions = client.discoverBrokerVersions();
        if (discoversVersions) {
            setExpectedApiVersionsResponse(TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
        }
        // Loops without an upper bound: the caller relies on the mock selector eventually connecting.
        boolean ready = client.ready(node, this.time.milliseconds());
        while (!ready) {
            client.poll(1L, this.time.milliseconds());
            ready = client.ready(node, this.time.milliseconds());
        }
        this.selector.clear();
    }

    /**
     * An ApiVersions response carrying {@code INVALID_REQUEST} must clear the in-flight request and
     * leave the node not ready.
     */
    @Test
    public void testInvalidApiVersionsRequest() {
        String nodeId = node.idString();

        // Connect; the first poll leaves a request in flight.
        client.ready(node, time.milliseconds());
        client.poll(0L, time.milliseconds());
        Assertions.assertTrue(client.hasInFlightRequests(nodeId));

        // Answer it with an error response.
        ApiVersionsResponseData errorData = new ApiVersionsResponseData()
                .setErrorCode(Errors.INVALID_REQUEST.code())
                .setThrottleTimeMs(0);
        delayedApiVersionsResponse(0, ApiKeys.API_VERSIONS.latestVersion(), new ApiVersionsResponse(errorData));
        client.poll(0L, time.milliseconds());

        Assertions.assertFalse(client.hasInFlightRequests(nodeId));
        Assertions.assertFalse(client.isReady(node, time.milliseconds()));
    }

    /**
     * A default ApiVersions response must clear the in-flight request and make the node ready.
     */
    @Test
    public void testApiVersionsRequest() {
        String nodeId = node.idString();

        // Connect; the first poll leaves a request in flight.
        client.ready(node, time.milliseconds());
        client.poll(0L, time.milliseconds());
        Assertions.assertTrue(client.hasInFlightRequests(nodeId));

        // Answer it with the default response.
        short latestVersion = ApiKeys.API_VERSIONS.latestVersion();
        delayedApiVersionsResponse(0, latestVersion, defaultApiVersionsResponse());
        client.poll(0L, time.milliseconds());

        Assertions.assertFalse(client.hasInFlightRequests(nodeId));
        Assertions.assertTrue(client.isReady(node, time.milliseconds()));
    }

    /**
     * When the broker answers the first ApiVersions request (version 4) with
     * {@code UNSUPPORTED_VERSION} and advertises a max version of 2, the client must retry with
     * version 2 and become ready once that retry is answered.
     */
    @Test
    public void testUnsupportedApiVersionsRequestWithVersionProvidedByTheBroker() {
        String nodeId = node.idString();

        // Connect; the first poll leaves a request in flight.
        client.ready(node, time.milliseconds());
        client.poll(0L, time.milliseconds());
        Assertions.assertTrue(client.hasInFlightRequests(nodeId));

        // The completed send must be an ApiVersions request at version 4.
        client.poll(0L, time.milliseconds());
        Assertions.assertEquals(1, selector.completedSends().size());
        ByteBuffer buffer = selector.completedSendBuffers().get(0).buffer();
        RequestHeader header = parseHeader(buffer);
        Assertions.assertEquals(ApiKeys.API_VERSIONS, header.apiKey());
        Assertions.assertEquals(4, header.apiVersion());

        // Reply with UNSUPPORTED_VERSION, advertising ApiVersions versions 0..2.
        // The element is added without a cast to the erased Element type, which add(ApiVersion) rejects.
        ApiVersionsResponseData.ApiVersionCollection apiKeys = new ApiVersionsResponseData.ApiVersionCollection();
        apiKeys.add(new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.API_VERSIONS.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 2));
        delayedApiVersionsResponse(0, (short) 0, new ApiVersionsResponse(new ApiVersionsResponseData()
                .setErrorCode(Errors.UNSUPPORTED_VERSION.code())
                .setApiKeys(apiKeys)));

        // The error response is consumed and a retry is queued, so a request is still in flight.
        client.poll(0L, time.milliseconds());
        Assertions.assertTrue(client.hasInFlightRequests(nodeId));
        Assertions.assertEquals(1, selector.completedReceives().size());

        // Clear selector bookkeeping so the next poll shows only the retry.
        selector.completedSends().clear();
        selector.completedSendBuffers().clear();
        selector.completedReceives().clear();

        // The retry must use the max version the broker advertised.
        client.poll(0L, time.milliseconds());
        Assertions.assertEquals(1, selector.completedSends().size());
        buffer = selector.completedSendBuffers().get(0).buffer();
        header = parseHeader(buffer);
        Assertions.assertEquals(ApiKeys.API_VERSIONS, header.apiKey());
        Assertions.assertEquals(2, header.apiVersion());

        // Answer the retry (correlation id 1); the node becomes ready.
        delayedApiVersionsResponse(1, (short) 0, defaultApiVersionsResponse());
        client.poll(0L, time.milliseconds());
        Assertions.assertFalse(client.hasInFlightRequests(nodeId));
        Assertions.assertEquals(1, selector.completedReceives().size());
        Assertions.assertTrue(client.isReady(node, time.milliseconds()));
    }

    /**
     * When the broker answers the first ApiVersions request (version 4) with
     * {@code UNSUPPORTED_VERSION} and advertises no versions, the client must retry with version 0
     * and become ready once that retry is answered.
     */
    @Test
    public void testUnsupportedApiVersionsRequestWithoutVersionProvidedByTheBroker() {
        String nodeId = node.idString();

        // Connect; the first poll leaves a request in flight.
        client.ready(node, time.milliseconds());
        client.poll(0L, time.milliseconds());
        Assertions.assertTrue(client.hasInFlightRequests(nodeId));

        // The completed send must be an ApiVersions request at version 4.
        client.poll(0L, time.milliseconds());
        Assertions.assertEquals(1, selector.completedSends().size());
        ByteBuffer sent = selector.completedSendBuffers().get(0).buffer();
        RequestHeader sentHeader = parseHeader(sent);
        Assertions.assertEquals(ApiKeys.API_VERSIONS, sentHeader.apiKey());
        Assertions.assertEquals(4, sentHeader.apiVersion());

        // Reply with UNSUPPORTED_VERSION and no api key list.
        ApiVersionsResponseData unsupported = new ApiVersionsResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code());
        delayedApiVersionsResponse(0, (short) 0, new ApiVersionsResponse(unsupported));
        client.poll(0L, time.milliseconds());
        Assertions.assertTrue(client.hasInFlightRequests(nodeId));
        Assertions.assertEquals(1, selector.completedReceives().size());

        // Clear selector bookkeeping so the next poll shows only the retry.
        selector.completedSends().clear();
        selector.completedSendBuffers().clear();
        selector.completedReceives().clear();

        // The retry must use version 0.
        client.poll(0L, time.milliseconds());
        Assertions.assertEquals(1, selector.completedSends().size());
        sent = selector.completedSendBuffers().get(0).buffer();
        sentHeader = parseHeader(sent);
        Assertions.assertEquals(ApiKeys.API_VERSIONS, sentHeader.apiKey());
        Assertions.assertEquals(0, sentHeader.apiVersion());

        // Answer the retry (correlation id 1); the node becomes ready.
        delayedApiVersionsResponse(1, (short) 0, defaultApiVersionsResponse());
        client.poll(0L, time.milliseconds());
        Assertions.assertFalse(client.hasInFlightRequests(nodeId));
        Assertions.assertEquals(1, selector.completedReceives().size());
        Assertions.assertTrue(client.isReady(node, time.milliseconds()));
    }

    /** Runs the request-timeout scenario with a per-request timeout of 6000 ms. */
    @Test
    public void testRequestTimeout() {
        int requestTimeoutMs = 6000;
        testRequestTimeout(requestTimeoutMs);
    }

    /** Runs the request-timeout scenario with a per-request timeout of 1000 ms. */
    @Test
    public void testDefaultRequestTimeout() {
        int requestTimeoutMs = 1000;
        testRequestTimeout(requestTimeoutMs);
    }

    /**
     * Checks both outcomes of a produce request on a metadata-backed client: a normal response, and a
     * timeout that must surface as a disconnected, timed-out response and request a metadata update.
     *
     * @param requestTimeoutMs per-request timeout (ms) used for both produce calls
     */
    private void testRequestTimeout(int requestTimeoutMs) {
        Metadata metadata = new Metadata(50L, 50L, 5000L, new LogContext(), new ClusterResourceListeners());
        MetadataResponse initialMetadata = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
        metadata.updateWithCurrentRequestVersion(initialMetadata, false, this.time.milliseconds());
        NetworkClient metadataClient = createNetworkClientWithNoVersionDiscovery(metadata);
        String expectedDestination = this.node.idString();

        // Successful round trip: no disconnect, no timeout, no metadata refresh requested.
        ClientResponse succeeded = produce(metadataClient, requestTimeoutMs, false);
        Assertions.assertEquals(expectedDestination, succeeded.destination());
        Assertions.assertFalse(succeeded.wasDisconnected(), "Expected response to succeed and not disconnect");
        Assertions.assertFalse(succeeded.wasTimedOut(), "Expected response to succeed and not time out");
        Assertions.assertFalse(metadata.updateRequested(), "Expected NetworkClient to not need to update metadata");

        // Emulated timeout: the response is flagged disconnected and timed out, and metadata is refreshed.
        ClientResponse timedOut = produce(metadataClient, requestTimeoutMs, true);
        Assertions.assertEquals(expectedDestination, timedOut.destination());
        Assertions.assertTrue(timedOut.wasDisconnected(), "Expected response to fail due to disconnection");
        Assertions.assertTrue(timedOut.wasTimedOut(), "Expected response to fail due to timeout");
        Assertions.assertTrue(metadata.updateRequested(), "Expected NetworkClient to have called requestUpdate on metadata on timeout");
    }

    /**
     * Sends a produce request with no topic data to the test node and returns the single response
     * the next poll yields.
     *
     * @param client client to send through; it is made ready for the test node first
     * @param requestTimeoutMs per-request timeout (ms)
     * @param shouldEmulateTimeout if {@code true}, advance the mock clock past the timeout instead of
     *        delivering a response
     * @return the only {@link ClientResponse} returned by the poll
     */
    private ClientResponse produce(NetworkClient client, int requestTimeoutMs, boolean shouldEmulateTimeout) {
        awaitReady(client, this.node);
        String nodeId = this.node.idString();
        ProduceRequestData emptyProduce = new ProduceRequestData()
                .setTopicData(new ProduceRequestData.TopicProduceDataCollection())
                .setAcks((short) 1)
                .setTimeoutMs(1000);
        ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(emptyProduce);
        TestCallbackHandler handler = new TestCallbackHandler();
        ClientRequest request = client.newClientRequest(nodeId, builder, this.time.milliseconds(), true, requestTimeoutMs, (RequestCompletionHandler) handler);
        client.send(request, this.time.milliseconds());

        if (shouldEmulateTimeout) {
            // Move the mock clock just past the request timeout; no response is delivered.
            this.time.sleep(requestTimeoutMs + 1);
        } else {
            ProduceResponse produceResponse = new ProduceResponse(new ProduceResponseData());
            ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(produceResponse, ApiKeys.PRODUCE.latestVersion(), request.correlationId());
            this.selector.completeReceive(new NetworkReceive(nodeId, buffer));
        }

        // Parameterised type instead of a raw List, so no downcast is needed on return.
        List<ClientResponse> responses = client.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(1, responses.size());
        return responses.get(0);
    }

    /**
     * With two connections blocked in setup, the connection must not be reported failed before the
     * clock advances, and must be reported failed after 6001 ms have elapsed.
     */
    @Test
    public void testConnectionSetupTimeout() {
        Cluster twoNodeCluster = TestUtils.clusterWith(2);
        Node first = twoNodeCluster.nodeById(0);
        Node second = twoNodeCluster.nodeById(1);

        // Start connecting to both nodes and block each connection on the mock selector.
        client.ready(first, time.milliseconds());
        selector.serverConnectionBlocked(first.idString());
        client.ready(second, time.milliseconds());
        selector.serverConnectionBlocked(second.idString());

        // NOTE(review): the assertions check the shared `node` field rather than the two local
        // nodes — presumably it is the same node as id 0 of the cluster; confirm.
        client.poll(0L, time.milliseconds());
        boolean failedBeforeTimeout = client.connectionFailed(node);
        Assertions.assertFalse(failedBeforeTimeout, "The connections should not fail before the socket connection setup timeout elapsed");

        // 6001 ms — presumably chosen to exceed the 5000 ms setup timeout; TODO confirm.
        time.sleep(6001L);
        client.poll(0L, time.milliseconds());
        boolean failedAfterTimeout = client.connectionFailed(node);
        Assertions.assertTrue(failedAfterTimeout, "Expected the connections to fail due to the socket connection setup timeout");
    }

    /**
     * After a throttled response to a first request, a second request sent before the clock
     * advances by the timeout must still be in flight, and the connection must not be failed.
     */
    @Test
    public void testConnectionTimeoutAfterThrottling() {
        awaitReady(client, node);
        String nodeId = node.idString();
        short requestVersion = ApiKeys.PRODUCE.latestVersion();
        int timeoutMs = 1000;
        ProduceRequestData requestData = new ProduceRequestData().setAcks((short) 1).setTimeoutMs(timeoutMs);
        ProduceRequest.Builder builder = new ProduceRequest.Builder(requestVersion, requestVersion, requestData);
        TestCallbackHandler handler = new TestCallbackHandler();

        // First request goes out.
        ClientRequest first = client.newClientRequest(nodeId, builder, time.milliseconds(), true, 1000, (RequestCompletionHandler) handler);
        client.send(first, time.milliseconds());
        client.poll(0L, time.milliseconds());

        // Its response, carrying a throttle time equal to the timeout, is queued as a delayed receive.
        ProduceResponse throttled = new ProduceResponse(new ProduceResponseData().setThrottleTimeMs(timeoutMs));
        ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(throttled, requestVersion, first.correlationId());
        selector.delayedReceive(new DelayedReceive(nodeId, new NetworkReceive(nodeId, buffer)));

        // Second request is sent, then the clock advances by the full timeout before polling.
        ClientRequest second = client.newClientRequest(nodeId, builder, time.milliseconds(), true, 1000, (RequestCompletionHandler) handler);
        client.send(second, time.milliseconds());
        time.sleep(timeoutMs);
        client.poll(0L, time.milliseconds());

        Assertions.assertEquals(1, client.inFlightRequestCount(nodeId));
        Assertions.assertFalse(client.connectionFailed(node), "Connection should not have failed due to the extra time spent throttling.");
    }

    /**
     * A produce response with a 100 ms throttle time must keep the node not ready, with the
     * reported throttle delay counting down as the mock clock advances, until 100 ms have passed.
     */
    @Test
    public void testConnectionThrottling() {
        awaitReady(client, node);
        String nodeId = node.idString();
        short requestVersion = ApiKeys.PRODUCE.latestVersion();
        ProduceRequestData requestData = new ProduceRequestData().setAcks((short) 1).setTimeoutMs(1000);
        ProduceRequest.Builder builder = new ProduceRequest.Builder(requestVersion, requestVersion, requestData);
        TestCallbackHandler handler = new TestCallbackHandler();
        ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, 1000, (RequestCompletionHandler) handler);
        client.send(request, time.milliseconds());
        client.poll(1L, time.milliseconds());

        // Deliver a response that asks for 100 ms of throttling.
        int throttleTime = 100;
        ProduceResponse throttled = new ProduceResponse(new ProduceResponseData().setThrottleTimeMs(throttleTime));
        ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(throttled, requestVersion, request.correlationId());
        selector.completeReceive(new NetworkReceive(nodeId, buffer));
        client.poll(1L, time.milliseconds());

        // Immediately afterwards: not ready, full delay remaining.
        Assertions.assertFalse(client.ready(node, time.milliseconds()));
        Assertions.assertEquals(100L, client.throttleDelayMs(node, time.milliseconds()));

        // Half-way through: still not ready, 50 ms remaining.
        time.sleep(50L);
        Assertions.assertFalse(client.ready(node, time.milliseconds()));
        Assertions.assertEquals(50L, client.throttleDelayMs(node, time.milliseconds()));

        // Throttle elapsed: ready again, no delay left.
        time.sleep(50L);
        Assertions.assertTrue(client.ready(node, time.milliseconds()));
        Assertions.assertEquals(0L, client.throttleDelayMs(node, time.milliseconds()));
    }

    /**
     * Builds an error-free ApiVersions response listing every {@link ApiKeys} value, where
     * {@code key} is advertised with versions {@code 0..maxVersion} and every other API uses
     * {@code ApiVersionsResponse.toApiVersion}.
     *
     * @param key API whose advertised version range is overridden
     * @param maxVersion max version to advertise for {@code key}
     * @return the assembled response with throttle time 0
     */
    private ApiVersionsResponse createExpectedApiVersionsResponse(ApiKeys key, short maxVersion) {
        ApiVersionsResponseData.ApiVersionCollection versionList = new ApiVersionsResponseData.ApiVersionCollection();
        for (ApiKeys apiKey : ApiKeys.values()) {
            // Elements are added as ApiVersion; a cast to the erased Element type would not
            // match the collection's generic add(ApiVersion).
            if (apiKey == key) {
                versionList.add(new ApiVersionsResponseData.ApiVersion()
                        .setApiKey(apiKey.id)
                        .setMinVersion((short) 0)
                        .setMaxVersion(maxVersion));
            } else {
                versionList.add(ApiVersionsResponse.toApiVersion(apiKey));
            }
        }
        ApiVersionsResponseData data = new ApiVersionsResponseData()
                .setErrorCode(Errors.NONE.code())
                .setThrottleTimeMs(0)
                .setApiKeys(versionList);
        return new ApiVersionsResponse(data);
    }

    /**
     * When the broker advertises PRODUCE up to version 5 only, a produce response with a 100 ms
     * throttle time must leave the node ready with a throttle delay of 0.
     */
    @Test
    public void testThrottlingNotEnabledForConnectionToOlderBroker() {
        // Advertise PRODUCE with max version 5, then connect.
        ApiVersionsResponse olderBroker = createExpectedApiVersionsResponse(ApiKeys.PRODUCE, (short) 5);
        setExpectedApiVersionsResponse(olderBroker);
        boolean ready = client.ready(node, time.milliseconds());
        while (!ready) {
            client.poll(1L, time.milliseconds());
            ready = client.ready(node, time.milliseconds());
        }
        selector.clear();

        // Send a produce request and answer it with a throttled version-5 response.
        int correlationId = sendEmptyProduceRequest();
        client.poll(1L, time.milliseconds());
        sendThrottledProduceResponse(correlationId, 100, (short) 5);
        client.poll(1L, time.milliseconds());

        Assertions.assertTrue(client.ready(node, time.milliseconds()));
        Assertions.assertEquals(0L, client.throttleDelayMs(node, time.milliseconds()));
    }

    /**
     * Sends a produce request with no topic data to the test node.
     *
     * @return the correlation id of the request that was sent
     */
    private int sendEmptyProduceRequest() {
        String defaultNodeId = node.idString();
        return sendEmptyProduceRequest(defaultNodeId);
    }

    /**
     * Sends a produce request with no topic data through the default client.
     *
     * @param nodeId destination node id
     * @return the correlation id of the request that was sent
     */
    private int sendEmptyProduceRequest(String nodeId) {
        ProduceRequestData emptyProduce = new ProduceRequestData()
                .setTopicData(new ProduceRequestData.TopicProduceDataCollection())
                .setAcks((short) 1)
                .setTimeoutMs(1000);
        ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(emptyProduce);
        TestCallbackHandler handler = new TestCallbackHandler();
        ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, 1000, (RequestCompletionHandler) handler);
        client.send(request, time.milliseconds());
        return request.correlationId();
    }

    /**
     * Serialises {@code response} with a header and completes it on the mock selector as a receive
     * from the test node.
     *
     * @param response response to deliver
     * @param version version used to serialise the response
     * @param correlationId correlation id written into the response header
     */
    private void sendResponse(AbstractResponse response, short version, int correlationId) {
        ByteBuffer serialized = RequestTestUtils.serializeResponseWithHeader(response, version, correlationId);
        NetworkReceive receive = new NetworkReceive(node.idString(), serialized);
        selector.completeReceive(receive);
    }

    /**
     * Delivers a produce response whose only populated field is the throttle time.
     *
     * @param correlationId correlation id of the request being answered
     * @param throttleMs throttle time (ms) to set on the response
     * @param version version used to serialise the response
     */
    private void sendThrottledProduceResponse(int correlationId, int throttleMs, short version) {
        ProduceResponseData throttledData = new ProduceResponseData().setThrottleTimeMs(throttleMs);
        sendResponse(new ProduceResponse(throttledData), version, correlationId);
    }

    /**
     * {@code leastLoadedNode} must return the test node while it is connecting or ready, and no
     * node at all once the server has disconnected it.
     */
    @Test
    public void testLeastLoadedNode() {
        // Connection initiated but not yet ready: the node is still offered.
        client.ready(node, time.milliseconds());
        Assertions.assertFalse(client.isReady(node, time.milliseconds()));
        LeastLoadedNode leastLoadedNode = client.leastLoadedNode(time.milliseconds());
        Assertions.assertEquals(node, leastLoadedNode.node());
        Assertions.assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());

        // Fully ready: the node is offered.
        awaitReady(client, node);
        client.poll(1L, time.milliseconds());
        Assertions.assertTrue(client.isReady(node, time.milliseconds()), "The client should be ready");
        leastLoadedNode = client.leastLoadedNode(time.milliseconds());
        Assertions.assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
        Node leastNode = leastLoadedNode.node();
        // Expected value first, actual second, so a failure message reports them the right way round.
        Assertions.assertEquals(node.id(), leastNode.id(), "There should be one leastloadednode");

        // Advance the clock by 10000 ms (same value as reconnectBackoffMsTest), then force a disconnect.
        time.sleep(10000L);
        selector.serverDisconnect(node.idString());
        client.poll(1L, time.milliseconds());
        Assertions.assertFalse(client.ready(node, time.milliseconds()), "After we forced the disconnection the client is no longer ready.");
        leastLoadedNode = client.leastLoadedNode(time.milliseconds());
        Assertions.assertFalse(leastLoadedNode.hasNodeAvailableOrConnectionReady());
        Assertions.assertNull(leastLoadedNode.node(), "There should be NO leastloadednode");
    }

    /**
     * With an in-flight limit of 1, a node that already has one request in flight must not be
     * returned by {@code leastLoadedNode}, while {@code hasNodeAvailableOrConnectionReady} stays true.
     */
    @Test
    public void testHasNodeAvailableOrConnectionReady() {
        NetworkClient singleInFlightClient = createNetworkClientWithMaxInFlightRequestsPerConnection(1, 100000L);
        awaitReady(singleInFlightClient, this.node);
        long now = this.time.milliseconds();

        // Idle connection: the node is offered.
        LeastLoadedNode idle = singleInFlightClient.leastLoadedNode(now);
        Assertions.assertEquals(this.node, idle.node());
        Assertions.assertTrue(idle.hasNodeAvailableOrConnectionReady());

        // Put one request in flight on the node.
        MetadataRequest.Builder metadataRequest = new MetadataRequest.Builder(Collections.emptyList(), true);
        ClientRequest request = singleInFlightClient.newClientRequest(this.node.idString(), metadataRequest, now, true);
        singleInFlightClient.send(request, now);
        singleInFlightClient.poll(1000L, now);

        // No node is offered, but the connection still counts as ready.
        LeastLoadedNode busy = singleInFlightClient.leastLoadedNode(now);
        Assertions.assertNull(busy.node());
        Assertions.assertTrue(busy.hasNodeAvailableOrConnectionReady());
    }

    /**
     * Repeatedly asks for the least loaded node, connects to it and disconnects again, and
     * checks that every window of {@code nodeNumber} consecutive rounds yields all nodes.
     */
    @Test
    public void testLeastLoadedNodeProvideDisconnectedNodesPrioritizedByLastConnectionTimestamp() {
        int nodeNumber = 3;
        NetworkClient multiNodeClient = this.createNetworkClientWithMultipleNodes(0L, 5000L, nodeNumber);
        Set<Node> offeredNodes = new HashSet<>();
        for (int round = 0; round < nodeNumber * 10; round++) {
            Node offered = multiNodeClient.leastLoadedNode(this.time.milliseconds()).node();
            Assertions.assertNotNull(offered, "Should provide a node");
            offeredNodes.add(offered);

            // Connect, drop the connection straight away, then advance the mock clock.
            multiNodeClient.ready(offered, this.time.milliseconds());
            multiNodeClient.disconnect(offered.idString());
            this.time.sleep(5001L);
            multiNodeClient.poll(0L, this.time.milliseconds());

            // At the end of each full cycle every node must have been offered exactly once.
            boolean cycleComplete = (round + 1) % nodeNumber == 0;
            if (cycleComplete) {
                Assertions.assertEquals(nodeNumber, offeredNodes.size(), "All the nodes should be provided");
                offeredNodes.clear();
            }
        }
    }

    /**
     * With a metadata request in flight to the first node, fails authentication on the second
     * node and then checks that the metadata response from the first node still bumps
     * {@code metadata.updateVersion()} by one.
     */
    @Test
    public void testAuthenticationFailureWithInFlightMetadataRequest() {
        int refreshBackoffMs = 50;
        MetadataResponse twoNodeResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
        Metadata metadata = new Metadata(refreshBackoffMs, refreshBackoffMs, 5000L, new LogContext(), new ClusterResourceListeners());
        metadata.updateWithCurrentRequestVersion(twoNodeResponse, false, this.time.milliseconds());

        Cluster cluster = metadata.fetch();
        Node node1 = cluster.nodes().get(0);
        Node node2 = cluster.nodes().get(1);

        NetworkClient client = this.createNetworkClientWithNoVersionDiscovery(metadata);
        this.awaitReady(client, node1);

        // Request an update and let the backoff elapse so the next poll sends it.
        metadata.requestUpdate(true);
        this.time.sleep(refreshBackoffMs);
        client.poll(0L, this.time.milliseconds());

        Optional<Node> nodeWithPendingMetadataOpt = cluster.nodes().stream()
                .filter(candidate -> client.hasInFlightRequests(candidate.idString()))
                .findFirst();
        Assertions.assertEquals(Optional.of(node1), nodeWithPendingMetadataOpt);

        // Start connecting to the second node and fail its authentication.
        Assertions.assertFalse(client.ready(node2, this.time.milliseconds()));
        this.selector.serverAuthenticationFailed(node2.idString());
        client.poll(0L, this.time.milliseconds());
        Assertions.assertNotNull(client.authenticationException(node2));

        // Answer the request that was sent to the first node.
        ByteBuffer sentRequest = this.selector.completedSendBuffers().get(0).buffer();
        RequestHeader header = this.parseHeader(sentRequest);
        Assertions.assertEquals(ApiKeys.METADATA, header.apiKey());
        ByteBuffer serializedResponse = RequestTestUtils.serializeResponseWithHeader(twoNodeResponse, header.apiVersion(), header.correlationId());
        this.selector.delayedReceive(new DelayedReceive(node1.idString(), new NetworkReceive(node1.idString(), serializedResponse)));

        int initialUpdateVersion = metadata.updateVersion();
        client.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(initialUpdateVersion + 1, metadata.updateVersion());
    }

    /**
     * After a produce response sent via {@code sendThrottledProduceResponse(correlationId, 100, ...)}
     * has been processed, {@code leastLoadedNode} must not return a node.
     */
    @Test
    public void testLeastLoadedNodeConsidersThrottledConnections() {
        this.client.ready(this.node, this.time.milliseconds());
        this.awaitReady(this.client, this.node);
        this.client.poll(1L, this.time.milliseconds());
        Assertions.assertTrue(this.client.isReady(this.node, this.time.milliseconds()), "The client should be ready");

        // Send a produce request and answer it with the throttled response.
        int produceCorrelationId = this.sendEmptyProduceRequest();
        this.client.poll(1L, this.time.milliseconds());
        this.sendThrottledProduceResponse(produceCorrelationId, 100, ApiKeys.PRODUCE.latestVersion());
        this.client.poll(1L, this.time.milliseconds());

        LeastLoadedNode afterThrottle = this.client.leastLoadedNode(this.time.milliseconds());
        Assertions.assertNull(afterThrottle.node());
    }

    /** A node that was never contacted has a connection delay of zero on the no-backoff client. */
    @Test
    public void testConnectionDelayWithNoExponentialBackoff() {
        long observedDelay = this.clientWithNoExponentialBackoff.connectionDelay(this.node, this.time.milliseconds());
        Assertions.assertEquals(0L, observedDelay);
    }

    /** Once the no-backoff client is ready for the node, its connection delay is {@code Long.MAX_VALUE}. */
    @Test
    public void testConnectionDelayConnectedWithNoExponentialBackoff() {
        this.awaitReady(this.clientWithNoExponentialBackoff, this.node);
        long observedDelay = this.clientWithNoExponentialBackoff.connectionDelay(this.node, this.time.milliseconds());
        Assertions.assertEquals(Long.MAX_VALUE, observedDelay);
    }

    /**
     * Verifies that the no-backoff client reports the same fixed reconnect delay (10000 ms)
     * after a first and after a second consecutive disconnect.
     *
     * <p>The second phase runs against {@code clientWithNoExponentialBackoff} and re-reads
     * {@code connectionDelay}. Previously it drove {@code client} and re-asserted the stale
     * {@code delay} local, so the final assertion could never fail.
     */
    @Test
    public void testConnectionDelayDisconnectedWithNoExponentialBackoff() {
        this.awaitReady(this.clientWithNoExponentialBackoff, this.node);

        // First disconnect: the delay equals the configured reconnect backoff.
        this.selector.serverDisconnect(this.node.idString());
        this.clientWithNoExponentialBackoff.poll(1000L, this.time.milliseconds());
        long delay = this.clientWithNoExponentialBackoff.connectionDelay(this.node, this.time.milliseconds());
        Assertions.assertEquals(10000L, delay);

        // Sleep until there is no connection delay left.
        this.time.sleep(delay);
        Assertions.assertEquals(0L, this.clientWithNoExponentialBackoff.connectionDelay(this.node, this.time.milliseconds()));

        // Start connecting again and disconnect before the connection is established.
        this.clientWithNoExponentialBackoff.ready(this.node, this.time.milliseconds());
        this.selector.serverDisconnect(this.node.idString());
        this.clientWithNoExponentialBackoff.poll(1000L, this.time.milliseconds());

        // Second attempt: the freshly observed delay must not have grown.
        long secondDelay = this.clientWithNoExponentialBackoff.connectionDelay(this.node, this.time.milliseconds());
        Assertions.assertEquals(10000L, secondDelay);
    }

    /** A node that was never contacted has a connection delay of zero on the default client. */
    @Test
    public void testConnectionDelay() {
        long observedDelay = this.client.connectionDelay(this.node, this.time.milliseconds());
        Assertions.assertEquals(0L, observedDelay);
    }

    /** Once the default client is ready for the node, its connection delay is {@code Long.MAX_VALUE}. */
    @Test
    public void testConnectionDelayConnected() {
        this.awaitReady(this.client, this.node);
        long observedDelay = this.client.connectionDelay(this.node, this.time.milliseconds());
        Assertions.assertEquals(Long.MAX_VALUE, observedDelay);
    }

    /**
     * Checks the reconnect delay of the default client after two consecutive disconnects:
     * about 10000 ms (within 30%) the first time and about twice the first observed delay
     * (within 60%) the second time.
     */
    @Test
    public void testConnectionDelayDisconnected() {
        this.awaitReady(this.client, this.node);

        // First disconnect.
        this.selector.serverDisconnect(this.node.idString());
        this.client.poll(1000L, this.time.milliseconds());
        long delay = this.client.connectionDelay(this.node, this.time.milliseconds());
        long expectedDelay = 10000L;
        double tolerance = 0.3;
        Assertions.assertEquals((double) expectedDelay, (double) delay, (double) expectedDelay * tolerance);

        // Wait the delay out; afterwards reconnecting is allowed immediately.
        this.time.sleep(delay);
        Assertions.assertEquals(0L, this.client.connectionDelay(this.node, this.time.milliseconds()));

        // Start connecting and disconnect again before the connection completes.
        this.client.ready(this.node, this.time.milliseconds());
        this.selector.serverDisconnect(this.node.idString());
        this.client.poll(1000L, this.time.milliseconds());

        // The expectation is derived from the previous delay before it is overwritten.
        expectedDelay = Math.round(delay * 2L);
        delay = this.client.connectionDelay(this.node, this.time.milliseconds());
        tolerance = 0.6;
        Assertions.assertEquals((double) expectedDelay, (double) delay, (double) expectedDelay * tolerance);
    }

    /**
     * Sends a caller-created metadata request, closes the connection on the selector, and
     * expects the next poll to return exactly one response flagged as disconnected.
     */
    @Test
    public void testDisconnectDuringUserMetadataRequest() {
        this.awaitReady(this.client, this.node);

        MetadataRequest.Builder metadataBuilder = new MetadataRequest.Builder(Collections.emptyList(), true);
        long now = this.time.milliseconds();
        ClientRequest metadataRequest = this.client.newClientRequest(this.node.idString(), metadataBuilder, now, true);
        this.client.send(metadataRequest, now);
        this.client.poll(1000L, now);

        // The request is counted as in flight, per node and overall.
        Assertions.assertEquals(1, this.client.inFlightRequestCount(this.node.idString()));
        Assertions.assertTrue(this.client.hasInFlightRequests(this.node.idString()));
        Assertions.assertTrue(this.client.hasInFlightRequests());

        this.selector.close(this.node.idString());
        List<ClientResponse> responses = this.client.poll(1000L, this.time.milliseconds());
        Assertions.assertEquals(1, responses.size());
        ClientResponse onlyResponse = responses.iterator().next();
        Assertions.assertTrue(onlyResponse.wasDisconnected());
    }

    /**
     * Disconnects from the server side five times while an ApiVersions request is in flight.
     * Each round expects no in-flight requests, no responses from the poll, and a connection
     * delay within 20% of {@code 2^min(i, maxExp) * 10000}.
     */
    @Test
    public void testServerDisconnectAfterInternalApiVersionRequest() throws Exception {
        final long numIterations = 5L;
        double reconnectBackoffMaxExp = Math.log(100000.0 / (double) Math.max(10000L, 1L)) / Math.log(2.0);
        for (int attempt = 0; attempt < numIterations; attempt++) {
            this.selector.clear();
            this.awaitInFlightApiVersionRequest();
            this.selector.serverDisconnect(this.node.idString());

            // The disconnect must clear the in-flight request without surfacing a response.
            List<ClientResponse> responses = this.client.poll(0L, this.time.milliseconds());
            Assertions.assertFalse(this.client.hasInFlightRequests(this.node.idString()));
            Assertions.assertTrue(responses.isEmpty());

            long expectedBackoff = Math.round(Math.pow(2.0, Math.min((double) attempt, reconnectBackoffMaxExp)) * 10000.0);
            long delay = this.client.connectionDelay(this.node, this.time.milliseconds());
            Assertions.assertEquals((double) expectedBackoff, (double) delay, 0.2 * (double) expectedBackoff);

            // No need to wait out the backoff after the last round.
            if (attempt == numIterations - 1) {
                break;
            }
            this.time.sleep(delay + 1L);
        }
    }

    /**
     * A client-side disconnect while an ApiVersions request is in flight must drop that request
     * and produce no responses on the next poll.
     */
    @Test
    public void testClientDisconnectAfterInternalApiVersionRequest() throws Exception {
        this.awaitInFlightApiVersionRequest();
        this.client.disconnect(this.node.idString());
        Assertions.assertFalse(this.client.hasInFlightRequests(this.node.idString()));

        List<ClientResponse> polled = this.client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(polled.isEmpty());
    }

    /**
     * Sends two metadata requests, disconnects from the client side, and expects both to come
     * back as disconnected responses, in send order, from the poll and through the callback.
     */
    @Test
    public void testDisconnectWithMultipleInFlights() {
        NetworkClient client = this.clientWithNoVersionDiscovery;
        this.awaitReady(client, this.node);
        Assertions.assertTrue(client.isReady(this.node, this.time.milliseconds()), "Expected NetworkClient to be ready to send to node " + this.node.idString());

        MetadataRequest.Builder metadataBuilder = new MetadataRequest.Builder(Collections.emptyList(), true);
        long now = this.time.milliseconds();

        // Every completed response is also recorded through the request callback.
        List<ClientResponse> callbackResponses = new ArrayList<>();
        RequestCompletionHandler recordingCallback = callbackResponses::add;

        ClientRequest request1 = client.newClientRequest(this.node.idString(), metadataBuilder, now, true, 1000, recordingCallback);
        client.send(request1, now);
        client.poll(0L, now);

        ClientRequest request2 = client.newClientRequest(this.node.idString(), metadataBuilder, now, true, 1000, recordingCallback);
        client.send(request2, now);
        client.poll(0L, now);

        Assertions.assertNotEquals(request1.correlationId(), request2.correlationId());
        Assertions.assertEquals(2, client.inFlightRequestCount());
        Assertions.assertEquals(2, client.inFlightRequestCount(this.node.idString()));

        client.disconnect(this.node.idString());

        List<ClientResponse> responses = client.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(2, responses.size());
        Assertions.assertEquals(responses, callbackResponses);
        Assertions.assertEquals(0, client.inFlightRequestCount());
        Assertions.assertEquals(0, client.inFlightRequestCount(this.node.idString()));

        // Responses keep the order in which the requests were sent.
        ClientResponse response1 = responses.get(0);
        Assertions.assertTrue(response1.wasDisconnected());
        Assertions.assertEquals(request1.correlationId(), response1.requestHeader().correlationId());

        ClientResponse response2 = responses.get(1);
        Assertions.assertTrue(response2.wasDisconnected());
        Assertions.assertEquals(request2.correlationId(), response2.requestHeader().correlationId());
    }

    /**
     * Exercises {@code disconnect} on a ready connection and checks {@code isReady},
     * {@code connectionFailed} and {@code canConnect} before and after the mock clock advances.
     */
    @Test
    public void testCallDisconnect() {
        this.awaitReady(this.client, this.node);
        Assertions.assertTrue(
                this.client.isReady(this.node, this.time.milliseconds()),
                "Expected NetworkClient to be ready to send to node " + this.node.idString());
        Assertions.assertFalse(
                this.client.connectionFailed(this.node),
                "Did not expect connection to node " + this.node.idString() + " to be failed");

        this.client.disconnect(this.node.idString());
        Assertions.assertFalse(
                this.client.isReady(this.node, this.time.milliseconds()),
                "Expected node " + this.node.idString() + " to be disconnected.");
        Assertions.assertTrue(
                this.client.connectionFailed(this.node),
                "Expected connection to node " + this.node.idString() + " to be failed after disconnect");
        Assertions.assertFalse(this.client.canConnect(this.node, this.time.milliseconds()));

        // After 100000 ms of mock time the node may be connected to again.
        this.time.sleep(100000L);
        Assertions.assertTrue(this.client.canConnect(this.node, this.time.milliseconds()));

        // A further disconnect at this point must not block reconnecting.
        this.client.disconnect(this.node.idString());
        Assertions.assertTrue(this.client.canConnect(this.node, this.time.milliseconds()));
    }

    /**
     * Draws 100 correlation ids and checks that they are pairwise distinct and all below
     * {@code 0x7FFFFFF8} (that is, {@code Integer.MAX_VALUE - 7}).
     */
    @Test
    public void testCorrelationId() {
        int count = 100;
        Set<Integer> ids = new HashSet<>();
        for (int drawn = 0; drawn < count; drawn++) {
            ids.add(this.client.nextCorrelationId());
        }
        // A set of full size means no id was handed out twice.
        Assertions.assertEquals(count, ids.size());
        for (Integer id : ids) {
            Assertions.assertTrue(id < 0x7FFFFFF8);
        }
    }

    /**
     * Connects, switches the resolver to its second address set, forces a server disconnect and
     * reconnects. Asserts one connection attempt per address set, two resolver calls, and the
     * telemetry-connected node at each step.
     */
    @Test
    public void testReconnectAfterAddressChange() {
        AddressChangeHostResolver resolver = new AddressChangeHostResolver(
                initialAddresses.toArray(new InetAddress[0]),
                newAddresses.toArray(new InetAddress[0]));
        AtomicInteger initialAddressConns = new AtomicInteger();
        AtomicInteger newAddressConns = new AtomicInteger();
        // Counts attempts per address set; the returned flag is true only for addresses from
        // the set the resolver currently serves (presumably gating connection success — confirm
        // against MockSelector).
        MockSelector addressSelector = new MockSelector(this.time, socketAddress -> {
            InetAddress address = socketAddress.getAddress();
            if (initialAddresses.contains(address)) {
                initialAddressConns.incrementAndGet();
            } else if (newAddresses.contains(address)) {
                newAddressConns.incrementAndGet();
            }
            return (resolver.useNewAddresses() && newAddresses.contains(address))
                    || (!resolver.useNewAddresses() && initialAddresses.contains(address));
        });
        ClientTelemetrySender telemetrySender = Mockito.mock(ClientTelemetrySender.class);
        Mockito.when(telemetrySender.timeToNextUpdate(ArgumentMatchers.anyLong())).thenReturn(0L);
        NetworkClient client = new NetworkClient(
                (MetadataUpdater)this.metadataUpdater, null, (Selectable)addressSelector, "mock", Integer.MAX_VALUE,
                10000L, 100000L, 65536, 65536, 1000, 5000L, 127000L, (Time)this.time, false, new ApiVersions(), null,
                new LogContext(), (HostResolver)resolver, telemetrySender, MetadataRecoveryStrategy.NONE);

        // First connection.
        client.ready(this.node, this.time.milliseconds());
        this.time.sleep(127000L);
        client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(client.isReady(this.node, this.time.milliseconds()));
        // The telemetry node is only set by the following poll.
        Assertions.assertNull(client.telemetryConnectedNode());
        client.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(this.node, client.telemetryConnectedNode());

        // Swap the address set and drop the connection from the server side.
        resolver.changeAddresses();
        addressSelector.serverDisconnect(this.node.idString());
        client.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(client.isReady(this.node, this.time.milliseconds()));
        Assertions.assertNull(client.telemetryConnectedNode());

        // Reconnect after advancing the mock clock by 100000 ms.
        this.time.sleep(100000L);
        client.ready(this.node, this.time.milliseconds());
        this.time.sleep(127000L);
        client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(client.isReady(this.node, this.time.milliseconds()));
        Assertions.assertNull(client.telemetryConnectedNode());
        client.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(this.node, client.telemetryConnectedNode());

        Assertions.assertEquals(1, initialAddressConns.get());
        Assertions.assertEquals(1, newAddressConns.get());
        Assertions.assertEquals(2, resolver.resolutionCount());
        Mockito.verify(telemetrySender, Mockito.times(5)).timeToNextUpdate(ArgumentMatchers.anyLong());
    }

    /**
     * The selector predicate returns false for the first attempt against the initial address
     * set and true afterwards. The test expects the first attempt to leave the client not
     * ready, the second to succeed, and only one resolver call overall.
     */
    @Test
    public void testFailedConnectionToFirstAddress() {
        AddressChangeHostResolver resolver = new AddressChangeHostResolver(
                initialAddresses.toArray(new InetAddress[0]),
                newAddresses.toArray(new InetAddress[0]));
        AtomicInteger initialAddressConns = new AtomicInteger();
        AtomicInteger newAddressConns = new AtomicInteger();
        MockSelector addressSelector = new MockSelector(this.time, socketAddress -> {
            InetAddress address = socketAddress.getAddress();
            if (initialAddresses.contains(address)) {
                initialAddressConns.incrementAndGet();
            } else if (newAddresses.contains(address)) {
                newAddressConns.incrementAndGet();
            }
            // True only once more than one initial-address attempt has been counted.
            return initialAddressConns.get() > 1;
        });
        ClientTelemetrySender telemetrySender = Mockito.mock(ClientTelemetrySender.class);
        Mockito.when(telemetrySender.timeToNextUpdate(ArgumentMatchers.anyLong())).thenReturn(0L);
        NetworkClient client = new NetworkClient(
                (MetadataUpdater)this.metadataUpdater, null, (Selectable)addressSelector, "mock", Integer.MAX_VALUE,
                10000L, 100000L, 65536, 65536, 1000, 5000L, 127000L, (Time)this.time, false, new ApiVersions(), null,
                new LogContext(), (HostResolver)resolver, telemetrySender, MetadataRecoveryStrategy.NONE);

        // First attempt: the client must not become ready.
        client.ready(this.node, this.time.milliseconds());
        this.time.sleep(127000L);
        client.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(client.isReady(this.node, this.time.milliseconds()));
        Assertions.assertNull(client.telemetryConnectedNode());

        // Second attempt after advancing the mock clock by 100000 ms.
        this.time.sleep(100000L);
        client.ready(this.node, this.time.milliseconds());
        this.time.sleep(127000L);
        client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(client.isReady(this.node, this.time.milliseconds()));
        Assertions.assertNull(client.telemetryConnectedNode());
        client.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(this.node, client.telemetryConnectedNode());

        Assertions.assertEquals(2, initialAddressConns.get());
        Assertions.assertEquals(0, newAddressConns.get());
        Assertions.assertEquals(1, resolver.resolutionCount());
        Mockito.verify(telemetrySender, Mockito.times(3)).timeToNextUpdate(ArgumentMatchers.anyLong());
    }

    /**
     * Connects on the initial address set, switches address sets and disconnects. The first
     * reconnect attempt on the new set leaves the client not ready and the second succeeds.
     * Final counts: one initial-address attempt, two new-address attempts, two resolver calls.
     */
    @Test
    public void testFailedConnectionToFirstAddressAfterReconnect() {
        AddressChangeHostResolver resolver = new AddressChangeHostResolver(
                initialAddresses.toArray(new InetAddress[0]),
                newAddresses.toArray(new InetAddress[0]));
        AtomicInteger initialAddressConns = new AtomicInteger();
        AtomicInteger newAddressConns = new AtomicInteger();
        MockSelector addressSelector = new MockSelector(this.time, socketAddress -> {
            InetAddress address = socketAddress.getAddress();
            if (initialAddresses.contains(address)) {
                initialAddressConns.incrementAndGet();
            } else if (newAddresses.contains(address)) {
                newAddressConns.incrementAndGet();
            }
            // True for any initial address, and for new addresses from the second attempt on.
            return initialAddresses.contains(address) || newAddressConns.get() > 1;
        });
        ClientTelemetrySender telemetrySender = Mockito.mock(ClientTelemetrySender.class);
        Mockito.when(telemetrySender.timeToNextUpdate(ArgumentMatchers.anyLong())).thenReturn(0L);
        NetworkClient client = new NetworkClient(
                (MetadataUpdater)this.metadataUpdater, null, (Selectable)addressSelector, "mock", Integer.MAX_VALUE,
                10000L, 100000L, 65536, 65536, 1000, 5000L, 127000L, (Time)this.time, false, new ApiVersions(), null,
                new LogContext(), (HostResolver)resolver, telemetrySender, MetadataRecoveryStrategy.NONE);

        // Initial connection on the first address set.
        client.ready(this.node, this.time.milliseconds());
        this.time.sleep(127000L);
        client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(client.isReady(this.node, this.time.milliseconds()));
        Assertions.assertNull(client.telemetryConnectedNode());
        client.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(this.node, client.telemetryConnectedNode());

        // Switch address sets and disconnect from the server side.
        resolver.changeAddresses();
        addressSelector.serverDisconnect(this.node.idString());
        client.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(client.isReady(this.node, this.time.milliseconds()));
        Assertions.assertNull(client.telemetryConnectedNode());

        // First reconnect attempt: the client must stay not ready.
        this.time.sleep(100000L);
        client.ready(this.node, this.time.milliseconds());
        this.time.sleep(127000L);
        client.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(client.isReady(this.node, this.time.milliseconds()));
        Assertions.assertNull(client.telemetryConnectedNode());

        // Second reconnect attempt: the client becomes ready again.
        this.time.sleep(100000L);
        client.ready(this.node, this.time.milliseconds());
        this.time.sleep(127000L);
        client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(client.isReady(this.node, this.time.milliseconds()));
        Assertions.assertNull(client.telemetryConnectedNode());
        client.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(this.node, client.telemetryConnectedNode());

        Assertions.assertEquals(1, initialAddressConns.get());
        Assertions.assertEquals(2, newAddressConns.get());
        Assertions.assertEquals(2, resolver.resolutionCount());
        Mockito.verify(telemetrySender, Mockito.times(6)).timeToNextUpdate(ArgumentMatchers.anyLong());
    }

    /**
     * Closes node 0 after {@code serverConnectionBlocked} was called for it, then checks that
     * node 1 and afterwards node 0 can each be brought to the ready state.
     */
    @Test
    public void testCloseConnectingNode() {
        Cluster twoNodeCluster = TestUtils.clusterWith(2);
        Node node0 = twoNodeCluster.nodeById(0);
        Node node1 = twoNodeCluster.nodeById(1);

        // Start connecting to node 0, block the connection, then close it.
        this.client.ready(node0, this.time.milliseconds());
        this.selector.serverConnectionBlocked(node0.idString());
        this.client.poll(1L, this.time.milliseconds());
        this.client.close(node0.idString());

        // Neither node is ready at this point.
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(NetworkClientUtils.isReady(this.client, node0, this.time.milliseconds()));
        Assertions.assertFalse(NetworkClientUtils.isReady(this.client, node1, this.time.milliseconds()));

        // Connect to node 1, queueing an ApiVersions response with correlation id 0.
        this.client.ready(node1, this.time.milliseconds());
        ByteBuffer apiVersionsBytes = RequestTestUtils.serializeResponseWithHeader(this.defaultApiVersionsResponse(), ApiKeys.API_VERSIONS.latestVersion(), 0);
        this.selector.delayedReceive(new DelayedReceive(node1.idString(), new NetworkReceive(node1.idString(), apiVersionsBytes)));
        while (!this.client.ready(node1, this.time.milliseconds())) {
            this.client.poll(1L, this.time.milliseconds());
        }
        Assertions.assertTrue(this.client.isReady(node1, this.time.milliseconds()));
        this.selector.clear();

        // Node 0 can be connected again; its ApiVersions response uses correlation id 1.
        this.client.ready(node0, this.time.milliseconds());
        apiVersionsBytes = RequestTestUtils.serializeResponseWithHeader(this.defaultApiVersionsResponse(), ApiKeys.API_VERSIONS.latestVersion(), 1);
        this.selector.delayedReceive(new DelayedReceive(node0.idString(), new NetworkReceive(node0.idString(), apiVersionsBytes)));
        while (!this.client.ready(node0, this.time.milliseconds())) {
            this.client.poll(1L, this.time.milliseconds());
        }
        Assertions.assertTrue(this.client.isReady(node0, this.time.milliseconds()));
    }

    /**
     * If the selector reports the channel as not ready, the connection must be marked as
     * failed once 6001 ms of mock time have passed.
     */
    @Test
    public void testConnectionDoesNotRemainStuckInCheckingApiVersionsStateIfChannelNeverBecomesReady() {
        Node onlyNode = TestUtils.clusterWith(1).nodeById(0);

        this.client.ready(onlyNode, this.time.milliseconds());
        this.selector.channelNotReady(onlyNode.idString());
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(NetworkClientUtils.isReady(this.client, onlyNode, this.time.milliseconds()));

        // Advance the mock clock; the next poll is expected to fail the connection.
        this.time.sleep(6001L);
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(this.client.connectionFailed(onlyNode));
    }

    /**
     * Drives a client with a mocked {@code ClientTelemetrySender} through the ApiVersions
     * exchange, a GetTelemetrySubscriptions round trip and a PushTelemetry round trip, then
     * verifies how often the sender's methods were invoked.
     */
    @Test
    public void testTelemetryRequest() {
        ClientTelemetrySender telemetrySender = Mockito.mock(ClientTelemetrySender.class);
        Mockito.when(telemetrySender.timeToNextUpdate(ArgumentMatchers.anyLong())).thenReturn(0L);
        NetworkClient client = new NetworkClient(
                (MetadataUpdater)this.metadataUpdater, null, (Selectable)this.selector, "mock", Integer.MAX_VALUE,
                10000L, 100000L, 65536, 65536, 1000, 5000L, 127000L, (Time)this.time, true, new ApiVersions(), null,
                new LogContext(), (HostResolver)new DefaultHostResolver(), telemetrySender, MetadataRecoveryStrategy.NONE);

        // Connect: a request is in flight and no telemetry node is set yet.
        client.ready(this.node, this.time.milliseconds());
        client.poll(0L, this.time.milliseconds());
        Assertions.assertNull(client.telemetryConnectedNode());
        Assertions.assertTrue(client.hasInFlightRequests(this.node.idString()));

        // Deliver the ApiVersions response (correlation id 0).
        this.delayedApiVersionsResponse(0, ApiKeys.API_VERSIONS.latestVersion(), TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER));
        client.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(client.hasInFlightRequests(this.node.idString()));
        this.selector.clear();

        // GetTelemetrySubscriptions round trip (correlation id 1).
        GetTelemetrySubscriptionsRequest.Builder getRequest = new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true);
        Mockito.when(telemetrySender.createRequest()).thenReturn(Optional.of(getRequest));
        GetTelemetrySubscriptionsResponse getResponse = new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData());
        ByteBuffer getBuffer = RequestTestUtils.serializeResponseWithHeader(getResponse, ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS.latestVersion(), 1);
        this.selector.completeReceive(new NetworkReceive(this.node.idString(), getBuffer));
        client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(client.isReady(this.node, this.time.milliseconds()));
        Assertions.assertEquals(this.node, client.telemetryConnectedNode());
        Mockito.verify(telemetrySender, Mockito.times(1)).handleResponse(ArgumentMatchers.any(GetTelemetrySubscriptionsResponse.class));
        this.selector.clear();

        // PushTelemetry round trip (correlation id 2).
        PushTelemetryRequest.Builder pushRequest = new PushTelemetryRequest.Builder(new PushTelemetryRequestData(), true);
        Mockito.when(telemetrySender.createRequest()).thenReturn(Optional.of(pushRequest));
        PushTelemetryResponse pushResponse = new PushTelemetryResponse(new PushTelemetryResponseData());
        ByteBuffer pushBuffer = RequestTestUtils.serializeResponseWithHeader(pushResponse, ApiKeys.PUSH_TELEMETRY.latestVersion(), 2);
        this.selector.completeReceive(new NetworkReceive(this.node.idString(), pushBuffer));
        client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(client.isReady(this.node, this.time.milliseconds()));
        Assertions.assertEquals(this.node, client.telemetryConnectedNode());
        Mockito.verify(telemetrySender, Mockito.times(1)).handleResponse(ArgumentMatchers.any(PushTelemetryResponse.class));

        // Overall interaction counts with the telemetry sender.
        Mockito.verify(telemetrySender, Mockito.times(4)).timeToNextUpdate(ArgumentMatchers.anyLong());
        Mockito.verify(telemetrySender, Mockito.times(2)).createRequest();
    }

    /**
     * Parses a {@code RequestHeader} from the given buffer after consuming its first int.
     *
     * @param buffer the buffer to read from; its position is advanced by the leading int
     * @return the header parsed from the bytes following that int
     */
    private RequestHeader parseHeader(ByteBuffer buffer) {
        // Discard the leading 4-byte int (presumably the size prefix — confirm against the sender).
        buffer.getInt();
        ByteBuffer headerBytes = buffer.slice();
        return RequestHeader.parse(headerBytes);
    }

    /**
     * Starts connecting to the test node and polls until the client reports an in-flight
     * request for it (waiting up to 1000 ms), then asserts that the client is not yet ready.
     *
     * @throws Exception if {@code TestUtils.waitForCondition} throws
     */
    private void awaitInFlightApiVersionRequest() throws Exception {
        this.client.ready(this.node, this.time.milliseconds());
        TestUtils.waitForCondition(() -> {
            // Each check first polls so the client can make progress.
            this.client.poll(0L, this.time.milliseconds());
            boolean requestInFlight = this.client.hasInFlightRequests(this.node.idString());
            return requestInFlight;
        }, 1000L, "");
        Assertions.assertFalse(this.client.isReady(this.node, this.time.milliseconds()));
    }

    /**
     * Builds the default ApiVersions response for the {@code ZK_BROKER} listener type.
     *
     * @return the response produced by {@code TestUtils.defaultApiVersionsResponse}
     */
    private ApiVersionsResponse defaultApiVersionsResponse() {
        ApiMessageType.ListenerType listenerType = ApiMessageType.ListenerType.ZK_BROKER;
        return TestUtils.defaultApiVersionsResponse(listenerType);
    }

    // Populates the two address sets used by the address-change tests. The addresses are
    // IP literals, so resolving them is not expected to fail.
    static {
        try {
            initialAddresses = new ArrayList<InetAddress>(Arrays.asList(InetAddress.getByName("10.200.20.100"), InetAddress.getByName("10.200.20.101"), InetAddress.getByName("10.200.20.102")));
            newAddresses = new ArrayList<InetAddress>(Arrays.asList(InetAddress.getByName("10.200.20.103"), InetAddress.getByName("10.200.20.104"), InetAddress.getByName("10.200.20.105")));
        }
        catch (UnknownHostException cause) {
            // Pass the cause along so the failure report shows which lookup went wrong.
            Assertions.fail("Attempted to create an invalid InetAddress, this should not happen", cause);
        }
    }

    /**
     * A {@link ManualMetadataUpdater} that remembers the most recent exception passed to
     * {@code handleServerDisconnect} or {@code handleFailedRequest} so tests can inspect it.
     */
    private static class TestMetadataUpdater
    extends ManualMetadataUpdater {
        // Last recorded exception; null when none is pending.
        KafkaException failure;

        public TestMetadataUpdater(List<Node> nodes) {
            super(nodes);
        }

        /** Records the authentication exception, if any, then delegates to the superclass. */
        public void handleServerDisconnect(long now, String destinationId, Optional<AuthenticationException> maybeAuthException) {
            maybeAuthException.ifPresent(authFailure -> this.failure = authFailure);
            super.handleServerDisconnect(now, destinationId, maybeAuthException);
        }

        /** Records the fatal exception, if any; the superclass is not called. */
        public void handleFailedRequest(long now, Optional<KafkaException> maybeFatalException) {
            maybeFatalException.ifPresent(fatal -> this.failure = fatal);
        }

        /** Returns the recorded exception (possibly null) and resets the field to null. */
        public KafkaException getAndClearFailure() {
            KafkaException recorded = this.failure;
            this.failure = null;
            return recorded;
        }
    }

    /**
     * A {@link RequestCompletionHandler} that records whether it was invoked and the response
     * it was last invoked with.
     */
    private static class TestCallbackHandler
    implements RequestCompletionHandler {
        // Set to true on the first completion; starts out false.
        public boolean executed;
        // Response passed to the most recent completion.
        public ClientResponse response;

        private TestCallbackHandler() {
        }

        /** Stores the response and marks the handler as executed. */
        public void onComplete(ClientResponse response) {
            this.response = response;
            this.executed = true;
        }
    }
}

