package org.apache.flink.runtime.registration;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.apache.flink.runtime.registration.RetryingRegistrationTest;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.types.Either;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.class */
class RegisteredRpcConnectionTest {
    private TestingRpcService rpcService;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/registration/RegisteredRpcConnectionTest$TestRpcConnection.class */
    public static class TestRpcConnection extends RegisteredRpcConnection<UUID, TestRegistrationGateway, RetryingRegistrationTest.TestRegistrationSuccess, RetryingRegistrationTest.TestRegistrationRejection> {
        private final Object lock;
        private final RpcService rpcService;
        private CompletableFuture<Either<RetryingRegistrationTest.TestRegistrationSuccess, RetryingRegistrationTest.TestRegistrationRejection>> connectionFuture;

        public TestRpcConnection(String str, UUID uuid, Executor executor, RpcService rpcService) {
            super(LoggerFactory.getLogger(RegisteredRpcConnectionTest.class), str, uuid, executor);
            this.lock = new Object();
            this.rpcService = rpcService;
            this.connectionFuture = new CompletableFuture<>();
        }

        protected RetryingRegistration<UUID, TestRegistrationGateway, RetryingRegistrationTest.TestRegistrationSuccess, RetryingRegistrationTest.TestRegistrationRejection> generateRegistration() {
            return new RetryingRegistrationTest.TestRetryingRegistration(this.rpcService, getTargetAddress(), (UUID) getTargetLeaderId());
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void onRegistrationSuccess(RetryingRegistrationTest.TestRegistrationSuccess testRegistrationSuccess) {
            synchronized (this.lock) {
                this.connectionFuture.complete(Either.Left(testRegistrationSuccess));
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void onRegistrationRejection(RetryingRegistrationTest.TestRegistrationRejection testRegistrationRejection) {
            synchronized (this.lock) {
                this.connectionFuture.complete(Either.Right(testRegistrationRejection));
            }
        }

        protected void onRegistrationFailure(Throwable th) {
            synchronized (this.lock) {
                this.connectionFuture.completeExceptionally(th);
            }
        }

        public boolean tryReconnect() {
            synchronized (this.lock) {
                this.connectionFuture.cancel(false);
                this.connectionFuture = new CompletableFuture<>();
            }
            return super.tryReconnect();
        }

        public CompletableFuture<Either<RetryingRegistrationTest.TestRegistrationSuccess, RetryingRegistrationTest.TestRegistrationRejection>> getConnectionFuture() {
            CompletableFuture<Either<RetryingRegistrationTest.TestRegistrationSuccess, RetryingRegistrationTest.TestRegistrationRejection>> completableFuture;
            synchronized (this.lock) {
                completableFuture = this.connectionFuture;
            }
            return completableFuture;
        }
    }

    RegisteredRpcConnectionTest() {
    }

    @BeforeEach
    void setup() {
        this.rpcService = new TestingRpcService();
    }

    @AfterEach
    void tearDown() throws ExecutionException, InterruptedException {
        if (this.rpcService != null) {
            this.rpcService.closeAsync().get();
        }
    }

    @Test
    void testSuccessfulRpcConnection() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        ManualResponseTestRegistrationGateway manualResponseTestRegistrationGateway = new ManualResponseTestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess("Test RPC Connection ID"));
        try {
            this.rpcService.registerGateway("<TestRpcConnectionEndpointAddress>", manualResponseTestRegistrationGateway);
            TestRpcConnection testRpcConnection = new TestRpcConnection("<TestRpcConnectionEndpointAddress>", randomUUID, this.rpcService.getScheduledExecutor(), this.rpcService);
            testRpcConnection.start();
            Either<RetryingRegistrationTest.TestRegistrationSuccess, RetryingRegistrationTest.TestRegistrationRejection> either = testRpcConnection.getConnectionFuture().get();
            Assertions.assertThat(either.isLeft()).isTrue();
            String correlationId = ((RetryingRegistrationTest.TestRegistrationSuccess) either.left()).getCorrelationId();
            Assertions.assertThat(testRpcConnection.isConnected()).isTrue();
            Assertions.assertThat(testRpcConnection.getTargetAddress()).isEqualTo("<TestRpcConnectionEndpointAddress>");
            Assertions.assertThat((UUID) testRpcConnection.getTargetLeaderId()).isEqualTo(randomUUID);
            Assertions.assertThat((TestRegistrationGateway) testRpcConnection.getTargetGateway()).isEqualTo(manualResponseTestRegistrationGateway);
            Assertions.assertThat(correlationId).isEqualTo("Test RPC Connection ID");
            manualResponseTestRegistrationGateway.stop();
        } catch (Throwable th) {
            manualResponseTestRegistrationGateway.stop();
            throw th;
        }
    }

