package org.apache.flink.runtime.rpc;

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.util.TestLoggerExtension;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/rpc/RpcEndpointTest.class */
public class RpcEndpointTest {
    /** RPC service shared by all tests in this class; assigned in {@code setup()} and closed in {@code teardown()}. */
    private static RpcService rpcService = null;

    /**
     * Minimal endpoint implementing {@link BaseGateway}. Calls to {@link #foobar()} answer with the
     * integer that was fixed when the endpoint was constructed.
     */
    public static class BaseEndpoint extends RpcEndpoint implements BaseGateway {
        private final int foobarValue;

        /** Creates an endpoint whose {@code foobar()} result is {@link Integer#MAX_VALUE}. */
        protected BaseEndpoint(RpcService rpcService) {
            this(rpcService, Integer.MAX_VALUE);
        }

        /**
         * Creates an endpoint with an explicit {@code foobar()} result.
         *
         * @param rpcService service the endpoint is registered with
         * @param foobarValue value returned by {@link #foobar()}
         */
        protected BaseEndpoint(RpcService rpcService, int foobarValue) {
            super(rpcService);
            this.foobarValue = foobarValue;
        }

        /** @return an already completed future holding the configured value */
        @Override
        public CompletableFuture<Integer> foobar() {
            return CompletableFuture.completedFuture(foobarValue);
        }
    }

    /** Gateway exposing a single asynchronous {@code foobar()} call that yields an integer. */
    public interface BaseGateway extends RpcGateway {
        /** @return future carrying the integer answer of the endpoint behind this gateway */
        CompletableFuture<Integer> foobar();
    }

    /**
     * Gateway that is unrelated to {@link BaseGateway}; it is implemented by {@link ExtendedEndpoint}
     * but not by {@link BaseEndpoint}.
     */
    public interface DifferentGateway extends RpcGateway {
        /** @return future carrying the string answer of the endpoint behind this gateway */
        CompletableFuture<String> foo();
    }

    /**
     * Endpoint that adds {@link ExtendedGateway} and {@link DifferentGateway} on top of
     * {@link BaseEndpoint}. Every gateway method answers with a value fixed at construction time.
     */
    public static class ExtendedEndpoint extends BaseEndpoint implements ExtendedGateway, DifferentGateway {
        private final int barfooValue;
        private final String fooString;

        /**
         * @param rpcService service the endpoint is registered with
         * @param foobarValue value returned by {@code foobar()}
         * @param barfooValue value returned by {@link #barfoo()}
         * @param fooString value returned by {@link #foo()}
         */
        protected ExtendedEndpoint(RpcService rpcService, int foobarValue, int barfooValue, String fooString) {
            super(rpcService, foobarValue);
            this.barfooValue = barfooValue;
            this.fooString = fooString;
        }

        /** @return an already completed future holding the configured {@code barfoo} value */
        @Override
        public CompletableFuture<Integer> barfoo() {
            return CompletableFuture.completedFuture(barfooValue);
        }

        /** @return an already completed future holding the configured string */
        @Override
        public CompletableFuture<String> foo() {
            return CompletableFuture.completedFuture(fooString);
        }
    }

    /** Gateway that extends {@link BaseGateway} with an additional asynchronous {@code barfoo()} call. */
    public interface ExtendedGateway extends BaseGateway {
        /** @return future carrying the second integer answer of the endpoint behind this gateway */
        CompletableFuture<Integer> barfoo();
    }

    /**
     * Endpoint used by the running-state tests. Its {@code onStop()} counts down a latch and then
     * returns the future supplied at construction, so the test controls when that future completes.
     */
    private static final class RunningStateTestingEndpoint extends RpcEndpoint implements RunningStateTestingEndpointGateway {
        /** Released the first time {@link #onStop()} is invoked. */
        private final CountDownLatch onStopCalled = new CountDownLatch(1);
        private final CompletableFuture<Void> stopFuture;

        /**
         * @param rpcService service the endpoint is registered with
         * @param stopFuture future handed back from {@link #onStop()}
         */
        RunningStateTestingEndpoint(RpcService rpcService, CompletableFuture<Void> stopFuture) {
            super(rpcService);
            this.stopFuture = stopFuture;
        }

        /** Signals that stopping has begun and returns the externally supplied stop future. */
        public CompletableFuture<Void> onStop() {
            onStopCalled.countDown();
            return stopFuture;
        }

