package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertexTest;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.leaderelection.LeaderChangeClusterComponentsTest;
import org.apache.flink.runtime.messages.TaskBackPressureResponse;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.class */
public class BackPressureRequestCoordinatorTest extends TestLogger {
    private static final long requestTimeout = 10000;
    private static final double backPressureRatio = 0.5d;
    private static final String requestTimeoutMessage = "Request timeout.";
    private static ScheduledExecutorService executorService;
    private BackPressureRequestCoordinator coordinator;

    @Rule
    public Timeout caseTimeout = new Timeout(10, TimeUnit.SECONDS);

    /* renamed from: org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureRequestCoordinatorTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$rest$handler$legacy$backpressure$BackPressureRequestCoordinatorTest$CompletionType = new int[CompletionType.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$runtime$rest$handler$legacy$backpressure$BackPressureRequestCoordinatorTest$CompletionType[CompletionType.SUCCESSFULLY.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rest$handler$legacy$backpressure$BackPressureRequestCoordinatorTest$CompletionType[CompletionType.EXCEPTIONALLY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rest$handler$legacy$backpressure$BackPressureRequestCoordinatorTest$CompletionType[CompletionType.TIMEOUT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rest$handler$legacy$backpressure$BackPressureRequestCoordinatorTest$CompletionType[CompletionType.NEVER_COMPLETE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest$CompletionType.class */
    public enum CompletionType {
        SUCCESSFULLY,
        EXCEPTIONALLY,
        TIMEOUT,
        NEVER_COMPLETE
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest$TestingExecution.class */
    private static class TestingExecution extends Execution {
        private final ExecutionState state;
        private final CompletionType completionType;
        private final long requestTimeout;

        TestingExecution(Executor executor, ExecutionVertex executionVertex, int i, long j, long j2, Time time, ExecutionState executionState, CompletionType completionType, long j3) {
            super(executor, executionVertex, i, j, j2, time);
            this.state = (ExecutionState) Preconditions.checkNotNull(executionState);
            this.completionType = (CompletionType) Preconditions.checkNotNull(completionType);
            this.requestTimeout = j3;
        }

        public CompletableFuture<TaskBackPressureResponse> requestBackPressure(int i, Time time) {
            CompletableFuture<TaskBackPressureResponse> completableFuture = new CompletableFuture<>();
            switch (AnonymousClass1.$SwitchMap$org$apache$flink$runtime$rest$handler$legacy$backpressure$BackPressureRequestCoordinatorTest$CompletionType[this.completionType.ordinal()]) {
                case 1:
                    completableFuture.complete(new TaskBackPressureResponse(0, getAttemptId(), BackPressureRequestCoordinatorTest.backPressureRatio));
                    break;
                case 2:
                    completableFuture.completeExceptionally(new RuntimeException("Request failed."));
                    break;
                case 3:
                    BackPressureRequestCoordinatorTest.executorService.schedule(() -> {
                        return Boolean.valueOf(completableFuture.completeExceptionally(new TimeoutException(BackPressureRequestCoordinatorTest.requestTimeoutMessage)));
                    }, this.requestTimeout, TimeUnit.MILLISECONDS);
                    break;
                case LeaderChangeClusterComponentsTest.PARALLELISM /* 4 */:
                    break;
                default:
                    throw new RuntimeException("Unknown completion type.");
            }
            return completableFuture;
        }

        public ExecutionState getState() {
            return this.state;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest$TestingExecutionVertex.class */
    public static class TestingExecutionVertex extends ExecutionVertex {
        private final Execution execution;

        TestingExecutionVertex(ExecutionJobVertex executionJobVertex, int i, Time time, long j, long j2, ExecutionState executionState, CompletionType completionType, long j3) {
            super(executionJobVertex, i, new IntermediateResult[0], time, j, j2, ((Integer) JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue()).intValue());
            this.execution = new TestingExecution((v0) -> {
                v0.run();
            }, this, 0, j, j2, time, executionState, completionType, j3);
        }

        /* renamed from: getCurrentExecutionAttempt, reason: merged with bridge method [inline-methods] */
        public Execution m340getCurrentExecutionAttempt() {
            return this.execution;
        }
    }

    @BeforeClass
    public static void setUp() throws Exception {
        executorService = new ScheduledThreadPoolExecutor(1);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (executorService != null) {
            executorService.shutdown();
        }
    }

    @Before
    public void initCoordinator() throws Exception {
        this.coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout);
    }