    @Test
    void testRpcConnectionFailures() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        RuntimeException runtimeException = new RuntimeException("Test RPC Connection failure");
        this.rpcService.registerGateway("<TestRpcConnectionEndpointAddress>", DefaultTestRegistrationGateway.newBuilder().setRegistrationFunction((uuid, l) -> {
            throw runtimeException;
        }).build());
        TestRpcConnection testRpcConnection = new TestRpcConnection("<TestRpcConnectionEndpointAddress>", randomUUID, this.rpcService.getScheduledExecutor(), this.rpcService);
        testRpcConnection.start();
        Assertions.assertThatThrownBy(() -> {
            testRpcConnection.getConnectionFuture().get();
        }).withFailMessage("expected failure.", new Object[0]).isInstanceOf(ExecutionException.class).hasCause(runtimeException);
        Assertions.assertThat(testRpcConnection.isConnected()).isFalse();
        Assertions.assertThat(testRpcConnection.getTargetAddress()).isEqualTo("<TestRpcConnectionEndpointAddress>");
        Assertions.assertThat((UUID) testRpcConnection.getTargetLeaderId()).isEqualTo(randomUUID);
        Assertions.assertThat((TestRegistrationGateway) testRpcConnection.getTargetGateway()).isNull();
    }

    @Test
    void testRpcConnectionRejectionCallsOnRegistrationRejection() {
        DefaultTestRegistrationGateway build = DefaultTestRegistrationGateway.newBuilder().setRegistrationFunction((uuid, l) -> {
            return CompletableFuture.completedFuture(new RetryingRegistrationTest.TestRegistrationRejection(RetryingRegistrationTest.TestRegistrationRejection.RejectionReason.REJECTED));
        }).build();
        this.rpcService.registerGateway(build.getAddress(), build);
        TestRpcConnection testRpcConnection = new TestRpcConnection(build.getAddress(), UUID.randomUUID(), this.rpcService.getScheduledExecutor(), this.rpcService);
        testRpcConnection.start();
        Either<RetryingRegistrationTest.TestRegistrationSuccess, RetryingRegistrationTest.TestRegistrationRejection> join = testRpcConnection.getConnectionFuture().join();
        Assertions.assertThat(join.isRight()).isTrue();
        Assertions.assertThat(((RetryingRegistrationTest.TestRegistrationRejection) join.right()).getRejectionReason()).isEqualTo(RetryingRegistrationTest.TestRegistrationRejection.RejectionReason.REJECTED);
    }

    @Test
    void testRpcConnectionClose() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        ManualResponseTestRegistrationGateway manualResponseTestRegistrationGateway = new ManualResponseTestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess("Test RPC Connection ID"));
        try {
            this.rpcService.registerGateway("<TestRpcConnectionEndpointAddress>", manualResponseTestRegistrationGateway);
            TestRpcConnection testRpcConnection = new TestRpcConnection("<TestRpcConnectionEndpointAddress>", randomUUID, this.rpcService.getScheduledExecutor(), this.rpcService);
            testRpcConnection.start();
            testRpcConnection.close();
            Assertions.assertThat(testRpcConnection.getTargetAddress()).isEqualTo("<TestRpcConnectionEndpointAddress>");
            Assertions.assertThat((UUID) testRpcConnection.getTargetLeaderId()).isEqualTo(randomUUID);
            Assertions.assertThat(testRpcConnection.isClosed()).isTrue();
            manualResponseTestRegistrationGateway.stop();
        } catch (Throwable th) {
            manualResponseTestRegistrationGateway.stop();
            throw th;
        }
    }

    @Test
    void testReconnect() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        this.rpcService.registerGateway("<TestRpcConnectionEndpointAddress>", new ManualResponseTestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess("Test RPC Connection ID 1"), new RetryingRegistrationTest.TestRegistrationSuccess("Test RPC Connection ID 2")));
        TestRpcConnection testRpcConnection = new TestRpcConnection("<TestRpcConnectionEndpointAddress>", randomUUID, this.rpcService.getScheduledExecutor(), this.rpcService);
        testRpcConnection.start();
        Either<RetryingRegistrationTest.TestRegistrationSuccess, RetryingRegistrationTest.TestRegistrationRejection> either = testRpcConnection.getConnectionFuture().get();
        Assertions.assertThat(either.isLeft()).isTrue();
        Assertions.assertThat(((RetryingRegistrationTest.TestRegistrationSuccess) either.left()).getCorrelationId()).isEqualTo("Test RPC Connection ID 1");
        Assertions.assertThat(testRpcConnection.tryReconnect()).isTrue();
        Either<RetryingRegistrationTest.TestRegistrationSuccess, RetryingRegistrationTest.TestRegistrationRejection> either2 = testRpcConnection.getConnectionFuture().get();
        Assertions.assertThat(either2.isLeft()).isTrue();
        Assertions.assertThat(((RetryingRegistrationTest.TestRegistrationSuccess) either2.left()).getCorrelationId()).isEqualTo("Test RPC Connection ID 2");
    }
}