        /**
         * Calls {@code closeAsync()} and blocks until {@link #onStop()} has been entered.
         *
         * @return the future returned by {@code closeAsync()}
         * @throws InterruptedException if interrupted while waiting for {@code onStop()}
         */
        CompletableFuture<Void> closeAndWaitUntilOnStopCalled() throws InterruptedException {
            final CompletableFuture<Void> terminationFuture = closeAsync();
            onStopCalled.await();
            return terminationFuture;
        }

        /** @return an already completed future holding the current result of {@code isRunning()} */
        @Override
        public CompletableFuture<Boolean> queryIsRunningFlag() {
            return CompletableFuture.completedFuture(isRunning());
        }
    }

    /** Gateway through which a test can ask an endpoint for its running flag. */
    public interface RunningStateTestingEndpointGateway extends RpcGateway {
        /** @return future carrying the endpoint's running flag at the time the call is handled */
        CompletableFuture<Boolean> queryIsRunningFlag();
    }

    /**
     * {@link MainThreadExecutable} stub that forwards every runnable it receives, whether submitted
     * for immediate or for delayed execution, to one test-supplied consumer. {@code callAsync} is not
     * supported.
     */
    public static class TestMainThreadExecutable implements MainThreadExecutable {
        private final Consumer<Runnable> runnableSink;

        /** @param runnableSink receives each runnable passed to this executable */
        private TestMainThreadExecutable(Consumer<Runnable> runnableSink) {
            this.runnableSink = runnableSink;
        }

        /** Passes the runnable straight to the configured consumer. */
        public void runAsync(Runnable runnable) {
            forward(runnable);
        }

        /** Always throws {@link UnsupportedOperationException}. */
        public <V> CompletableFuture<V> callAsync(Callable<V> callable, Duration duration) {
            throw new UnsupportedOperationException();
        }

        /** Passes the runnable to the configured consumer; the delay argument is not used. */
        public void scheduleRunAsync(Runnable runnable, long j) {
            forward(runnable);
        }

        /** Single hand-over point shared by the run and schedule variants. */
        private void forward(Runnable runnable) {
            runnableSink.accept(runnable);
        }
    }

    /** Loads the RPC system and starts the local RPC service shared by all tests. */
    @BeforeAll
    public static void setup() throws Exception {
        final RpcSystem rpcSystem = RpcSystem.load();
        rpcService = rpcSystem.localServiceBuilder(new Configuration()).createAndStart();
    }

    /** Closes the shared RPC service and blocks until the close has finished. */
    @AfterAll
    public static void teardown() throws Exception {
        final CompletableFuture<?> terminationFuture = rpcService.closeAsync();
        terminationFuture.get();
    }

