/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.registration;

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.registration.DefaultTestRegistrationGateway;
import org.apache.flink.runtime.registration.ManualResponseTestRegistrationGateway;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.registration.TestRegistrationGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mockito;
import org.slf4j.LoggerFactory;

class RetryingRegistrationTest {
    @RegisterExtension
    public static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    private TestingRpcService rpcService;

    // Package-private no-arg constructor with an empty body; per-test state is created in setup().
    RetryingRegistrationTest() {
    }

    /** Creates a fresh {@link TestingRpcService} before every test method. */
    @BeforeEach
    void setup() {
        rpcService = new TestingRpcService();
    }

    /**
     * Closes the RPC service created in {@link #setup()} and blocks until the asynchronous close
     * has finished. Does nothing when no service was created.
     *
     * @throws ExecutionException if closing the RPC service completed exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting for the close
     */
    @AfterEach
    void tearDown() throws ExecutionException, InterruptedException {
        if (rpcService == null) {
            return;
        }
        rpcService.closeAsync().get();
    }

    /**
     * Registers against a gateway whose only scripted response is a success and checks that the
     * registration future completes with that response and that the gateway saw the expected
     * leader id.
     */
    @Test
    void testSimpleSuccessfulRegistration() throws Exception {
        final String correlationId = "laissez les bon temps roulez";
        final String endpointAddress = "<test-address>";
        final UUID leaderId = UUID.randomUUID();
        final ManualResponseTestRegistrationGateway gateway =
                new ManualResponseTestRegistrationGateway(
                        new RegistrationResponse[] {new TestRegistrationSuccess(correlationId)});

        try {
            rpcService.registerGateway(endpointAddress, gateway);

            final TestRetryingRegistration registration =
                    new TestRetryingRegistration(rpcService, endpointAddress, leaderId);
            registration.startRegistration();

            final CompletableFuture<
                            RetryingRegistration.RetryingRegistrationResult<
                                    TestRegistrationGateway,
                                    TestRegistrationSuccess,
                                    TestRegistrationRejection>>
                    future = registration.getFuture();
            Assertions.assertThat(future).isNotNull();
            // A second call to getFuture() must yield a future equal to the first one.
            Assertions.assertThat(registration.getFuture()).isEqualTo(future);

            final RetryingRegistration.RetryingRegistrationResult<
                            TestRegistrationGateway,
                            TestRegistrationSuccess,
                            TestRegistrationRejection>
                    result = future.get(10L, TimeUnit.SECONDS);

            Assertions.assertThat(result.getSuccess().getCorrelationId()).isEqualTo(correlationId);
            Assertions.assertThat(gateway.getInvocations().take().leaderId()).isEqualTo(leaderId);
        } finally {
            gateway.stop();
        }
    }

    /**
     * Checks that an exception thrown by the mocked {@code RpcService#connect} call ends up as the
     * cause of the {@link ExecutionException} raised by the registration future.
     */
    @Test
    void testPropagateFailures() throws Exception {
        final String expectedMessage = "testExceptionMessage";

        // Mocked RPC service whose connect(...) call is stubbed to throw.
        final RpcService failingRpc = Mockito.mock(RpcService.class);
        Mockito.when(failingRpc.connect(Mockito.anyString(), Mockito.any(Class.class)))
                .thenThrow(new RuntimeException(expectedMessage));

        final TestRetryingRegistration registration =
                new TestRetryingRegistration(failingRpc, "testaddress", UUID.randomUUID());
        registration.startRegistration();

        final CompletableFuture<?> future = registration.getFuture();
        Assertions.assertThat(future).isDone();
        Assertions.assertThatThrownBy(future::get)
                .withFailMessage("We expected an ExecutionException.")
                .isInstanceOf(ExecutionException.class)
                .cause()
                .hasMessage(expectedMessage);
    }

