/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc;

import akka.actor.ActorSystem;
import java.io.Serializable;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.Future;

public class AsyncCallsTest
extends TestLogger {
    private static final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
    private static final Time timeout = Time.seconds((long)10L);
    private static final AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());

    @AfterClass
    public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture rpcTerminationFuture = akkaRpcService.stopService();
        CompletableFuture actorSystemTerminationFuture = FutureUtils.toJava((Future)actorSystem.terminate());
        FutureUtils.waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    @Test
    public void testScheduleWithNoDelay() throws Exception {
        this.runScheduleWithNoDelayTest(TestEndpoint::new);
    }

    @Test
    public void testFencedScheduleWithNoDelay() throws Exception {
        this.runScheduleWithNoDelayTest(FencedTestEndpoint::new);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runScheduleWithNoDelayTest(RpcEndpointFactory factory) throws Exception {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean concurrentAccess = new AtomicBoolean(false);
        RpcEndpoint rpcEndpoint = factory.create((RpcService)akkaRpcService, lock, concurrentAccess);
        rpcEndpoint.start();
        try {
            TestGateway gateway = (TestGateway)rpcEndpoint.getSelfGateway(TestGateway.class);
            gateway.someCall();
            gateway.anotherCall();
            gateway.someCall();
            for (int i = 0; i < 10000; ++i) {
                rpcEndpoint.runAsync(() -> {
                    boolean holdsLock = lock.tryLock();
                    if (holdsLock) {
                        lock.unlock();
                    } else {
                        concurrentAccess.set(true);
                    }
                });
            }
            CompletableFuture result = rpcEndpoint.callAsync(() -> {
                boolean holdsLock = lock.tryLock();
                if (holdsLock) {
                    lock.unlock();
                } else {
                    concurrentAccess.set(true);
                }
                return "test";
            }, Time.seconds((long)30L));
            String str = (String)result.get(30L, TimeUnit.SECONDS);
            Assert.assertEquals((Object)"test", (Object)str);
            Assert.assertFalse((String)"Rpc Endpoint had concurrent access", (boolean)concurrentAccess.get());
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)rpcEndpoint, (Time)timeout);
        }
    }

    @Test
    public void testScheduleWithDelay() throws Exception {
        this.runScheduleWithDelayTest(TestEndpoint::new);
    }

    @Test
    public void testFencedScheduleWithDelay() throws Exception {
        this.runScheduleWithDelayTest(FencedTestEndpoint::new);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runScheduleWithDelayTest(RpcEndpointFactory factory) throws Exception {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean concurrentAccess = new AtomicBoolean(false);
        OneShotLatch latch = new OneShotLatch();
        long delay = 10L;
        RpcEndpoint rpcEndpoint = factory.create((RpcService)akkaRpcService, lock, concurrentAccess);
        rpcEndpoint.start();
        try {
            rpcEndpoint.runAsync(() -> {
                boolean holdsLock = lock.tryLock();
                if (holdsLock) {
                    lock.unlock();
                } else {
                    concurrentAccess.set(true);
                }
            });
            long start = System.nanoTime();
            rpcEndpoint.scheduleRunAsync(() -> {
                boolean holdsLock = lock.tryLock();
                if (holdsLock) {
                    lock.unlock();
                } else {
                    concurrentAccess.set(true);
                }
                latch.trigger();
            }, 10L, TimeUnit.MILLISECONDS);
            latch.await();
            long stop = System.nanoTime();
            Assert.assertFalse((String)"Rpc Endpoint had concurrent access", (boolean)concurrentAccess.get());
            Assert.assertTrue((String)"call was not properly delayed", ((stop - start) / 1000000L >= 10L ? 1 : 0) != 0);
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)rpcEndpoint, (Time)timeout);
        }
    }

    @Test
    public void testRunAsyncWithFencing() throws Exception {
        Time shortTimeout = Time.milliseconds((long)100L);
        UUID newFencingToken = UUID.randomUUID();
        CompletableFuture resultFuture = new CompletableFuture();
        AsyncCallsTest.testRunAsync(endpoint -> {
            endpoint.runAsync(() -> resultFuture.complete(endpoint.getFencingToken()));
            return resultFuture;
        }, newFencingToken);
        try {
            resultFuture.get(shortTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.fail((String)"The async run operation should not complete since it is filtered out due to the changed fencing token.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
    }

    @Test
    public void testRunAsyncWithoutFencing() throws Exception {
        CompletableFuture resultFuture = new CompletableFuture();
        UUID newFencingToken = UUID.randomUUID();
        AsyncCallsTest.testRunAsync(endpoint -> {
            endpoint.runAsyncWithoutFencing(() -> resultFuture.complete(endpoint.getFencingToken()));
            return resultFuture;
        }, newFencingToken);
        Assert.assertEquals((Object)newFencingToken, resultFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
    }

    @Test
    public void testCallAsyncWithFencing() throws Exception {
        UUID newFencingToken = UUID.randomUUID();
        CompletableFuture resultFuture = AsyncCallsTest.testRunAsync(endpoint -> endpoint.callAsync(() -> true, timeout), newFencingToken);
        try {
            resultFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.fail((String)"The async call operation should fail due to the changed fencing token.");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(ExceptionUtils.stripExecutionException((Throwable)e) instanceof FencingTokenException));
        }
    }

    @Test
    public void testCallAsyncWithoutFencing() throws Exception {
        UUID newFencingToken = UUID.randomUUID();
        CompletableFuture resultFuture = AsyncCallsTest.testRunAsync(endpoint -> endpoint.callAsyncWithoutFencing(() -> true, timeout), newFencingToken);
        Assert.assertTrue((boolean)((Boolean)resultFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)));
    }

    @Test
    public void testUnfencedMainThreadExecutor() throws Exception {
        UUID newFencingToken = UUID.randomUUID();
        boolean value = true;
        CompletableFuture resultFuture = AsyncCallsTest.testRunAsync(endpoint -> CompletableFuture.supplyAsync(() -> true, endpoint.getUnfencedMainThreadExecutor()), newFencingToken);
        Assert.assertThat(resultFuture.get(), (Matcher)Matchers.is((Object)true));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static <T> CompletableFuture<T> testRunAsync(Function<FencedTestEndpoint, CompletableFuture<T>> runAsyncCall, UUID newFencingToken) throws Exception {
        UUID initialFencingToken = UUID.randomUUID();
        OneShotLatch enterSetNewFencingToken = new OneShotLatch();
        OneShotLatch triggerSetNewFencingToken = new OneShotLatch();
        FencedTestEndpoint fencedTestEndpoint = new FencedTestEndpoint((RpcService)akkaRpcService, initialFencingToken, enterSetNewFencingToken, triggerSetNewFencingToken);
        FencedTestGateway fencedTestGateway = (FencedTestGateway)fencedTestEndpoint.getSelfGateway(FencedTestGateway.class);
        try {
            fencedTestEndpoint.start();
            CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestGateway.setNewFencingToken(newFencingToken, timeout);
            Assert.assertFalse((boolean)newFencingTokenFuture.isDone());
            Assert.assertEquals((Object)initialFencingToken, (Object)fencedTestEndpoint.getFencingToken());
            CompletableFuture<T> result = runAsyncCall.apply(fencedTestEndpoint);
            enterSetNewFencingToken.await();
            triggerSetNewFencingToken.trigger();
            newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            CompletableFuture<T> completableFuture = result;
            return completableFuture;
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)fencedTestEndpoint, (Time)timeout);
        }
    }

    public static class FencedTestEndpoint
    extends FencedRpcEndpoint<UUID>
    implements FencedTestGateway {
        private final ReentrantLock lock;
        private final AtomicBoolean concurrentAccess;
        private final OneShotLatch enteringSetNewFencingToken;
        private final OneShotLatch triggerSetNewFencingToken;

        protected FencedTestEndpoint(RpcService rpcService, ReentrantLock lock, AtomicBoolean concurrentAccess) {
            this(rpcService, lock, concurrentAccess, UUID.randomUUID(), new OneShotLatch(), new OneShotLatch());
        }

        protected FencedTestEndpoint(RpcService rpcService, UUID initialFencingToken, OneShotLatch enteringSetNewFencingToken, OneShotLatch triggerSetNewFencingToken) {
            this(rpcService, new ReentrantLock(), new AtomicBoolean(false), initialFencingToken, enteringSetNewFencingToken, triggerSetNewFencingToken);
        }

        private FencedTestEndpoint(RpcService rpcService, ReentrantLock lock, AtomicBoolean concurrentAccess, UUID initialFencingToken, OneShotLatch enteringSetNewFencingToken, OneShotLatch triggerSetNewFencingToken) {
            super(rpcService, (Serializable)initialFencingToken);
            this.lock = lock;
            this.concurrentAccess = concurrentAccess;
            this.enteringSetNewFencingToken = enteringSetNewFencingToken;
            this.triggerSetNewFencingToken = triggerSetNewFencingToken;
        }

        @Override
        public CompletableFuture<Acknowledge> setNewFencingToken(UUID fencingToken, Time timeout) {
            this.enteringSetNewFencingToken.trigger();
            try {
                this.triggerSetNewFencingToken.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException("TriggerSetNewFencingToken OneShotLatch was interrupted.");
            }
            this.setFencingToken(fencingToken);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        @Override
        public void someCall() {
            boolean holdsLock = this.lock.tryLock();
            if (holdsLock) {
                this.lock.unlock();
            } else {
                this.concurrentAccess.set(true);
            }
        }

        @Override
        public void anotherCall() {
            boolean holdsLock = this.lock.tryLock();
            if (holdsLock) {
                this.lock.unlock();
            } else {
                this.concurrentAccess.set(true);
            }
        }
    }

    public static interface FencedTestGateway
    extends FencedRpcGateway<UUID>,
    TestGateway {
        public CompletableFuture<Acknowledge> setNewFencingToken(UUID var1, @RpcTimeout Time var2);
    }

    private static class TestEndpoint
    extends RpcEndpoint
    implements TestGateway {
        private final ReentrantLock lock;
        private final AtomicBoolean concurrentAccess;

        TestEndpoint(RpcService rpcService, ReentrantLock lock, AtomicBoolean concurrentAccess) {
            super(rpcService);
            this.lock = lock;
            this.concurrentAccess = concurrentAccess;
        }

        @Override
        public void someCall() {
            boolean holdsLock = this.lock.tryLock();
            if (holdsLock) {
                this.lock.unlock();
            } else {
                this.concurrentAccess.set(true);
            }
        }

        @Override
        public void anotherCall() {
            boolean holdsLock = this.lock.tryLock();
            if (holdsLock) {
                this.lock.unlock();
            } else {
                this.concurrentAccess.set(true);
            }
        }
    }

    public static interface TestGateway
    extends RpcGateway {
        public void someCall();

        public void anotherCall();
    }

    @FunctionalInterface
    private static interface RpcEndpointFactory {
        public RpcEndpoint create(RpcService var1, ReentrantLock var2, AtomicBoolean var3);
    }
}

