package org.apache.flink.runtime.rpc;

import akka.actor.ActorSystem;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/RpcEndpointTest.class */
public class RpcEndpointTest extends TestLogger {
    // Upper bound (10 seconds) used by the tests when terminating endpoints and awaiting futures.
    private static final Time TIMEOUT = Time.seconds(10);
    // Actor system backing the RPC service; assigned in setup() and terminated in teardown().
    private static ActorSystem actorSystem = null;
    // RPC service shared by every test in this class; assigned in setup() and stopped in teardown().
    private static RpcService rpcService = null;

    /* loaded from: input_file:org/apache/flink/runtime/rpc/RpcEndpointTest$BaseEndpoint.class */
    /**
     * Minimal endpoint implementing {@link BaseGateway}: {@link #foobar()} answers with the
     * value handed to the constructor, or {@link Integer#MAX_VALUE} when none was supplied.
     */
    public static class BaseEndpoint extends RpcEndpoint implements BaseGateway {
        private final int foobarValue;

        /** Creates an endpoint whose {@code foobar()} value is {@link Integer#MAX_VALUE}. */
        protected BaseEndpoint(RpcService rpcService) {
            this(rpcService, Integer.MAX_VALUE);
        }

        /** Creates an endpoint whose {@code foobar()} value is {@code foobarValue}. */
        protected BaseEndpoint(RpcService rpcService, int foobarValue) {
            super(rpcService);
            this.foobarValue = foobarValue;
        }