    /**
     * Checks that the registration retries connecting after a failed connection attempt.
     *
     * <p>The mocked RPC service returns an exceptionally completed future for the first {@code
     * connect} call and the test gateway for the second. The registration must then succeed, and
     * it must take longer than one error delay because the first attempt failed.
     *
     * <p>Elapsed time is measured with {@link System#nanoTime()} (monotonic) rather than the wall
     * clock, so clock adjustments during the test cannot distort the measured duration.
     */
    @Test
    void testRetryConnectOnFailure() throws Exception {
        final String correlationId = "laissez les bon temps roulez";
        final UUID leaderId = UUID.randomUUID();

        final ScheduledExecutorServiceAdapter executor =
                new ScheduledExecutorServiceAdapter(EXECUTOR_EXTENSION.getExecutor());
        final ManualResponseTestRegistrationGateway gateway =
                new ManualResponseTestRegistrationGateway(
                        new RegistrationResponse[] {new TestRegistrationSuccess(correlationId)});

        try {
            final RpcService rpc = Mockito.mock(RpcService.class);
            // First connect attempt fails, second one hands out the gateway.
            Mockito.when(rpc.connect(Mockito.anyString(), Mockito.any(Class.class)))
                    .thenReturn(
                            FutureUtils.completedExceptionally(
                                    new Exception("test connect failure")),
                            CompletableFuture.completedFuture(gateway));
            Mockito.when(rpc.getScheduledExecutor()).thenReturn(executor);

            final TestRetryingRegistration registration =
                    new TestRetryingRegistration(rpc, "foobar address", leaderId);

            final long startNanos = System.nanoTime();
            registration.startRegistration();
            final RetryingRegistration.RetryingRegistrationResult<
                            TestRegistrationGateway,
                            TestRegistrationSuccess,
                            TestRegistrationRejection>
                    result = registration.getFuture().get(10L, TimeUnit.SECONDS);
            final long elapsedNanos = System.nanoTime() - startNanos;

            // Compare in nanoseconds so that no precision is lost by truncating to milliseconds.
            Assertions.assertThat(elapsedNanos)
                    .withFailMessage(
                            "The registration should have failed the first time. Thus the duration should be longer than at least a single error delay.")
                    .isGreaterThan(
                            TimeUnit.MILLISECONDS.toNanos(TestRetryingRegistration.DELAY_ON_ERROR));
            Assertions.assertThat(result.getSuccess().getCorrelationId()).isEqualTo(correlationId);
            Assertions.assertThat(gateway.getInvocations().take().leaderId()).isEqualTo(leaderId);
        } finally {
            gateway.stop();
        }
    }

    /**
     * Checks that the registration keeps retrying when the gateway's scripted responses start with
     * two {@code null} entries before the success (presumably unanswered calls that time out;
     * confirm against {@code ManualResponseTestRegistrationGateway}).
     *
     * <p>The whole test is limited to ten seconds. The unit is spelled out because JUnit 5's
     * {@code @Timeout} defaults to seconds, which would turn a bare {@code 10000} into hours.
     */
    @Test
    @Timeout(value = 10L, unit = TimeUnit.SECONDS)
    void testRetriesOnTimeouts() throws Exception {
        final String correlationId = "rien ne va plus";
        final String endpointAddress = "<test-address>";
        final UUID leaderId = UUID.randomUUID();
        final long initialTimeout = 20L;

        final ManualResponseTestRegistrationGateway gateway =
                new ManualResponseTestRegistrationGateway(
                        new RegistrationResponse[] {
                            null, null, new TestRegistrationSuccess(correlationId)
                        });

        try {
            rpcService.registerGateway(endpointAddress, gateway);

            final TestRetryingRegistration registration =
                    new TestRetryingRegistration(
                            rpcService,
                            endpointAddress,
                            leaderId,
                            new RetryingRegistrationConfiguration(
                                    initialTimeout, 1000L, 15000L, 15000L));

            final long startNanos = System.nanoTime();
            registration.startRegistration();
            final RetryingRegistration.RetryingRegistrationResult<
                            TestRegistrationGateway,
                            TestRegistrationSuccess,
                            TestRegistrationRejection>
                    result = registration.getFuture().get(10L, TimeUnit.SECONDS);
            final long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;

            Assertions.assertThat(result.getSuccess().getCorrelationId()).isEqualTo(correlationId);
            Assertions.assertThat(gateway.getInvocations().take().leaderId()).isEqualTo(leaderId);
            // Lower bound of three initial timeouts (60 ms); presumably 20 ms + 40 ms for the two
            // unanswered attempts with a doubling timeout - confirm against RetryingRegistration.
            Assertions.assertThat(elapsedMillis)
                    .withFailMessage("retries did not properly back off")
                    .isGreaterThanOrEqualTo(3 * initialTimeout);
        } finally {
            gateway.stop();
        }
    }

