package org.apache.kafka.clients;

import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ReverseConnectionRequestData;
import org.apache.kafka.common.message.ReverseConnectionResponseData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.ReverseConnectionResponse;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/SourceReverseConnectionManagerTest.class */
public class SourceReverseConnectionManagerTest implements ReverseNode.ReverseCallback {
    private final MockTime time = new MockTime();
    private NetworkClient networkClient;
    private Selector selector;
    private SourceReverseConnectionManager connManager;
    private Node remoteNode;
    private int metadataUpdatesRequested;

    @BeforeEach
    public void setUp() {
        Uuid randomUuid = Uuid.randomUuid();
        this.remoteNode = new Node(1, "localhost", 9092);
        this.networkClient = (NetworkClient) Mockito.mock(NetworkClient.class);
        this.selector = (Selector) Mockito.mock(Selector.class);
        this.connManager = new SourceReverseConnectionManager(this.networkClient, this.selector, new ManualMetadataUpdater(Collections.singletonList(this.remoteNode)) { // from class: org.apache.kafka.clients.SourceReverseConnectionManagerTest.1
            public long maybeUpdate(long j) {
                SourceReverseConnectionManagerTest.access$008(SourceReverseConnectionManagerTest.this);
                return super.maybeUpdate(j);
            }
        }, randomUuid, new ReverseConnectionRequestData().setClusterLinkId(new Uuid(randomUuid.getMostSignificantBits(), randomUuid.getLeastSignificantBits())).setSourceClusterId("sourceCluster").setTargetClusterId("targetCluster"), this, new LogContext());
    }

    @Test
    public void createReversibleConnection() {
        createReversibleConnection(123, this.remoteNode.id());
        Assertions.assertEquals(0, this.metadataUpdatesRequested);
        ((NetworkClient) Mockito.verify(this.networkClient, Mockito.times(1))).wakeup();
        Assertions.assertEquals(1, this.connManager.reverseNodes().size());
    }

