package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Predicate;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.FlinkException;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.class */
/**
 * Checks that two strictly co-located job vertices report the same assigned resource location for
 * every subtask index, both on the initial deployment and again after a task failure has forced
 * the job through a restart.
 */
public class ExecutionGraphCoLocationRestartTest {

    /** Shared executor handed to the scheduler builder for the lifetime of the test class. */
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorResource();

    /** Parallelism used for both job vertices. */
    private static final int NUM_TASKS = 31;

    /**
     * Deploys a two-vertex job, fails one execution vertex, lets the job restart and verifies the
     * co-location check before and after the restart; finally finishes all vertices and expects
     * the job to reach {@link JobStatus#FINISHED}.
     *
     * @throws Exception if building the scheduler or waiting on the execution graph fails
     */
    @Test
    public void testConstraintsAfterRestart() throws Exception {
        // Two no-op vertices; the second one consumes the first one's output.
        JobVertex producer = ExecutionGraphTestUtils.createNoOpVertex(NUM_TASKS);
        JobVertex consumer = ExecutionGraphTestUtils.createNoOpVertex(NUM_TASKS);
        consumer.connectNewDataSetAsInput(
                producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        // Same slot sharing group plus a strict co-location constraint between the two vertices.
        SlotSharingGroup sharedGroup = new SlotSharingGroup();
        producer.setSlotSharingGroup(sharedGroup);
        consumer.setSlotSharingGroup(sharedGroup);
        producer.setStrictlyCoLocatedWith(consumer);

        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(producer, consumer);
        ManuallyTriggeredScheduledExecutorService delayExecutor =
                new ManuallyTriggeredScheduledExecutorService();
        DefaultScheduler scheduler = createScheduler(jobGraph, delayExecutor);
        ExecutionGraph graph = scheduler.getExecutionGraph();

        Assert.assertEquals(JobStatus.CREATED, graph.getState());

        scheduler.startScheduling();

        Predicate<AccessExecution> isDeploying =
                ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING);
        ExecutionGraphTestUtils.waitForAllExecutionsPredicate(graph, isDeploying, 5000L);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        // Baseline: the check must already hold for the first deployment.
        validateConstraints(graph);

        // Fail a single execution vertex to push the job into RESTARTING.
        ExecutionVertex failedVertex =
                (ExecutionVertex) graph.getAllExecutionVertices().iterator().next();
        failedVertex.fail(new FlinkException("Test exception"));
        Assert.assertEquals(JobStatus.RESTARTING, graph.getState());

        // The delayed task is triggered BEFORE the cancellations are completed; this ordering is
        // deliberate and must be kept.
        // NOTE(review): presumably the triggered task is the scheduled restart action - confirm.
        delayExecutor.triggerNonPeriodicScheduledTask();
        completePendingCancellations(graph);

        ExecutionGraphTestUtils.waitUntilJobStatus(graph, JobStatus.RUNNING, 5000L);
        ExecutionGraphTestUtils.waitForAllExecutionsPredicate(graph, isDeploying, 5000L);

        // The same check must hold again once the job has been redeployed.
        validateConstraints(graph);

        ExecutionGraphTestUtils.finishAllVertices(graph);
        Assert.assertThat(graph.getState(), Matchers.is(JobStatus.FINISHED));
    }

    /**
     * Builds the scheduler under test: slots are served from already-completed futures, delayed
     * actions go to the given manually triggered executor, and a fixed-delay restart strategy is
     * installed.
     *
     * @param jobGraph job graph to schedule
     * @param delayExecutor executor registered as the scheduler's delay executor
     * @return the configured scheduler
     * @throws Exception if the builder fails to create the scheduler
     */
    private static DefaultScheduler createScheduler(
            JobGraph jobGraph, ManuallyTriggeredScheduledExecutorService delayExecutor)
            throws Exception {
        // NOTE(review): "mo522build" looks like a decompiler-renamed "build()" - confirm against
        // the real DefaultSchedulerBuilder before relying on this name.
        // NOTE(review): factory arguments (1, 0L) presumably mean one restart attempt with zero
        // back-off delay - confirm.
        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
                        jobGraph,
                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                        (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor())
                .setExecutionSlotAllocatorFactory(
                        SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                TestingPhysicalSlotProvider.create(
                                        ignoredProfile ->
                                                CompletableFuture.completedFuture(
                                                        TestingPhysicalSlot.builder().build()))))
                .setDelayExecutor(delayExecutor)
                .setRestartBackoffTimeStrategy(
                        new FixedDelayRestartBackoffTimeStrategy
                                        .FixedDelayRestartBackoffTimeStrategyFactory(1, 0L)
                                .create())
                .mo522build();
    }

    /**
     * Completes the cancellation of every execution vertex that is currently in
     * {@link ExecutionState#CANCELING}; vertices in any other state are left alone.
     *
     * @param graph execution graph whose vertices are inspected
     */
    private static void completePendingCancellations(ExecutionGraph graph) {
        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            if (vertex.getExecutionState() != ExecutionState.CANCELING) {
                continue;
            }
            vertex.getCurrentExecutionAttempt().completeCancelling();
        }
    }

    /**
     * Asserts that, for each subtask index below {@code NUM_TASKS}, the first two job vertices of
     * the graph report equal current assigned resource locations.
     *
     * @param executionGraph graph whose first two job vertices are compared
     */
    private void validateConstraints(ExecutionGraph executionGraph) {
        ExecutionJobVertex[] jobVertices =
                (ExecutionJobVertex[])
                        executionGraph.getAllVertices().values().toArray(new ExecutionJobVertex[2]);

        int subtask = 0;
        while (subtask < NUM_TASKS) {
            ExecutionVertex left = jobVertices[0].getTaskVertices()[subtask];
            ExecutionVertex right = jobVertices[1].getTaskVertices()[subtask];
            Assert.assertThat(
                    left.getCurrentAssignedResourceLocation(),
                    Matchers.is(right.getCurrentAssignedResourceLocation()));
            subtask++;
        }
    }
}
