/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests how pending slot requests issued through a {@link SlotPoolImpl} are failed.
 *
 * <p>Each test requests a new slot from a slot pool that is connected to a
 * {@link TestingResourceManagerGateway} and then checks that the returned slot future
 * completes exceptionally under a specific failure condition.
 */
public class SlotPoolPendingRequestFailureTest extends TestLogger {
    private static final JobID jobId = new JobID();
    private static final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

    /** Timeout passed along with slot requests that do not specify their own timeout. */
    public static final Time TIMEOUT = Time.seconds(10L);

    private TestingResourceManagerGateway resourceManagerGateway;

    /** Creates a fresh resource manager gateway for every test. */
    @Before
    public void setup() {
        this.resourceManagerGateway = new TestingResourceManagerGateway();
    }

    /**
     * Checks that failing the allocation of a pending request completes the slot future
     * exceptionally with exactly the cause handed to {@code failAllocation}.
     */
    @Test
    public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
        final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
        // Capture the allocation id of the request that reaches the resource manager gateway.
        this.resourceManagerGateway.setRequestSlotConsumer(
            slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

        try (SlotPoolImpl slotPool = this.setUpSlotPool()) {
            final CompletableFuture<PhysicalSlot> slotFuture = this.requestNewAllocatedSlot(slotPool, new SlotRequestId());
            final AllocationID allocationId = allocationIdFuture.get();

            // The request must still be pending before the allocation is failed.
            MatcherAssert.assertThat(slotFuture.isDone(), Matchers.is(false));

            final FlinkException cause = new FlinkException("Fail pending slot request failure.");
            final Optional<?> failureResponse = slotPool.failAllocation(allocationId, cause);

            // Failing a request that is only pending is expected to yield an empty response.
            MatcherAssert.assertThat(failureResponse.isPresent(), Matchers.is(false));

            try {
                slotFuture.get();
                Assert.fail("Expected a slot allocation failure.");
            } catch (ExecutionException ee) {
                MatcherAssert.assertThat(ExceptionUtils.stripExecutionException(ee), Matchers.equalTo(cause));
            }
        }
    }

    /**
     * Checks that an exceptionally completed resource manager response fails the pending
     * slot request and that the gateway receives a cancellation for the same allocation id
     * that was used in the original request.
     */
    @Test
    public void testFailingResourceManagerRequestFailsPendingSlotRequestAndCancelsRMRequest() throws Exception {
        try (SlotPoolImpl slotPool = this.setUpSlotPool()) {
            final CompletableFuture<Acknowledge> requestSlotFuture = new CompletableFuture<>();
            final CompletableFuture<AllocationID> cancelSlotFuture = new CompletableFuture<>();
            final CompletableFuture<AllocationID> requestSlotFutureAllocationId = new CompletableFuture<>();

            this.resourceManagerGateway.setRequestSlotFuture(requestSlotFuture);
            this.resourceManagerGateway.setRequestSlotConsumer(
                slotRequest -> requestSlotFutureAllocationId.complete(slotRequest.getAllocationId()));
            this.resourceManagerGateway.setCancelSlotConsumer(cancelSlotFuture::complete);

            final CompletableFuture<PhysicalSlot> slotFuture = this.requestNewAllocatedSlot(slotPool, new SlotRequestId());

            // Let the resource manager's answer to the slot request fail.
            requestSlotFuture.completeExceptionally(new FlinkException("Testing exception."));

            try {
                slotFuture.get();
                Assert.fail("The slot future should not have been completed properly.");
            } catch (ExecutionException ignored) {
                // Expected: the slot future has to complete exceptionally. The concrete
                // cause is not part of this test, so it is deliberately not inspected.
            }

            // The cancellation must target the allocation id of the failed request.
            Assert.assertEquals(requestSlotFutureAllocationId.get(), cancelSlotFuture.get());
        }
    }

    /**
     * Checks that a pending slot request that is not fulfilled within its timeout
     * completes exceptionally with a {@link TimeoutException}.
     *
     * <p>The slot pool is driven by a dedicated single-threaded executor here; the slot
     * request and the final {@code close()} call are both submitted to that executor.
     */
    @Test
    public void testPendingSlotRequestTimeout() throws Exception {
        final ScheduledExecutorService singleThreadExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            final ComponentMainThreadExecutor componentMainThreadExecutor =
                ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadExecutor);
            final SlotPoolImpl slotPool = this.setUpSlotPool(componentMainThreadExecutor);

            try {
                final Time timeout = Time.milliseconds(5L);

                // Issue the request from the pool's executor and flatten the nested future.
                final CompletableFuture<PhysicalSlot> slotFuture = CompletableFuture
                    .supplyAsync(
                        () -> this.requestNewAllocatedSlot(slotPool, new SlotRequestId(), timeout),
                        componentMainThreadExecutor)
                    .thenCompose(Function.identity());

                try {
                    slotFuture.get();
                    Assert.fail("Expected that the future completes with a TimeoutException.");
                } catch (ExecutionException ee) {
                    MatcherAssert.assertThat(
                        ExceptionUtils.stripExecutionException(ee),
                        Matchers.instanceOf(TimeoutException.class));
                }
            } finally {
                // Close the pool from its own executor, regardless of the test outcome.
                CompletableFuture.runAsync(ThrowingRunnable.unchecked(slotPool::close), componentMainThreadExecutor).get();
            }
        } finally {
            // Always stop the executor thread, even if set-up or closing the pool failed.
            singleThreadExecutor.shutdownNow();
        }
    }

    /** Requests a new slot with unknown resource profile using the default {@link #TIMEOUT}. */
    private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(SlotPoolImpl slotPool, SlotRequestId slotRequestId) {
        return this.requestNewAllocatedSlot(slotPool, slotRequestId, TIMEOUT);
    }

    /** Requests a new slot with unknown resource profile using the given timeout. */
    private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(SlotPoolImpl slotPool, SlotRequestId slotRequestId, Time timeout) {
        return slotPool.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, timeout);
    }

    /** Sets up a started slot pool that uses the shared {@code mainThreadExecutor}. */
    private SlotPoolImpl setUpSlotPool() throws Exception {
        return this.setUpSlotPool(mainThreadExecutor);
    }

    /**
     * Creates a {@link TestingSlotPoolImpl}, starts it with the given executor and connects
     * it to this test's resource manager gateway.
     *
     * @param componentMainThreadExecutor executor passed to the slot pool on start
     * @return the started slot pool; the caller is responsible for closing it
     * @throws Exception if starting the slot pool fails
     */
    private SlotPoolImpl setUpSlotPool(ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {
        final TestingSlotPoolImpl slotPool = new TestingSlotPoolImpl(jobId);
        slotPool.start(JobMasterId.generate(), "foobar", componentMainThreadExecutor);
        slotPool.connectToResourceManager(this.resourceManagerGateway);
        return slotPool;
    }
}

