/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.registration;

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.registration.DefaultTestRegistrationGateway;
import org.apache.flink.runtime.registration.ManualResponseTestRegistrationGateway;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.registration.TestRegistrationGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.LoggerFactory;

public class RetryingRegistrationTest
extends TestLogger {
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();
    private TestingRpcService rpcService;

    @Before
    public void setup() {
        this.rpcService = new TestingRpcService();
    }

    @After
    public void tearDown() throws ExecutionException, InterruptedException {
        if (this.rpcService != null) {
            this.rpcService.stopService().get();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSimpleSuccessfulRegistration() throws Exception {
        String testId = "laissez les bon temps roulez";
        String testEndpointAddress = "<test-address>";
        UUID leaderId = UUID.randomUUID();
        ManualResponseTestRegistrationGateway testGateway = new ManualResponseTestRegistrationGateway(new RegistrationResponse[]{new TestRegistrationSuccess("laissez les bon temps roulez")});
        try {
            this.rpcService.registerGateway("<test-address>", testGateway);
            TestRetryingRegistration registration = new TestRetryingRegistration(this.rpcService, "<test-address>", leaderId);
            registration.startRegistration();
            CompletableFuture future = registration.getFuture();
            Assert.assertNotNull((Object)future);
            Assert.assertEquals((Object)future, (Object)registration.getFuture());
            RetryingRegistration.RetryingRegistrationResult registrationResponse = (RetryingRegistration.RetryingRegistrationResult)future.get(10L, TimeUnit.SECONDS);
            Assert.assertEquals((Object)"laissez les bon temps roulez", (Object)((TestRegistrationSuccess)((Object)registrationResponse.getSuccess())).getCorrelationId());
            Assert.assertEquals((Object)leaderId, (Object)testGateway.getInvocations().take().leaderId());
        }
        finally {
            testGateway.stop();
        }
    }

    @Test
    public void testPropagateFailures() throws Exception {
        String testExceptionMessage = "testExceptionMessage";
        RpcService rpc = (RpcService)Mockito.mock(RpcService.class);
        Mockito.when((Object)rpc.connect(Mockito.anyString(), (Class)Mockito.any(Class.class))).thenThrow(new Throwable[]{new RuntimeException("testExceptionMessage")});
        TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID());
        registration.startRegistration();
        CompletableFuture future = registration.getFuture();
        Assert.assertTrue((boolean)future.isDone());
        try {
            future.get();
            Assert.fail((String)"We expected an ExecutionException.");
        }
        catch (ExecutionException e) {
            Assert.assertEquals((Object)"testExceptionMessage", (Object)e.getCause().getMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetryConnectOnFailure() throws Exception {
        String testId = "laissez les bon temps roulez";
        UUID leaderId = UUID.randomUUID();
        ScheduledExecutorServiceAdapter executor = new ScheduledExecutorServiceAdapter((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ManualResponseTestRegistrationGateway testGateway = new ManualResponseTestRegistrationGateway(new RegistrationResponse[]{new TestRegistrationSuccess("laissez les bon temps roulez")});
        try {
            RpcService rpc = (RpcService)Mockito.mock(RpcService.class);
            Mockito.when((Object)rpc.connect(Mockito.anyString(), (Class)Mockito.any(Class.class))).thenReturn((Object)FutureUtils.completedExceptionally((Throwable)new Exception("test connect failure")), new Object[]{CompletableFuture.completedFuture(testGateway)});
            Mockito.when((Object)rpc.getScheduledExecutor()).thenReturn((Object)executor);
            Mockito.when((Object)rpc.scheduleRunnable((Runnable)Mockito.any(Runnable.class), Mockito.anyLong(), (TimeUnit)((Object)Mockito.any(TimeUnit.class)))).thenAnswer(invocation -> {
                Runnable runnable = (Runnable)invocation.getArgument(0);
                long delay = (Long)invocation.getArgument(1);
                TimeUnit timeUnit = (TimeUnit)((Object)((Object)invocation.getArgument(2)));
                return executor.schedule(runnable, delay, timeUnit);
            });
            TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId);
            long start = System.currentTimeMillis();
            registration.startRegistration();
            RetryingRegistration.RetryingRegistrationResult registrationResponse = (RetryingRegistration.RetryingRegistrationResult)registration.getFuture().get(10L, TimeUnit.SECONDS);
            long duration = System.currentTimeMillis() - start;
            Assert.assertTrue((String)"The registration should have failed the first time. Thus the duration should be longer than at least a single error delay.", (duration > 200L ? 1 : 0) != 0);
            Assert.assertEquals((Object)"laissez les bon temps roulez", (Object)((TestRegistrationSuccess)((Object)registrationResponse.getSuccess())).getCorrelationId());
            Assert.assertEquals((Object)leaderId, (Object)testGateway.getInvocations().take().leaderId());
        }
        finally {
            testGateway.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=10000L)
    public void testRetriesOnTimeouts() throws Exception {
        String testId = "rien ne va plus";
        String testEndpointAddress = "<test-address>";
        UUID leaderId = UUID.randomUUID();
        ManualResponseTestRegistrationGateway testGateway = new ManualResponseTestRegistrationGateway(new RegistrationResponse[]{null, null, new TestRegistrationSuccess("rien ne va plus")});
        try {
            this.rpcService.registerGateway("<test-address>", testGateway);
            long initialTimeout = 20L;
            TestRetryingRegistration registration = new TestRetryingRegistration(this.rpcService, "<test-address>", leaderId, new RetryingRegistrationConfiguration(20L, 1000L, 15000L, 15000L));
            long started = System.nanoTime();
            registration.startRegistration();
            CompletableFuture future = registration.getFuture();
            RetryingRegistration.RetryingRegistrationResult registrationResponse = (RetryingRegistration.RetryingRegistrationResult)future.get(10L, TimeUnit.SECONDS);
            long finished = System.nanoTime();
            long elapsedMillis = (finished - started) / 1000000L;
            Assert.assertEquals((Object)"rien ne va plus", (Object)((TestRegistrationSuccess)((Object)registrationResponse.getSuccess())).getCorrelationId());
            Assert.assertEquals((Object)leaderId, (Object)testGateway.getInvocations().take().leaderId());
            Assert.assertTrue((String)"retries did not properly back off", (elapsedMillis >= 60L ? 1 : 0) != 0);
        }
        finally {
            testGateway.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFailure() throws Exception {
        String testId = "qui a coupe le fromage";
        String testEndpointAddress = "<test-address>";
        UUID leaderId = UUID.randomUUID();
        ManualResponseTestRegistrationGateway testGateway = new ManualResponseTestRegistrationGateway(new RegistrationResponse[]{null, new RegistrationResponse.Failure((Throwable)new FlinkException("no reason")), null, new TestRegistrationSuccess("qui a coupe le fromage")});
        try {
            this.rpcService.registerGateway("<test-address>", testGateway);
            TestRetryingRegistration registration = new TestRetryingRegistration(this.rpcService, "<test-address>", leaderId);
            long started = System.nanoTime();
            registration.startRegistration();
            CompletableFuture future = registration.getFuture();
            RetryingRegistration.RetryingRegistrationResult registrationResponse = (RetryingRegistration.RetryingRegistrationResult)future.get(10L, TimeUnit.SECONDS);
            long finished = System.nanoTime();
            long elapsedMillis = (finished - started) / 1000000L;
            Assert.assertEquals((Object)"qui a coupe le fromage", (Object)((TestRegistrationSuccess)((Object)registrationResponse.getSuccess())).getCorrelationId());
            Assert.assertEquals((Object)leaderId, (Object)testGateway.getInvocations().take().leaderId());
            Assert.assertTrue((String)"retries did not properly back off", (elapsedMillis >= 240L ? 1 : 0) != 0);
        }
        finally {
            testGateway.stop();
        }
    }

    @Test
    public void testRegistrationRejection() {
        ManualResponseTestRegistrationGateway testRegistrationGateway = new ManualResponseTestRegistrationGateway(new RegistrationResponse[]{new TestRegistrationRejection(TestRegistrationRejection.RejectionReason.REJECTED)});
        this.rpcService.registerGateway(testRegistrationGateway.getAddress(), testRegistrationGateway);
        TestRetryingRegistration testRetryingRegistration = new TestRetryingRegistration(this.rpcService, testRegistrationGateway.getAddress(), UUID.randomUUID());
        testRetryingRegistration.startRegistration();
        RetryingRegistration.RetryingRegistrationResult response = (RetryingRegistration.RetryingRegistrationResult)testRetryingRegistration.getFuture().join();
        Assert.assertTrue((boolean)response.isRejection());
        Assert.assertThat((Object)((Object)((TestRegistrationRejection)((Object)response.getRejection())).getRejectionReason()), (Matcher)Matchers.is((Object)((Object)TestRegistrationRejection.RejectionReason.REJECTED)));
    }

    @Test
    public void testRetryOnError() throws Exception {
        String testId = "Petit a petit, l'oiseau fait son nid";
        String testEndpointAddress = "<test-address>";
        UUID leaderId = UUID.randomUUID();
        ArrayDeque<CompletableFuture<TestRegistrationSuccess>> responses = new ArrayDeque<CompletableFuture<TestRegistrationSuccess>>(2);
        responses.add(FutureUtils.completedExceptionally((Throwable)new Exception("test exception")));
        responses.add(CompletableFuture.completedFuture(new TestRegistrationSuccess("Petit a petit, l'oiseau fait son nid")));
        DefaultTestRegistrationGateway testGateway = DefaultTestRegistrationGateway.newBuilder().setRegistrationFunction((uuid, aLong) -> (CompletableFuture)responses.poll()).build();
        this.rpcService.registerGateway("<test-address>", testGateway);
        TestRetryingRegistration registration = new TestRetryingRegistration(this.rpcService, "<test-address>", leaderId);
        long started = System.nanoTime();
        registration.startRegistration();
        CompletableFuture future = registration.getFuture();
        RetryingRegistration.RetryingRegistrationResult registrationResponse = (RetryingRegistration.RetryingRegistrationResult)future.get(10L, TimeUnit.SECONDS);
        long finished = System.nanoTime();
        long elapsedMillis = (finished - started) / 1000000L;
        Assert.assertEquals((Object)"Petit a petit, l'oiseau fait son nid", (Object)((TestRegistrationSuccess)((Object)registrationResponse.getSuccess())).getCorrelationId());
        Assert.assertTrue((String)"retries did not properly back off", (elapsedMillis >= 200L ? 1 : 0) != 0);
    }

    @Test
    public void testCancellation() throws Exception {
        String testEndpointAddress = "my-test-address";
        UUID leaderId = UUID.randomUUID();
        CompletableFuture result = new CompletableFuture();
        AtomicInteger registrationCallCounter = new AtomicInteger(0);
        DefaultTestRegistrationGateway testGateway = DefaultTestRegistrationGateway.newBuilder().setRegistrationFunction((uuid, aLong) -> {
            registrationCallCounter.incrementAndGet();
            return result;
        }).build();
        this.rpcService.registerGateway("my-test-address", testGateway);
        TestRetryingRegistration registration = new TestRetryingRegistration(this.rpcService, "my-test-address", leaderId);
        registration.startRegistration();
        registration.cancel();
        result.completeExceptionally(new TimeoutException());
        Assert.assertThat((Object)registrationCallCounter.get(), (Matcher)Matchers.is((Matcher)Matchers.lessThanOrEqualTo((Comparable)Integer.valueOf(1))));
    }

    static class TestRetryingRegistration
    extends RetryingRegistration<UUID, TestRegistrationGateway, TestRegistrationSuccess, TestRegistrationRejection> {
        static final long INITIAL_TIMEOUT = 20L;
        static final long MAX_TIMEOUT = 200L;
        static final long DELAY_ON_ERROR = 200L;
        static final long DELAY_ON_FAILURE = 200L;
        static final RetryingRegistrationConfiguration RETRYING_REGISTRATION_CONFIGURATION = new RetryingRegistrationConfiguration(20L, 200L, 200L, 200L);

        public TestRetryingRegistration(RpcService rpc, String targetAddress, UUID leaderId) {
            this(rpc, targetAddress, leaderId, RETRYING_REGISTRATION_CONFIGURATION);
        }

        public TestRetryingRegistration(RpcService rpc, String targetAddress, UUID leaderId, RetryingRegistrationConfiguration retryingRegistrationConfiguration) {
            super(LoggerFactory.getLogger(RetryingRegistrationTest.class), rpc, "TestEndpoint", TestRegistrationGateway.class, targetAddress, (Serializable)leaderId, retryingRegistrationConfiguration);
        }

        protected CompletableFuture<RegistrationResponse> invokeRegistration(TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) {
            return gateway.registrationCall(leaderId, timeoutMillis);
        }
    }

    static class TestRegistrationRejection
    extends RegistrationResponse.Rejection {
        private final RejectionReason rejectionReason;

        public TestRegistrationRejection(RejectionReason rejectionReason) {
            this.rejectionReason = rejectionReason;
        }

        public RejectionReason getRejectionReason() {
            return this.rejectionReason;
        }

        static enum RejectionReason {
            REJECTED;

        }
    }

    static class TestRegistrationSuccess
    extends RegistrationResponse.Success {
        private static final long serialVersionUID = 5542698790917150604L;
        private final String correlationId;

        public TestRegistrationSuccess(String correlationId) {
            this.correlationId = correlationId;
        }

        public String getCorrelationId() {
            return this.correlationId;
        }
    }
}