        /** Returns an already completed future holding the configured value. */
        @Override
        public CompletableFuture<Integer> foobar() {
            return CompletableFuture.completedFuture(foobarValue);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/RpcEndpointTest$BaseGateway.class */
    /** Gateway exposing a single asynchronous call; implemented in this file by {@link BaseEndpoint}. */
    public interface BaseGateway extends RpcGateway {
        /** Returns a future carrying an integer; {@link BaseEndpoint} completes it with its configured value. */
        CompletableFuture<Integer> foobar();
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/RpcEndpointTest$DifferentGateway.class */
    /**
     * Gateway unrelated to {@link BaseGateway}; implemented in this file only by
     * {@link ExtendedEndpoint}, not by {@link BaseEndpoint}.
     */
    public interface DifferentGateway extends RpcGateway {
        /** Returns a future carrying a string; {@link ExtendedEndpoint} completes it with its configured string. */
        CompletableFuture<String> foo();
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/RpcEndpointTest$ExtendedEndpoint.class */
    /**
     * Endpoint extending {@link BaseEndpoint} that additionally serves {@link ExtendedGateway}
     * and {@link DifferentGateway}, each answering with a value fixed at construction time.
     */
    public static class ExtendedEndpoint extends BaseEndpoint implements ExtendedGateway, DifferentGateway {
        private final int barfooValue;
        private final String fooString;

        /**
         * @param rpcService service the endpoint is registered with
         * @param foobarValue value returned by the inherited {@code foobar()}
         * @param barfooValue value returned by {@link #barfoo()}
         * @param fooString value returned by {@link #foo()}
         */
        protected ExtendedEndpoint(RpcService rpcService, int foobarValue, int barfooValue, String fooString) {
            super(rpcService, foobarValue);
            this.fooString = fooString;
            this.barfooValue = barfooValue;
        }

        /** Returns an already completed future holding the configured {@code barfoo} value. */
        @Override
        public CompletableFuture<Integer> barfoo() {
            return CompletableFuture.completedFuture(barfooValue);
        }

        /** Returns an already completed future holding the configured string. */
        @Override
        public CompletableFuture<String> foo() {
            return CompletableFuture.completedFuture(fooString);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/RpcEndpointTest$ExtendedGateway.class */
    /** Gateway that inherits {@code foobar()} from {@link BaseGateway} and adds {@link #barfoo()}. */
    public interface ExtendedGateway extends BaseGateway {
        /** Returns a future carrying an integer; {@link ExtendedEndpoint} completes it with its configured value. */
        CompletableFuture<Integer> barfoo();
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/RpcEndpointTest$RunningStateTestingEndpoint.class */
    /**
     * Endpoint used to observe the running flag: {@link #onStop()} signals a latch and then
     * hands back a caller-supplied future, and {@link #queryIsRunningFlag()} reports
     * {@code isRunning()} as seen by the endpoint itself.
     */
    private static final class RunningStateTestingEndpoint extends RpcEndpoint implements RunningStateTestingEndpointGateway {
        /** Counted down once {@link #onStop()} has been entered. */
        private final CountDownLatch onStopCalled = new CountDownLatch(1);
        /** Future returned from {@link #onStop()}; supplied by the test. */
        private final CompletableFuture<Void> stopFuture;

        RunningStateTestingEndpoint(RpcService rpcService, CompletableFuture<Void> stopFuture) {
            super(rpcService);
            this.stopFuture = stopFuture;
        }

        /**
         * Records that stopping has begun and returns the externally controlled stop future.
         * NOTE(review): presumably overrides the stop hook of RpcEndpoint - confirm against the base class.
         */
        public CompletableFuture<Void> onStop() {
            onStopCalled.countDown();
            return stopFuture;
        }

        /**
         * Calls {@code closeAsync()} and blocks until {@link #onStop()} has been entered.
         *
         * @return the future obtained from {@code closeAsync()}
         * @throws InterruptedException if interrupted while waiting on the latch
         */
        CompletableFuture<Void> closeAndWaitUntilOnStopCalled() throws InterruptedException {
            final CompletableFuture<Void> terminationFuture = closeAsync();
            onStopCalled.await();
            return terminationFuture;
        }

        /** Returns an already completed future holding the current result of {@code isRunning()}. */
        @Override
        public CompletableFuture<Boolean> queryIsRunningFlag() {
            final boolean running = isRunning();
            return CompletableFuture.completedFuture(running);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/RpcEndpointTest$RunningStateTestingEndpointGateway.class */
    /** Gateway through which tests ask a {@link RunningStateTestingEndpoint} for its running flag. */
    public interface RunningStateTestingEndpointGateway extends RpcGateway {
        /** Returns a future carrying the endpoint's {@code isRunning()} value at the time of the call. */
        CompletableFuture<Boolean> queryIsRunningFlag();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/RpcEndpointTest$TestMainThreadExecutable.class */
    /**
     * {@link MainThreadExecutable} stub that forwards {@code scheduleRunAsync} calls to a
     * consumer supplied by the test and rejects every other operation.
     */
    public static class TestMainThreadExecutable implements MainThreadExecutable {
        /** Receives the runnable and delay of each {@code scheduleRunAsync} invocation. */
        private final BiConsumer<Runnable, Long> scheduleRunAsyncConsumer;

        private TestMainThreadExecutable(BiConsumer<Runnable, Long> scheduleRunAsyncConsumer) {
            this.scheduleRunAsyncConsumer = scheduleRunAsyncConsumer;
        }

        /** Not supported by this stub. */
        public void runAsync(Runnable task) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this stub. */
        public <V> CompletableFuture<V> callAsync(Callable<V> task, Time timeout) {
            throw new UnsupportedOperationException();
        }

        /** Hands the runnable and its delay to the consumer given at construction. */
        public void scheduleRunAsync(Runnable task, long delay) {
            scheduleRunAsyncConsumer.accept(task, delay);
        }
    }

    /** Creates the actor system and the Akka RPC service shared by all tests of this class. */
    @BeforeClass
    public static void setup() {
        final ActorSystem system = AkkaUtils.createDefaultActorSystem();
        actorSystem = system;
        rpcService = new AkkaRpcService(system, AkkaRpcServiceConfiguration.defaultConfiguration());
    }

    /**
     * Stops the RPC service and terminates the actor system, waiting up to {@code TIMEOUT}
     * for both to finish.
     */
    @AfterClass
    public static void teardown() throws Exception {
        final CompletableFuture<?> rpcServiceStopped = rpcService.stopService();
        final CompletableFuture<?> actorSystemTerminated = FutureUtils.toJava(actorSystem.terminate());
        FutureUtils.waitForAll(Arrays.asList(rpcServiceStopped, actorSystemTerminated))
            .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    /**
     * Verifies that the self gateway of a started {@link BaseEndpoint} returns the value the
     * endpoint was constructed with.
     */
    @Test
    public void testSelfGateway() throws Exception {
        // Boxed on purpose: assertEquals(int, Integer) is ambiguous between the
        // (long, long) and (Object, Object) overloads and does not compile.
        final Integer expectedValue = Integer.valueOf(1337);
        final BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, expectedValue.intValue());
        try {
            baseEndpoint.start();
            final BaseGateway selfGateway = (BaseGateway) baseEndpoint.getSelfGateway(BaseGateway.class);
            final Integer actualValue = selfGateway.foobar().get();
            Assert.assertEquals(expectedValue, actualValue);
        } finally {
            // Single cleanup path: the endpoint is terminated exactly once, pass or fail.
            RpcUtils.terminateRpcEndpoint(baseEndpoint, TIMEOUT);
        }
    }

    /**
     * Verifies that requesting a self gateway of a type the endpoint does not implement
     * fails with a {@link RuntimeException}.
     */
    @Test(expected = RuntimeException.class)
    public void testWrongSelfGateway() throws Exception {
        final BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, 1337);
        try {
            baseEndpoint.start();
            // BaseEndpoint does not implement DifferentGateway, so this request is the
            // operation under test and is expected to throw. Without it the test would
            // always reach fail() and surface an AssertionError instead.
            baseEndpoint.getSelfGateway(DifferentGateway.class);
            Assert.fail("Expected to fail with a RuntimeException since we requested the wrong gateway type.");
        } finally {
            // Single cleanup path: the endpoint is terminated exactly once, pass or fail.
            RpcUtils.terminateRpcEndpoint(baseEndpoint, TIMEOUT);
        }
    }

    /**
     * Verifies that an {@link ExtendedEndpoint} can be addressed through each gateway
     * interface it implements and that every gateway returns the configured value.
     */
    @Test
    public void testEndpointInheritance() throws Exception {
        // Boxed on purpose: assertEquals(int, Integer) is ambiguous between the
        // (long, long) and (Object, Object) overloads and does not compile.
        final Integer foobar = Integer.valueOf(1);
        final Integer barfoo = Integer.valueOf(2);
        final String foo = "foobar";
        final ExtendedEndpoint extendedEndpoint = new ExtendedEndpoint(rpcService, foobar.intValue(), barfoo.intValue(), foo);
        try {
            extendedEndpoint.start();
            final BaseGateway baseGateway = (BaseGateway) extendedEndpoint.getSelfGateway(BaseGateway.class);
            final ExtendedGateway extendedGateway = (ExtendedGateway) extendedEndpoint.getSelfGateway(ExtendedGateway.class);
            final DifferentGateway differentGateway = (DifferentGateway) extendedEndpoint.getSelfGateway(DifferentGateway.class);
            Assert.assertEquals(foobar, baseGateway.foobar().get());
            Assert.assertEquals(foobar, extendedGateway.foobar().get());
            Assert.assertEquals(barfoo, extendedGateway.barfoo().get());
            Assert.assertEquals(foo, differentGateway.foo().get());
        } finally {
            // Single cleanup path: the endpoint is terminated exactly once, pass or fail.
            RpcUtils.terminateRpcEndpoint(extendedEndpoint, TIMEOUT);
        }
    }

    /** Verifies that a started endpoint reports {@code true} when asked for its running flag. */
    @Test
    public void testRunningState() throws InterruptedException, ExecutionException, TimeoutException {
        final CompletableFuture<Void> alreadyStopped = CompletableFuture.completedFuture(null);
        final RunningStateTestingEndpoint endpoint = new RunningStateTestingEndpoint(rpcService, alreadyStopped);
        final RunningStateTestingEndpointGateway gateway = (RunningStateTestingEndpointGateway) endpoint.getSelfGateway(RunningStateTestingEndpointGateway.class);
        try {
            endpoint.start();
            final Boolean running = gateway.queryIsRunningFlag().get();
            Assert.assertThat(running, CoreMatchers.is(true));
        } finally {
            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
        }
    }

    /**
     * Verifies that an endpoint whose {@code onStop()} has been entered, but whose stop future
     * is still pending, reports {@code false} when asked for its running flag.
     */
    @Test
    public void testNotRunningState() throws InterruptedException, ExecutionException, TimeoutException {
        final CompletableFuture<Void> stopFuture = new CompletableFuture<>();
        final RunningStateTestingEndpoint endpoint = new RunningStateTestingEndpoint(rpcService, stopFuture);
        final RunningStateTestingEndpointGateway gateway = (RunningStateTestingEndpointGateway) endpoint.getSelfGateway(RunningStateTestingEndpointGateway.class);
        endpoint.start();
        final CompletableFuture<Void> terminationFuture;
        try {
            terminationFuture = endpoint.closeAndWaitUntilOnStopCalled();
            // onStop() has been entered but its future is still pending at this point.
            Assert.assertThat(gateway.queryIsRunningFlag().get(), CoreMatchers.is(false));
        } finally {
            // Always release the endpoint's stop future, so a failed assertion does not
            // leave the endpoint waiting on it for the rest of the test class.
            stopFuture.complete(null);
        }
        terminationFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    /**
     * Verifies that a runnable submitted to the endpoint's main thread executor runs in the
     * endpoint's main thread.
     */
    @Test
    public void testExecute() throws InterruptedException, ExecutionException, TimeoutException {
        final BaseEndpoint endpoint = new BaseEndpoint(rpcService);
        // Typed rather than raw so that complete(null) is checked by the compiler.
        final CompletableFuture<Void> asyncExecutionFuture = new CompletableFuture<>();
        try {
            endpoint.start();
            endpoint.getMainThreadExecutor().execute(() -> {
                endpoint.validateRunsInMainThread();
                asyncExecutionFuture.complete(null);
            });
            asyncExecutionFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
        } finally {
            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
        }
    }

    /** Schedules a no-op runnable with the delay expressed in milliseconds. */
    @Test
    public void testScheduleRunnableWithDelayInMilliseconds() throws Exception {
        final Runnable noOp = () -> {
        };
        testScheduleWithDelay((executor, delay) -> executor.schedule(noOp, delay.toMillis(), TimeUnit.MILLISECONDS));
    }

    /** Schedules a no-op runnable with the delay expressed in whole seconds. */
    @Test
    public void testScheduleRunnableWithDelayInSeconds() throws Exception {
        final Runnable noOp = () -> {
        };
        testScheduleWithDelay((executor, delay) -> {
            final long delaySeconds = TimeUnit.MILLISECONDS.toSeconds(delay.toMillis());
            executor.schedule(noOp, delaySeconds, TimeUnit.SECONDS);
        });
    }

    /** Schedules a constant-returning callable with the delay expressed in milliseconds. */
    @Test
    public void testScheduleCallableWithDelayInMilliseconds() throws Exception {
        final Callable<Integer> constantOne = () -> 1;
        testScheduleWithDelay((executor, delay) -> executor.schedule(constantOne, delay.toMillis(), TimeUnit.MILLISECONDS));
    }

    /** Schedules a constant-returning callable with the delay expressed in whole seconds. */
    @Test
    public void testScheduleCallableWithDelayInSeconds() throws Exception {
        final Callable<Integer> constantOne = () -> 1;
        testScheduleWithDelay((executor, delay) -> {
            final long delaySeconds = TimeUnit.MILLISECONDS.toSeconds(delay.toMillis());
            executor.schedule(constantOne, delaySeconds, TimeUnit.SECONDS);
        });
    }

    /**
     * Shared driver for the schedule tests: builds a {@code MainThreadExecutor} on top of a
     * {@link TestMainThreadExecutable} that captures the delay passed to
     * {@code scheduleRunAsync}, lets the caller schedule something with a one second delay,
     * and asserts that the captured delay equals that duration in milliseconds.
     *
     * @param biConsumer scheduling action under test; receives the executor and the delay to use
     * @throws Exception if the captured delay does not become available within {@code TIMEOUT}
     */
    private static void testScheduleWithDelay(BiConsumer<RpcEndpoint.MainThreadExecutor, Duration> biConsumer) throws Exception {
        // Typed as Long: with a raw future, get() yields Object and cannot be matched
        // against the Matcher<Long> produced by is(...).
        final CompletableFuture<Long> actualDelayMsFuture = new CompletableFuture<>();
        final TestMainThreadExecutable delayCapturingExecutable =
            new TestMainThreadExecutable((runnable, delay) -> actualDelayMsFuture.complete(delay));
        final RpcEndpoint.MainThreadExecutor mainThreadExecutor = new RpcEndpoint.MainThreadExecutor(delayCapturingExecutable, () -> {
        });
        final Duration expectedDelay = Duration.ofSeconds(1L);
        biConsumer.accept(mainThreadExecutor, expectedDelay);
        // Bounded wait so a scheduling action that never reaches scheduleRunAsync fails
        // the test instead of blocking it forever.
        final Long actualDelayMs = actualDelayMsFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertThat(actualDelayMs, CoreMatchers.is(Long.valueOf(expectedDelay.toMillis())));
    }

    /**
     * Verifies that {@code callAsync} runs the callable in the endpoint's main thread and
     * delivers its return value through the returned future.
     */
    @Test
    public void testCallAsync() throws InterruptedException, ExecutionException, TimeoutException {
        final BaseEndpoint endpoint = new BaseEndpoint(rpcService);
        // Boxed on purpose: assertEquals(int, Integer) is ambiguous between the
        // (long, long) and (Object, Object) overloads and does not compile.
        final Integer expectedInteger = Integer.valueOf(12345);
        try {
            endpoint.start();
            final CompletableFuture<Integer> integerFuture = endpoint.callAsync(() -> {
                endpoint.validateRunsInMainThread();
                return expectedInteger;
            }, TIMEOUT);
            Assert.assertEquals(expectedInteger, integerFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit()));
        } finally {
            // Single cleanup path: the endpoint is terminated exactly once, pass or fail.
            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
        }
    }

    /**
     * Verifies that the future returned by {@code callAsync} fails with a
     * {@link TimeoutException} when the callable runs for twice the timeout it was given.
     */
    @Test
    public void testCallAsyncTimeout() throws InterruptedException, ExecutionException, TimeoutException {
        final BaseEndpoint endpoint = new BaseEndpoint(rpcService);
        final Time timeout = Time.milliseconds(100L);
        try {
            endpoint.start();
            // handle(...) turns the failure into a value, so get() returns the Throwable
            // instead of rethrowing it wrapped in an ExecutionException.
            final CompletableFuture<Throwable> throwableFuture = endpoint.callAsync(() -> {
                endpoint.validateRunsInMainThread();
                TimeUnit.MILLISECONDS.sleep(timeout.toMilliseconds() * 2);
                return 12345;
            }, timeout).handle((ignored, failure) -> failure);
            final Throwable failure = throwableFuture.get(timeout.getSize() * 2, timeout.getUnit());
            // instanceOf reports the actual value on mismatch, unlike a bare assertTrue.
            Assert.assertThat(failure, CoreMatchers.instanceOf(TimeoutException.class));
        } finally {
            // Single cleanup path: the endpoint is terminated exactly once, pass or fail.
            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
        }
    }
}