    /**
     * Runs a registration against a gateway scripted with {@code null}, an explicit {@code
     * Failure}, {@code null} and finally a success. Checks that the registration ends with the
     * success and that the retries took at least two initial timeouts plus one failure delay.
     */
    @Test
    void testFailure() throws Exception {
        final String correlationId = "qui a coupe le fromage";
        final String endpointAddress = "<test-address>";
        final UUID leaderId = UUID.randomUUID();

        final RegistrationResponse[] scriptedResponses = {
            null,
            new RegistrationResponse.Failure(new FlinkException("no reason")),
            null,
            new TestRegistrationSuccess(correlationId)
        };
        final ManualResponseTestRegistrationGateway gateway =
                new ManualResponseTestRegistrationGateway(scriptedResponses);

        try {
            rpcService.registerGateway(endpointAddress, gateway);

            final TestRetryingRegistration registration =
                    new TestRetryingRegistration(rpcService, endpointAddress, leaderId);

            final long startNanos = System.nanoTime();
            registration.startRegistration();
            final RetryingRegistration.RetryingRegistrationResult<
                            TestRegistrationGateway,
                            TestRegistrationSuccess,
                            TestRegistrationRejection>
                    result = registration.getFuture().get(10L, TimeUnit.SECONDS);
            final long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;

            Assertions.assertThat(result.getSuccess().getCorrelationId()).isEqualTo(correlationId);
            Assertions.assertThat(gateway.getInvocations().take().leaderId()).isEqualTo(leaderId);
            // 2 * 20 ms + 200 ms = 240 ms
            Assertions.assertThat(elapsedMillis)
                    .withFailMessage("retries did not properly back off")
                    .isGreaterThanOrEqualTo(
                            2 * TestRetryingRegistration.INITIAL_TIMEOUT
                                    + TestRetryingRegistration.DELAY_ON_FAILURE);
        } finally {
            gateway.stop();
        }
    }

    /**
     * Checks that a rejection response from the gateway is surfaced through the registration
     * result, including the rejection reason.
     *
     * <p>The gateway is stopped in a {@code finally} block, as the other tests in this class do
     * for {@link ManualResponseTestRegistrationGateway}, so it is released even when an assertion
     * fails. The wait for the result is bounded so that a missing response fails the test instead
     * of blocking it indefinitely.
     */
    @Test
    void testRegistrationRejection() throws Exception {
        final ManualResponseTestRegistrationGateway gateway =
                new ManualResponseTestRegistrationGateway(
                        new RegistrationResponse[] {
                            new TestRegistrationRejection(
                                    TestRegistrationRejection.RejectionReason.REJECTED)
                        });

        try {
            rpcService.registerGateway(gateway.getAddress(), gateway);

            final TestRetryingRegistration registration =
                    new TestRetryingRegistration(
                            rpcService, gateway.getAddress(), UUID.randomUUID());
            registration.startRegistration();

            final RetryingRegistration.RetryingRegistrationResult<
                            TestRegistrationGateway,
                            TestRegistrationSuccess,
                            TestRegistrationRejection>
                    result = registration.getFuture().get(10L, TimeUnit.SECONDS);

            Assertions.assertThat(result.isRejection()).isTrue();
            Assertions.assertThat(result.getRejection().getRejectionReason())
                    .isEqualTo(TestRegistrationRejection.RejectionReason.REJECTED);
        } finally {
            gateway.stop();
        }
    }

    /**
     * Checks that the registration is retried when the gateway's first response future is
     * completed exceptionally, and that the retry took at least one error delay.
     */
    @Test
    void testRetryOnError() throws Exception {
        final String correlationId = "Petit a petit, l'oiseau fait son nid";
        final String endpointAddress = "<test-address>";
        final UUID leaderId = UUID.randomUUID();

        // The first call fails with an exception, the second one succeeds.
        final ArrayDeque<CompletableFuture<RegistrationResponse>> pendingResponses =
                new ArrayDeque<>(2);
        pendingResponses.add(FutureUtils.completedExceptionally(new Exception("test exception")));
        pendingResponses.add(
                CompletableFuture.completedFuture(new TestRegistrationSuccess(correlationId)));

        final DefaultTestRegistrationGateway gateway =
                DefaultTestRegistrationGateway.newBuilder()
                        .setRegistrationFunction((uuid, aLong) -> pendingResponses.poll())
                        .build();
        rpcService.registerGateway(endpointAddress, gateway);

        final TestRetryingRegistration registration =
                new TestRetryingRegistration(rpcService, endpointAddress, leaderId);

        final long startNanos = System.nanoTime();
        registration.startRegistration();
        final RetryingRegistration.RetryingRegistrationResult<
                        TestRegistrationGateway,
                        TestRegistrationSuccess,
                        TestRegistrationRejection>
                result = registration.getFuture().get(10L, TimeUnit.SECONDS);
        final long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;

        Assertions.assertThat(result.getSuccess().getCorrelationId()).isEqualTo(correlationId);
        Assertions.assertThat(elapsedMillis)
                .withFailMessage("retries did not properly back off")
                .isGreaterThanOrEqualTo(TestRetryingRegistration.DELAY_ON_ERROR);
    }

