/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertexTest;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.messages.TaskBackPressureResponse;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureRequestCoordinator;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStats;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public class BackPressureRequestCoordinatorTest
extends TestLogger {
    private static final long requestTimeout = 10000L;
    private static final double backPressureRatio = 0.5;
    private static final String requestTimeoutMessage = "Request timeout.";
    private static ScheduledExecutorService executorService;
    private BackPressureRequestCoordinator coordinator;
    @Rule
    public Timeout caseTimeout = new Timeout(10L, TimeUnit.SECONDS);

    @BeforeClass
    public static void setUp() throws Exception {
        executorService = new ScheduledThreadPoolExecutor(1);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (executorService != null) {
            executorService.shutdown();
        }
    }

    @Before
    public void initCoordinator() throws Exception {
        this.coordinator = new BackPressureRequestCoordinator((Executor)executorService, 10000L);
    }

    @After
    public void shutdownCoordinator() throws Exception {
        if (this.coordinator != null) {
            Assert.assertEquals((long)0L, (long)this.coordinator.getNumberOfPendingRequests());
            this.coordinator.shutDown();
        }
    }

    @Test
    public void testSuccessfulBackPressureRequest() throws Exception {
        ExecutionVertex[] vertices = this.createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY);
        CompletableFuture requestFuture = this.coordinator.triggerBackPressureRequest(vertices);
        BackPressureStats backPressureStats = (BackPressureStats)requestFuture.get();
        Assert.assertEquals((long)0L, (long)backPressureStats.getRequestId());
        Map backPressureRatios = backPressureStats.getBackPressureRatios();
        for (ExecutionVertex executionVertex : vertices) {
            ExecutionAttemptID executionId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
            Assert.assertEquals((double)0.5, (double)((Double)backPressureRatios.get(executionId)), (double)0.0);
        }
    }

    @Test
    public void testRequestNotRunningTasks() throws Exception {
        ExecutionVertex[] vertices = this.createExecutionVertices(ExecutionState.DEPLOYING, CompletionType.SUCCESSFULLY);
        CompletableFuture requestFuture = this.coordinator.triggerBackPressureRequest(vertices);
        Assert.assertTrue((boolean)requestFuture.isDone());
        try {
            requestFuture.get();
            Assert.fail((String)"Exception expected.");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof IllegalStateException));
        }
    }

    @Test
    public void testBackPressureRequestWithException() throws Exception {
        ExecutionVertex[] vertices = this.createExecutionVertices(ExecutionState.RUNNING, CompletionType.EXCEPTIONALLY);
        CompletableFuture requestFuture = this.coordinator.triggerBackPressureRequest(vertices);
        try {
            requestFuture.get();
            Assert.fail((String)"Exception expected.");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof RuntimeException));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBackPressureRequestTimeout() throws Exception {
        long requestTimeout = 100L;
        ExecutionVertex[] vertices = this.createExecutionVertices(ExecutionState.RUNNING, CompletionType.TIMEOUT, requestTimeout);
        BackPressureRequestCoordinator coordinator = new BackPressureRequestCoordinator((Executor)executorService, requestTimeout);
        try {
            CompletableFuture requestFuture = coordinator.triggerBackPressureRequest(vertices);
            requestFuture.get();
            Assert.fail((String)"Exception expected.");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)requestTimeoutMessage).isPresent());
        }
        finally {
            coordinator.shutDown();
        }
    }

    @Test
    public void testShutDown() throws Exception {
        ExecutionVertex[] vertices = this.createExecutionVertices(ExecutionState.RUNNING, CompletionType.NEVER_COMPLETE);
        ArrayList<CompletableFuture> requestFutures = new ArrayList<CompletableFuture>();
        requestFutures.add(this.coordinator.triggerBackPressureRequest(vertices));
        requestFutures.add(this.coordinator.triggerBackPressureRequest(vertices));
        for (CompletableFuture future : requestFutures) {
            Assert.assertFalse((boolean)future.isDone());
        }
        this.coordinator.shutDown();
        for (CompletableFuture future : requestFutures) {
            Assert.assertTrue((boolean)future.isCompletedExceptionally());
        }
        CompletableFuture future = this.coordinator.triggerBackPressureRequest(vertices);
        Assert.assertTrue((boolean)future.isCompletedExceptionally());
    }

    private ExecutionVertex[] createExecutionVertices(ExecutionState state, CompletionType completionType) throws Exception {
        return this.createExecutionVertices(state, completionType, 10000L);
    }

    private ExecutionVertex[] createExecutionVertices(ExecutionState state, CompletionType completionType, long requestTimeout) throws Exception {
        return new ExecutionVertex[]{this.createExecutionVertex(0, ExecutionState.RUNNING, CompletionType.SUCCESSFULLY, requestTimeout), this.createExecutionVertex(1, state, completionType, requestTimeout), this.createExecutionVertex(2, ExecutionState.RUNNING, CompletionType.SUCCESSFULLY, requestTimeout)};
    }

    private ExecutionVertex createExecutionVertex(int subTaskIndex, ExecutionState state, CompletionType completionType, long requestTimeout) throws Exception {
        return new TestingExecutionVertex(ExecutionJobVertexTest.createExecutionJobVertex(4, 4), subTaskIndex, Time.seconds((long)10L), 1L, System.currentTimeMillis(), state, completionType, requestTimeout);
    }

    private static class TestingExecution
    extends Execution {
        private final ExecutionState state;
        private final CompletionType completionType;
        private final long requestTimeout;

        TestingExecution(Executor executor, ExecutionVertex vertex, int attemptNumber, long globalModVersion, long startTimestamp, Time rpcTimeout, ExecutionState state, CompletionType completionType, long requestTimeout) {
            super(executor, vertex, attemptNumber, globalModVersion, startTimestamp, rpcTimeout);
            this.state = (ExecutionState)Preconditions.checkNotNull((Object)state);
            this.completionType = (CompletionType)((Object)Preconditions.checkNotNull((Object)((Object)completionType)));
            this.requestTimeout = requestTimeout;
        }

        public CompletableFuture<TaskBackPressureResponse> requestBackPressure(int requestId, Time timeout) {
            CompletableFuture<TaskBackPressureResponse> responseFuture = new CompletableFuture<TaskBackPressureResponse>();
            switch (this.completionType) {
                case SUCCESSFULLY: {
                    responseFuture.complete(new TaskBackPressureResponse(0, this.getAttemptId(), 0.5));
                    break;
                }
                case EXCEPTIONALLY: {
                    responseFuture.completeExceptionally(new RuntimeException("Request failed."));
                    break;
                }
                case TIMEOUT: {
                    executorService.schedule(() -> responseFuture.completeExceptionally(new TimeoutException(BackPressureRequestCoordinatorTest.requestTimeoutMessage)), this.requestTimeout, TimeUnit.MILLISECONDS);
                    break;
                }
                case NEVER_COMPLETE: {
                    break;
                }
                default: {
                    throw new RuntimeException("Unknown completion type.");
                }
            }
            return responseFuture;
        }

        public ExecutionState getState() {
            return this.state;
        }
    }

    private static class TestingExecutionVertex
    extends ExecutionVertex {
        private final Execution execution;

        TestingExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, Time timeout, long initialGlobalModVersion, long createTimestamp, ExecutionState state, CompletionType completionType, long requestTimeout) {
            super(jobVertex, subTaskIndex, new IntermediateResult[0], timeout, initialGlobalModVersion, createTimestamp, ((Integer)JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue()).intValue());
            this.execution = new TestingExecution(Runnable::run, this, 0, initialGlobalModVersion, createTimestamp, timeout, state, completionType, requestTimeout);
        }

        public Execution getCurrentExecutionAttempt() {
            return this.execution;
        }
    }

    private static enum CompletionType {
        SUCCESSFULLY,
        EXCEPTIONALLY,
        TIMEOUT,
        NEVER_COMPLETE;

    }
}