    @After
    public void shutdownCoordinator() throws Exception {
        if (this.coordinator != null) {
            Assert.assertEquals(0L, this.coordinator.getNumberOfPendingRequests());
            this.coordinator.shutDown();
        }
    }

    @Test
    public void testSuccessfulBackPressureRequest() throws Exception {
        ExecutionVertex[] createExecutionVertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY);
        BackPressureStats backPressureStats = (BackPressureStats) this.coordinator.triggerBackPressureRequest(createExecutionVertices).get();
        Assert.assertEquals(0L, backPressureStats.getRequestId());
        Map backPressureRatios = backPressureStats.getBackPressureRatios();
        for (ExecutionVertex executionVertex : createExecutionVertices) {
            Assert.assertEquals(backPressureRatio, ((Double) backPressureRatios.get(executionVertex.getCurrentExecutionAttempt().getAttemptId())).doubleValue(), 0.0d);
        }
    }

    @Test
    public void testRequestNotRunningTasks() throws Exception {
        CompletableFuture triggerBackPressureRequest = this.coordinator.triggerBackPressureRequest(createExecutionVertices(ExecutionState.DEPLOYING, CompletionType.SUCCESSFULLY));
        Assert.assertTrue(triggerBackPressureRequest.isDone());
        try {
            triggerBackPressureRequest.get();
            Assert.fail("Exception expected.");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof IllegalStateException);
        }
    }

    @Test
    public void testBackPressureRequestWithException() throws Exception {
        try {
            this.coordinator.triggerBackPressureRequest(createExecutionVertices(ExecutionState.RUNNING, CompletionType.EXCEPTIONALLY)).get();
            Assert.fail("Exception expected.");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof RuntimeException);
        }
    }

    @Test
    public void testBackPressureRequestTimeout() throws Exception {
        ExecutionVertex[] createExecutionVertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.TIMEOUT, 100L);
        BackPressureRequestCoordinator backPressureRequestCoordinator = new BackPressureRequestCoordinator(executorService, 100L);
        try {
            try {
                backPressureRequestCoordinator.triggerBackPressureRequest(createExecutionVertices).get();
                Assert.fail("Exception expected.");
                backPressureRequestCoordinator.shutDown();
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, requestTimeoutMessage).isPresent());
                backPressureRequestCoordinator.shutDown();
            }
        } catch (Throwable th) {
            backPressureRequestCoordinator.shutDown();
            throw th;
        }
    }

    @Test
    public void testShutDown() throws Exception {
        ExecutionVertex[] createExecutionVertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.NEVER_COMPLETE);
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.coordinator.triggerBackPressureRequest(createExecutionVertices));
        arrayList.add(this.coordinator.triggerBackPressureRequest(createExecutionVertices));
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assert.assertFalse(((CompletableFuture) it.next()).isDone());
        }
        this.coordinator.shutDown();
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            Assert.assertTrue(((CompletableFuture) it2.next()).isCompletedExceptionally());
        }
        Assert.assertTrue(this.coordinator.triggerBackPressureRequest(createExecutionVertices).isCompletedExceptionally());
    }

    private ExecutionVertex[] createExecutionVertices(ExecutionState executionState, CompletionType completionType) throws Exception {
        return createExecutionVertices(executionState, completionType, requestTimeout);
    }

    private ExecutionVertex[] createExecutionVertices(ExecutionState executionState, CompletionType completionType, long j) throws Exception {
        return new ExecutionVertex[]{createExecutionVertex(0, ExecutionState.RUNNING, CompletionType.SUCCESSFULLY, j), createExecutionVertex(1, executionState, completionType, j), createExecutionVertex(2, ExecutionState.RUNNING, CompletionType.SUCCESSFULLY, j)};
    }

    private ExecutionVertex createExecutionVertex(int i, ExecutionState executionState, CompletionType completionType, long j) throws Exception {
        return new TestingExecutionVertex(ExecutionJobVertexTest.createExecutionJobVertex(4, 4), i, Time.seconds(10L), 1L, System.currentTimeMillis(), executionState, completionType, j);
    }
}