    @Test
    public void createMultipleReversibleConnectionsToSameBroker() {
        createReversibleConnection(12, this.remoteNode.id());
        createReversibleConnection(13, this.remoteNode.id());
        Assertions.assertEquals(2, this.connManager.reverseNodes().size());
        Assertions.assertEquals(Utils.mkSet(new Integer[]{-1073741823, -1073741822}), this.connManager.reverseNodes().stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet()));
    }

    @Test
    public void reverseConnectionDisconnect() {
        ReverseNode createReversibleConnection = createReversibleConnection(123, this.remoteNode.id());
        this.connManager.processDisconnection(createReversibleConnection.idString());
        Assertions.assertEquals(Collections.emptyList(), this.connManager.reverseNodes());
        Assertions.assertEquals(NetworkException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
        })).getCause().getClass());
    }

    @Test
    public void reverseRequestBrokerNotAvailable() {
        Assertions.assertThrows(NetworkException.class, () -> {
            this.connManager.createReversibleConnection(123, 5, ListenerName.forSecurityProtocol(SecurityProtocol.SSL), KafkaPrincipal.ANONYMOUS, Optional.empty(), (AuthenticationContext) null, 0L);
        });
        Assertions.assertEquals(0, this.metadataUpdatesRequested);
    }

    @Test
    public void apiVersionsWithoutReversal() {
        this.connManager.handleApiVersionsResponse(this.remoteNode.idString(), new ApiVersionsResponse(new ApiVersionsResponseData().setErrorCode(Errors.NONE.code())));
        Assertions.assertEquals(Collections.emptyList(), this.connManager.reverseNodes());
        Assertions.assertEquals(0, this.metadataUpdatesRequested);
    }

    @Test
    public void reverseRequestNotSupported() {
        ReverseNode createReversibleConnection = createReversibleConnection(123, this.remoteNode.id());
        ApiVersionsResponse apiVersionsResponse = new ApiVersionsResponse(new ApiVersionsResponseData().setErrorCode(Errors.NONE.code()));
        Assertions.assertThrows(UnsupportedVersionException.class, () -> {
            this.connManager.handleApiVersionsResponse(createReversibleConnection.idString(), apiVersionsResponse);
        });
        Assertions.assertEquals(0, this.metadataUpdatesRequested);
    }

    @Test
    public void apiVersionsWithReversal() {
        ReverseNode createReversibleConnection = createReversibleConnection(123, this.remoteNode.id());
        this.connManager.handleApiVersionsResponse(createReversibleConnection.idString(), apiVersionsResponse());
        Assertions.assertEquals(Optional.of(123), ((ReverseNode) this.connManager.reverseNodes().iterator().next()).requestId());
        long milliseconds = this.time.milliseconds();
        this.connManager.handleReverseConnectionsRequests(milliseconds);
        Assertions.assertEquals(0, this.metadataUpdatesRequested);
        Assertions.assertEquals(Optional.of(123), ((ReverseNode) this.connManager.reverseNodes().iterator().next()).requestId());
        ((NetworkClient) Mockito.verify(this.networkClient, Mockito.times(1))).ready(createReversibleConnection, milliseconds);
    }

    @Test
    public void apiVersionsFailure() {
        this.connManager.handleApiVersionsResponse(this.remoteNode.idString(), new ApiVersionsResponse(new ApiVersionsResponseData().setErrorCode(Errors.INVALID_REQUEST.code())));
        Assertions.assertEquals(Collections.emptyList(), this.connManager.reverseNodes());
        Assertions.assertEquals(0, this.metadataUpdatesRequested);
    }

    @Test
    public void reverseConnectionResponse() {
        ReverseNode createReversibleConnection = createReversibleConnection(123, this.remoteNode.id());
        this.connManager.handleApiVersionsResponse(createReversibleConnection.idString(), apiVersionsResponse());
        ReverseConnectionResponse reverseConnectionResponse = new ReverseConnectionResponse(new ReverseConnectionResponseData().setErrorCode(Errors.NONE.code()));
        KafkaChannel kafkaChannel = (KafkaChannel) Mockito.mock(KafkaChannel.class);
        Mockito.when(this.selector.channel(createReversibleConnection.idString())).thenReturn(kafkaChannel);
        this.connManager.handleReverseConnectionResponse(createReversibleConnection.idString(), reverseConnectionResponse);
        ((Selector) Mockito.verify(this.selector, Mockito.times(1))).removeChannelWithoutClosing(kafkaChannel);
        Assertions.assertEquals(Collections.emptyList(), this.connManager.reverseNodes());
    }

    public void onReverseConnection(KafkaChannel kafkaChannel, ReverseNode reverseNode) {
    }

    private ReverseNode createReversibleConnection(int i, int i2) {
        this.connManager.createReversibleConnection(i, i2, ListenerName.forSecurityProtocol(SecurityProtocol.SSL), KafkaPrincipal.ANONYMOUS, Optional.empty(), (AuthenticationContext) null, this.time.milliseconds());
        Optional findFirst = this.connManager.reverseNodes().stream().filter(reverseNode -> {
            return reverseNode.requestId().equals(Optional.of(Integer.valueOf(i)));
        }).findFirst();
        Assertions.assertTrue(findFirst.isPresent());
        return (ReverseNode) findFirst.get();
    }

    private ApiVersionsResponse apiVersionsResponse() {
        return new ApiVersionsResponse(new ApiVersionsResponseData().setApiKeys(new ApiVersionsResponseData.ApiVersionCollection(Collections.singleton(new ApiVersionsResponseData.ApiVersion().setApiKey(ApiKeys.REVERSE_CONNECTION.id).setMaxVersion((short) 0)).iterator())).setErrorCode(Errors.NONE.code()));
    }

    static /* synthetic */ int access$008(SourceReverseConnectionManagerTest sourceReverseConnectionManagerTest) {
        int i = sourceReverseConnectionManagerTest.metadataUpdatesRequested;
        sourceReverseConnectionManagerTest.metadataUpdatesRequested = i + 1;
        return i;
    }
}