    /**
     * Cancels a started registration, then completes the gateway's response future with a {@link
     * TimeoutException}, and checks that the gateway's registration function was invoked at most
     * once.
     */
    @Test
    void testCancellation() throws Exception {
        final String endpointAddress = "my-test-address";
        final UUID leaderId = UUID.randomUUID();

        // Response future that is only completed after the registration has been cancelled.
        final CompletableFuture<RegistrationResponse> pendingResponse = new CompletableFuture<>();
        final AtomicInteger invocationCount = new AtomicInteger(0);

        final DefaultTestRegistrationGateway gateway =
                DefaultTestRegistrationGateway.newBuilder()
                        .setRegistrationFunction(
                                (uuid, aLong) -> {
                                    invocationCount.incrementAndGet();
                                    return pendingResponse;
                                })
                        .build();
        rpcService.registerGateway(endpointAddress, gateway);

        final TestRetryingRegistration registration =
                new TestRetryingRegistration(rpcService, endpointAddress, leaderId);
        registration.startRegistration();
        registration.cancel();

        pendingResponse.completeExceptionally(new TimeoutException());

        Assertions.assertThat(invocationCount).hasValueLessThanOrEqualTo(1);
    }

    static class TestRetryingRegistration
    extends RetryingRegistration<UUID, TestRegistrationGateway, TestRegistrationSuccess, TestRegistrationRejection> {
        static final long INITIAL_TIMEOUT = 20L;
        static final long MAX_TIMEOUT = 200L;
        static final long DELAY_ON_ERROR = 200L;
        static final long DELAY_ON_FAILURE = 200L;
        static final RetryingRegistrationConfiguration RETRYING_REGISTRATION_CONFIGURATION = new RetryingRegistrationConfiguration(20L, 200L, 200L, 200L);

        public TestRetryingRegistration(RpcService rpc, String targetAddress, UUID leaderId) {
            this(rpc, targetAddress, leaderId, RETRYING_REGISTRATION_CONFIGURATION);
        }

        public TestRetryingRegistration(RpcService rpc, String targetAddress, UUID leaderId, RetryingRegistrationConfiguration retryingRegistrationConfiguration) {
            super(LoggerFactory.getLogger(RetryingRegistrationTest.class), rpc, "TestEndpoint", TestRegistrationGateway.class, targetAddress, (Serializable)leaderId, retryingRegistrationConfiguration);
        }

        protected CompletableFuture<RegistrationResponse> invokeRegistration(TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) {
            return gateway.registrationCall(leaderId, timeoutMillis);
        }
    }

    /** Rejection response used by the tests; it carries the reason for the rejection. */
    static class TestRegistrationRejection extends RegistrationResponse.Rejection {

        /** Reasons a test registration can be rejected for. */
        enum RejectionReason {
            REJECTED
        }

        private final RejectionReason rejectionReason;

        public TestRegistrationRejection(RejectionReason rejectionReason) {
            this.rejectionReason = rejectionReason;
        }

        /** Returns the reason passed to the constructor. */
        public RejectionReason getRejectionReason() {
            return rejectionReason;
        }
    }

    /** Success response used by the tests; it carries a correlation id the tests can check. */
    static class TestRegistrationSuccess extends RegistrationResponse.Success {

        private static final long serialVersionUID = 5542698790917150604L;

        private final String correlationId;

        public TestRegistrationSuccess(String correlationId) {
            this.correlationId = correlationId;
        }

        /** Returns the correlation id passed to the constructor. */
        public String getCorrelationId() {
            return correlationId;
        }
    }
}

