package org.apache.flink.runtime.checkpoint;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.VertexFinishedStateChecker;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.taskmanager.TaskTest;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkRuntimeException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/VertexFinishedStateCheckerTest.class */
class VertexFinishedStateCheckerTest {

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.runtime.checkpoint.VertexFinishedStateCheckerTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/VertexFinishedStateCheckerTest$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$checkpoint$VertexFinishedStateChecker$VertexFinishedState = new int[VertexFinishedStateChecker.VertexFinishedState.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$runtime$checkpoint$VertexFinishedStateChecker$VertexFinishedState[VertexFinishedStateChecker.VertexFinishedState.ALL_RUNNING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$checkpoint$VertexFinishedStateChecker$VertexFinishedState[VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$checkpoint$VertexFinishedStateChecker$VertexFinishedState[VertexFinishedStateChecker.VertexFinishedState.FULLY_FINISHED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    VertexFinishedStateCheckerTest() {
    }

    @Test
    void testRestoringPartiallyFinishedChainsFailsWithoutUidHash() throws Exception {
        testRestoringPartiallyFinishedChainsFails(false);
    }

    @Test
    void testRestoringPartiallyFinishedChainsFailsWithUidHash() throws Exception {
        testRestoringPartiallyFinishedChainsFails(true);
    }

    private void testRestoringPartiallyFinishedChainsFails(boolean z) throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        OperatorIDPair of = OperatorIDPair.of(new OperatorID(), new OperatorID(), "operatorName", "operatorUid");
        OperatorIDPair generatedIDOnly = OperatorIDPair.generatedIDOnly(new OperatorID());
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID2, 1, 1, Collections.singletonList(OperatorIDPair.generatedIDOnly(new OperatorID())), true).addJobVertex(jobVertexID, 1, 1, Arrays.asList(of, generatedIDOnly), true).build((ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor());
        HashMap hashMap = new HashMap();
        hashMap.put(z ? (OperatorID) of.getUserDefinedOperatorID().get() : of.getGeneratedOperatorID(), new FullyFinishedOperatorState((String) null, (String) null, of.getGeneratedOperatorID(), 1, 1));
        hashMap.put(generatedIDOnly.getGeneratedOperatorID(), new OperatorState((String) null, (String) null, generatedIDOnly.getGeneratedOperatorID(), 1, 1));
        HashSet hashSet = new HashSet();
        hashSet.add(build.getJobVertex(jobVertexID));
        VertexFinishedStateChecker vertexFinishedStateChecker = new VertexFinishedStateChecker(hashSet, hashMap);
        Objects.requireNonNull(vertexFinishedStateChecker);
        Assertions.assertThatThrownBy(vertexFinishedStateChecker::validateOperatorsFinishedState).hasMessage("Can not restore vertex anon(" + jobVertexID + ") which contain mixed operator finished state: [ALL_RUNNING, FULLY_FINISHED]").isInstanceOf(FlinkRuntimeException.class);
    }

    @Test
    void testAddingRunningOperatorBeforeFinishedOneFails() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.ALL_RUNNING, jobVertexID, "vert2", VertexFinishedStateChecker.VertexFinishedState.FULLY_FINISHED, new DistributionPattern[]{DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with fully finished vertices predeceased with the ones not fully finished. Task vertex vert2(" + jobVertexID + ") has a predecessor not fully finished");
    }

    @Test
    void testAddingPartiallyFinishedOperatorBeforeFinishedOneFails() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, jobVertexID, "vert2", VertexFinishedStateChecker.VertexFinishedState.FULLY_FINISHED, new DistributionPattern[]{DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with fully finished vertices predeceased with the ones not fully finished. Task vertex vert2(" + jobVertexID + ") has a predecessor not fully finished");
    }

    @Test
    void testAddingAllRunningOperatorBeforePartiallyFinishedOneWithAllToAllFails() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.ALL_RUNNING, jobVertexID, "vert2", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, new DistributionPattern[]{DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with partially finished vertices predeceased with running or partially finished ones and connected via the ALL_TO_ALL edges. Task vertex vert2(" + jobVertexID + ") has a all running predecessor");
    }

    @Test
    void testAddingPartiallyFinishedOperatorBeforePartiallyFinishedOneWithAllToAllFails() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, jobVertexID, "vert2", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, new DistributionPattern[]{DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with partially finished vertices predeceased with running or partially finished ones and connected via the ALL_TO_ALL edges. Task vertex vert2(" + jobVertexID + ") has a partially finished predecessor");
    }

    @Test
    void testAddingPartiallyFinishedOperatorBeforePartiallyFinishedOneWithPointwiseAndAllToAllFails() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, jobVertexID, "vert2", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, new DistributionPattern[]{DistributionPattern.POINTWISE, DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with partially finished vertices predeceased with running or partially finished ones and connected via the ALL_TO_ALL edges. Task vertex vert2(" + jobVertexID + ") has a partially finished predecessor");
    }

    @Test
    void testAddingAllRunningOperatorBeforePartiallyFinishedOneFails() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.ALL_RUNNING, jobVertexID, "vert2", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, new DistributionPattern[]{DistributionPattern.POINTWISE}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with partially finished vertices predeceased with all running ones. Task vertex vert2(" + jobVertexID + ") has a all running predecessor");
    }

    private void testAddingOperatorsBeforePartiallyOrFullyFinishedOne(JobVertexID jobVertexID, String str, VertexFinishedStateChecker.VertexFinishedState vertexFinishedState, JobVertexID jobVertexID2, String str2, VertexFinishedStateChecker.VertexFinishedState vertexFinishedState2, DistributionPattern[] distributionPatternArr, Class<? extends Throwable> cls, String str3) throws Exception {
        OperatorIDPair generatedIDOnly = OperatorIDPair.generatedIDOnly(new OperatorID());
        OperatorIDPair generatedIDOnly2 = OperatorIDPair.generatedIDOnly(new OperatorID());
        JobVertex jobVertex = new JobVertex(str, jobVertexID, Collections.singletonList(generatedIDOnly));
        JobVertex jobVertex2 = new JobVertex(str2, jobVertexID2, Collections.singletonList(generatedIDOnly2));
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertex, true).addJobVertex(jobVertex2, false).setDistributionPattern(distributionPatternArr[0]).build((ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor());
        for (int i = 1; i < distributionPatternArr.length; i++) {
            JobVertexConnectionUtils.connectNewDataSetAsInput(jobVertex2, jobVertex, distributionPatternArr[i], ResultPartitionType.PIPELINED);
        }
        HashMap hashMap = new HashMap();
        hashMap.put(generatedIDOnly.getGeneratedOperatorID(), createOperatorState(generatedIDOnly.getGeneratedOperatorID(), vertexFinishedState));
        hashMap.put(generatedIDOnly2.getGeneratedOperatorID(), createOperatorState(generatedIDOnly2.getGeneratedOperatorID(), vertexFinishedState2));
        HashSet hashSet = new HashSet();
        hashSet.add(build.getJobVertex(jobVertex.getID()));
        hashSet.add(build.getJobVertex(jobVertex2.getID()));
        VertexFinishedStateChecker vertexFinishedStateChecker = new VertexFinishedStateChecker(hashSet, hashMap);
        Objects.requireNonNull(vertexFinishedStateChecker);
        Assertions.assertThatThrownBy(vertexFinishedStateChecker::validateOperatorsFinishedState).hasMessage(str3).isInstanceOf(cls);
    }

    private OperatorState createOperatorState(OperatorID operatorID, VertexFinishedStateChecker.VertexFinishedState vertexFinishedState) {
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$runtime$checkpoint$VertexFinishedStateChecker$VertexFinishedState[vertexFinishedState.ordinal()]) {
            case 1:
                return new OperatorState((String) null, (String) null, operatorID, 2, 2);
            case TaskTest.InvokableDecliningCheckpoints.REJECTED_EXECUTION_CHECKPOINT_ID /* 2 */:
                OperatorState operatorState = new OperatorState((String) null, (String) null, operatorID, 2, 2);
                operatorState.putState(0, FinishedOperatorSubtaskState.INSTANCE);
                return operatorState;
            case TaskTest.InvokableDecliningCheckpoints.THROWING_CHECKPOINT_ID /* 3 */:
                return new FullyFinishedOperatorState((String) null, (String) null, operatorID, 2, 2);
            default:
                throw new UnsupportedOperationException("Not supported finished state: " + vertexFinishedState);
        }
    }
}