    /**
     * Verifies that a started endpoint hands out a self gateway for an interface it implements and
     * that a call through that gateway returns the value the endpoint was constructed with.
     */
    @Test
    public void testSelfGateway() throws Exception {
        final int expectedValue = 1337;
        final BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, expectedValue);
        try {
            baseEndpoint.start();
            final BaseGateway selfGateway = (BaseGateway) baseEndpoint.getSelfGateway(BaseGateway.class);
            Assertions.assertEquals(expectedValue, selfGateway.foobar().get());
        } finally {
            // Cleanup lives only here so it runs exactly once on both the success and failure paths.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{baseEndpoint});
            baseEndpoint.validateResourceClosed();
        }
    }

    /**
     * Verifies that requesting a self gateway for an interface the endpoint does not implement is
     * rejected with a {@link RuntimeException}.
     */
    @Test
    public void testWrongSelfGateway() {
        Assertions.assertThrows(RuntimeException.class, () -> {
            final BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, 1337);
            try {
                baseEndpoint.start();
                // BaseEndpoint does not implement DifferentGateway, so this request is the call
                // expected to throw. Without it the body would only ever reach fail() below.
                baseEndpoint.getSelfGateway(DifferentGateway.class);
                Assertions.fail("Expected to fail with a RuntimeException since we requested the wrong gateway type.");
            } finally {
                // Cleanup lives only here so it runs exactly once on both the success and failure paths.
                RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{baseEndpoint});
                baseEndpoint.validateResourceClosed();
            }
        });
    }

    /**
     * Verifies that an endpoint implementing several gateway interfaces, including an inherited
     * one, can be addressed through each of them and answers with the values it was built with.
     */
    @Test
    public void testEndpointInheritance() throws Exception {
        final int foobarValue = 1;
        final int barfooValue = 2;
        final String fooString = "foobar";
        final ExtendedEndpoint extendedEndpoint = new ExtendedEndpoint(rpcService, foobarValue, barfooValue, fooString);
        try {
            extendedEndpoint.start();
            final BaseGateway baseGateway = (BaseGateway) extendedEndpoint.getSelfGateway(BaseGateway.class);
            final ExtendedGateway extendedGateway = (ExtendedGateway) extendedEndpoint.getSelfGateway(ExtendedGateway.class);
            final DifferentGateway differentGateway = (DifferentGateway) extendedEndpoint.getSelfGateway(DifferentGateway.class);
            Assertions.assertEquals(foobarValue, baseGateway.foobar().get());
            Assertions.assertEquals(foobarValue, extendedGateway.foobar().get());
            Assertions.assertEquals(barfooValue, extendedGateway.barfoo().get());
            Assertions.assertEquals(fooString, differentGateway.foo().get());
        } finally {
            // Cleanup lives only here so it runs exactly once on both the success and failure paths.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{extendedEndpoint});
            extendedEndpoint.validateResourceClosed();
        }
    }

    /**
     * Verifies that a started endpoint reports {@code isRunning() == true} when queried through its
     * gateway.
     */
    @Test
    public void testRunningState() throws InterruptedException, ExecutionException, TimeoutException {
        final RunningStateTestingEndpoint endpoint = new RunningStateTestingEndpoint(rpcService, CompletableFuture.completedFuture(null));
        final RunningStateTestingEndpointGateway gateway = (RunningStateTestingEndpointGateway) endpoint.getSelfGateway(RunningStateTestingEndpointGateway.class);
        try {
            endpoint.start();
            Assertions.assertTrue(gateway.queryIsRunningFlag().get().booleanValue());
        } finally {
            // Cleanup lives only here so it runs exactly once on both the success and failure paths.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{endpoint});
            endpoint.validateResourceClosed();
        }
    }

    /**
     * Verifies that an endpoint reports {@code isRunning() == false} once {@code closeAsync()} has
     * been called and {@code onStop()} has been entered, even though the future returned by
     * {@code onStop()} has not completed yet.
     */
    @Test
    public void testNotRunningState() throws InterruptedException, ExecutionException, TimeoutException {
        final CompletableFuture<Void> stopFuture = new CompletableFuture<>();
        final RunningStateTestingEndpoint endpoint = new RunningStateTestingEndpoint(rpcService, stopFuture);
        final RunningStateTestingEndpointGateway gateway = (RunningStateTestingEndpointGateway) endpoint.getSelfGateway(RunningStateTestingEndpointGateway.class);
        endpoint.start();
        final CompletableFuture<Void> terminationFuture = endpoint.closeAndWaitUntilOnStopCalled();
        try {
            Assertions.assertFalse(gateway.queryIsRunningFlag().get().booleanValue());
        } finally {
            // Always release onStop(), even when the assertion fails. Presumably termination waits
            // on this future, so leaving it open would strand the endpoint on the shared RPC service.
            stopFuture.complete(null);
        }
        terminationFuture.get();
        endpoint.validateResourceClosed();
    }

    /**
     * Verifies that a runnable submitted through {@code getMainThreadExecutor().execute(...)} is run
     * and passes the endpoint's {@code validateRunsInMainThread()} check.
     */
    @Test
    public void testExecute() throws InterruptedException, ExecutionException, TimeoutException {
        final BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService);
        final CompletableFuture<Void> executedFuture = new CompletableFuture<>();
        try {
            baseEndpoint.start();
            baseEndpoint.getMainThreadExecutor().execute(() -> {
                baseEndpoint.validateRunsInMainThread();
                executedFuture.complete(null);
            });
            executedFuture.get();
        } finally {
            // Cleanup lives only here so it runs exactly once on both the success and failure paths.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{baseEndpoint});
            baseEndpoint.validateResourceClosed();
        }
    }

    /** Schedules a no-op {@link Runnable} with the delay expressed in milliseconds. */
    @Test
    public void testScheduleRunnableWithDelayInMilliseconds() throws Exception {
        // Typed as Runnable so the Runnable overload of schedule(...) is the one selected.
        final Runnable noOp = () -> {
        };
        testScheduleWithDelay(
                (executor, delay) -> executor.schedule(noOp, delay.toMillis(), TimeUnit.MILLISECONDS));
    }

    /** Schedules a no-op {@link Runnable} with the delay expressed in whole seconds. */
    @Test
    public void testScheduleRunnableWithDelayInSeconds() throws Exception {
        // Typed as Runnable so the Runnable overload of schedule(...) is the one selected.
        final Runnable noOp = () -> {
        };
        testScheduleWithDelay((executor, delay) -> {
            final long delaySeconds = delay.toMillis() / 1000;
            executor.schedule(noOp, delaySeconds, TimeUnit.SECONDS);
        });
    }

    /** Schedules a no-op {@link Runnable} on an executor that has already been closed. */
    @Test
    public void testScheduleRunnableAfterClose() throws Exception {
        // Typed as Runnable so the Runnable overload of schedule(...) is the one selected.
        final Runnable noOp = () -> {
        };
        testScheduleAfterClose(
                (executor, delay) -> executor.schedule(noOp, delay.toMillis() / 1000, TimeUnit.SECONDS));
    }

    /** Schedules a {@link Runnable} that completes the supplied future, for the cancellation check. */
    @Test
    public void testCancelScheduledRunnable() throws Exception {
        testCancelScheduledTask((executor, taskRanFuture) -> {
            // Block-bodied and typed as Runnable so the Runnable overload of schedule(...) is used.
            final Runnable completeOnRun = () -> {
                taskRanFuture.complete(null);
            };
            final long delayMillis = Duration.ofMillis(2L).toMillis();
            return executor.schedule(completeOnRun, delayMillis, TimeUnit.MILLISECONDS);
        });
    }

    /** Schedules a constant-returning {@link Callable} with the delay expressed in milliseconds. */
    @Test
    public void testScheduleCallableWithDelayInMilliseconds() throws Exception {
        // Typed as Callable so the Callable overload of schedule(...) is the one selected.
        final Callable<Integer> constantOne = () -> 1;
        testScheduleWithDelay(
                (executor, delay) -> executor.schedule(constantOne, delay.toMillis(), TimeUnit.MILLISECONDS));
    }

    /** Schedules a constant-returning {@link Callable} with the delay expressed in whole seconds. */
    @Test
    public void testScheduleCallableWithDelayInSeconds() throws Exception {
        // Typed as Callable so the Callable overload of schedule(...) is the one selected.
        final Callable<Integer> constantOne = () -> 1;
        testScheduleWithDelay((executor, delay) -> {
            final long delaySeconds = delay.toMillis() / 1000;
            executor.schedule(constantOne, delaySeconds, TimeUnit.SECONDS);
        });
    }

    /** Schedules a constant-returning {@link Callable} on an executor that has already been closed. */
    @Test
    public void testScheduleCallableAfterClose() throws Exception {
        // Typed as Callable so the Callable overload of schedule(...) is the one selected.
        final Callable<Integer> constantOne = () -> 1;
        testScheduleAfterClose(
                (executor, delay) -> executor.schedule(constantOne, delay.toMillis() / 1000, TimeUnit.SECONDS));
    }

    /** Schedules a {@link Callable} that completes the supplied future, for the cancellation check. */
    @Test
    public void testCancelScheduledCallable() {
        testCancelScheduledTask((executor, taskRanFuture) -> {
            // Typed as Callable so the Callable overload of schedule(...) is the one selected.
            final Callable<Object> completeOnCall = () -> {
                taskRanFuture.complete(null);
                return null;
            };
            final long delayMillis = Duration.ofMillis(2L).toMillis();
            return executor.schedule(completeOnCall, delayMillis, TimeUnit.MILLISECONDS);
        });
    }

    /**
     * Builds a {@code MainThreadExecutor} on top of a {@link TestMainThreadExecutable}, lets the
     * given action schedule a task with a one-second delay, and waits until the executor has handed
     * a runnable to the executable.
     *
     * @param biConsumer schedules a task on the supplied executor using the supplied delay
     * @throws Exception if waiting for the hand-over fails
     */
    private static void testScheduleWithDelay(BiConsumer<RpcEndpoint.MainThreadExecutor, Duration> biConsumer) throws Exception {
        final CompletableFuture<Void> handedOverFuture = new CompletableFuture<>();
        final TestMainThreadExecutable executable =
                new TestMainThreadExecutable(runnable -> handedOverFuture.complete(null));
        final RpcEndpoint.MainThreadExecutor mainThreadExecutor = new RpcEndpoint.MainThreadExecutor(executable, () -> {
        }, "foobar");
        try {
            biConsumer.accept(mainThreadExecutor, Duration.ofSeconds(1L));
            handedOverFuture.get();
        } finally {
            // Close the executor even when scheduling or waiting fails.
            mainThreadExecutor.close();
        }
    }

    /**
     * Closes a {@code MainThreadExecutor} first and then lets the given action schedule a task on it
     * with zero delay. Asserts that no runnable reached the underlying executable and that the
     * returned future is not done.
     *
     * @param biFunction schedules a task on the supplied executor and returns its future
     */
    private static void testScheduleAfterClose(BiFunction<RpcEndpoint.MainThreadExecutor, Duration, ScheduledFuture<?>> biFunction) {
        final CompletableFuture<Void> handedOverFuture = new CompletableFuture<>();
        final TestMainThreadExecutable executable =
                new TestMainThreadExecutable(runnable -> handedOverFuture.complete(null));
        final RpcEndpoint.MainThreadExecutor mainThreadExecutor = new RpcEndpoint.MainThreadExecutor(executable, () -> {
        }, "foobar");
        mainThreadExecutor.close();
        final ScheduledFuture<?> scheduledFuture = biFunction.apply(mainThreadExecutor, Duration.ofSeconds(0L));
        Assertions.assertFalse(handedOverFuture.isDone());
        Assertions.assertFalse(scheduledFuture.isDone());
    }

    /**
     * Schedules a task through the given action, cancels the returned future, triggers all
     * non-periodic tasks of the manually driven scheduler, and asserts that the future is cancelled
     * and the task's completion future was never completed.
     *
     * @param biFunction schedules a task that completes the supplied future and returns its handle
     */
    private static void testCancelScheduledTask(BiFunction<RpcEndpoint.MainThreadExecutor, CompletableFuture<Void>, ScheduledFuture<?>> biFunction) {
        final ManuallyTriggeredScheduledExecutorService manualScheduler = new ManuallyTriggeredScheduledExecutorService();
        // The executable runs whatever it is handed right away on the calling thread.
        final RpcEndpoint.MainThreadExecutor mainThreadExecutor = new RpcEndpoint.MainThreadExecutor(
                new TestMainThreadExecutable(Runnable::run),
                () -> {
                },
                manualScheduler);
        final CompletableFuture<Void> taskRanFuture = new CompletableFuture<>();

        final ScheduledFuture<?> scheduledFuture = biFunction.apply(mainThreadExecutor, taskRanFuture);
        scheduledFuture.cancel(true);
        manualScheduler.triggerAllNonPeriodicTasks();

        Assertions.assertTrue(scheduledFuture.isCancelled());
        Assertions.assertFalse(taskRanFuture.isDone());
        mainThreadExecutor.close();
    }

    /**
     * Verifies that a callable passed to {@code callAsync} passes the endpoint's
     * {@code validateRunsInMainThread()} check and that its result is delivered through the
     * returned future.
     */
    @Test
    public void testCallAsync() throws InterruptedException, ExecutionException, TimeoutException {
        final BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService);
        final int expectedInteger = 12345;
        try {
            baseEndpoint.start();
            final Integer actualInteger = (Integer) baseEndpoint.callAsync(() -> {
                baseEndpoint.validateRunsInMainThread();
                return expectedInteger;
            }, Duration.ofSeconds(10L)).get();
            Assertions.assertEquals(expectedInteger, actualInteger);
        } finally {
            // Cleanup lives only here so it runs exactly once on both the success and failure paths.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{baseEndpoint});
            baseEndpoint.validateResourceClosed();
        }
    }

    /**
     * Verifies that the future returned by {@code callAsync} completes with a
     * {@link TimeoutException} when the callable is still blocked after the 100 ms timeout.
     */
    @Test
    public void testCallAsyncTimeout() throws InterruptedException, ExecutionException, TimeoutException {
        final BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService);
        final Duration timeout = Duration.ofMillis(100L);
        // Keeps the callable blocked until the test releases it.
        final CountDownLatch releaseLatch = new CountDownLatch(1);
        try {
            baseEndpoint.start();
            final Throwable failure = (Throwable) baseEndpoint.callAsync(() -> {
                baseEndpoint.validateRunsInMainThread();
                releaseLatch.await();
                return 12345;
            }, timeout).handle((ignoredResult, throwable) -> {
                return throwable;
            }).get();
            Assertions.assertNotNull(failure);
            Assertions.assertTrue(failure instanceof TimeoutException);
        } finally {
            // Release the blocked callable before terminating, as the original did on both paths.
            releaseLatch.countDown();
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{baseEndpoint});
            baseEndpoint.validateResourceClosed();
        }
    }
}
