package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.shaded.guava32.com.google.common.collect.Sets;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.class */
class DefaultExecutionGraphConstructionTest {

    // JUnit 5 extension whose executor is handed to the graph builders in the create*ExecutionGraph helpers.
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    // Unregistered metric group passed to every attachJobGraph(...) call in this class.
    private static final JobManagerJobMetricGroup JOB_MANAGER_JOB_METRIC_GROUP = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest$TestingInputSplitAssigner.class */
    private static final class TestingInputSplitAssigner implements InputSplitAssigner {
        private TestingInputSplitAssigner() {
        }

        public InputSplit getNextInputSplit(String str, int i) {
            return null;
        }

        public void returnInputSplit(List<InputSplit> list, int i) {
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest$TestingInputSplitSource.class */
    private static final class TestingInputSplitSource<T extends InputSplit> implements InputSplitSource<T> {
        private final T[] inputSplits;
        private final InputSplitAssigner assigner;

        private TestingInputSplitSource(T[] tArr, InputSplitAssigner inputSplitAssigner) {
            this.inputSplits = tArr;
            this.assigner = inputSplitAssigner;
        }

        public T[] createInputSplits(int i) throws Exception {
            return this.inputSplits;
        }

        public InputSplitAssigner getInputSplitAssigner(T[] tArr) {
            return this.assigner;
        }
    }

    // Explicit package-private no-argument constructor; it performs no initialization.
    DefaultExecutionGraphConstructionTest() {
    }

    /**
     * Creates an {@link ExecutionGraph} through the testing builder's {@code build(...)} method.
     *
     * @param list job vertices from which the vertex parallelism store is computed
     * @return the freshly built graph; the vertices are not attached by this method
     * @throws Exception if the builder fails
     */
    private ExecutionGraph createDefaultExecutionGraph(List<JobVertex> list) throws Exception {
        final ScheduledExecutorService executor = (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor();
        return TestingDefaultExecutionGraphBuilder.newBuilder()
                .setVertexParallelismStore(SchedulerBase.computeVertexParallelismStore(list))
                .build(executor);
    }

    /**
     * Creates an {@link ExecutionGraph} through the testing builder's {@code buildDynamicGraph(...)}
     * method.
     *
     * @param list job vertices from which the vertex parallelism store is computed
     * @return the freshly built graph; the vertices are not attached by this method
     * @throws Exception if the builder fails
     */
    private ExecutionGraph createDynamicExecutionGraph(List<JobVertex> list) throws Exception {
        final ScheduledExecutorService executor = (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor();
        return TestingDefaultExecutionGraphBuilder.newBuilder()
                .setVertexParallelismStore(SchedulerBase.computeVertexParallelismStore(list))
                .buildDynamicGraph(executor);
    }

    /**
     * Attaches the same three job vertices to two separately built graphs and checks that the key
     * sets of their registered executions do not overlap.
     */
    @Test
    void testExecutionAttemptIdInTwoIdenticalJobsIsNotSame() throws Exception {
        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");
        v1.setParallelism(5);
        v2.setParallelism(7);
        v3.setParallelism(2);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
        for (JobVertex vertex : ordered) {
            vertex.setInvokableClass(AbstractInvokable.class);
        }

        ExecutionGraph firstGraph = createDefaultExecutionGraph(ordered);
        ExecutionGraph secondGraph = createDefaultExecutionGraph(ordered);
        firstGraph.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);
        secondGraph.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);

        Assertions.assertThat(
                        Sets.intersection(
                                firstGraph.getRegisteredExecutions().keySet(),
                                secondGraph.getRegisteredExecutions().keySet()))
                .isEmpty();
    }

    /**
     * Builds a five-vertex job with all-to-all pipelined connections, attaches it in a valid order
     * and delegates the structural checks to {@code verifyTestGraph}.
     */
    @Test
    void testCreateSimpleGraphBipartite() throws Exception {
        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");
        JobVertex v4 = new JobVertex("vertex4");
        JobVertex v5 = new JobVertex("vertex5");
        v1.setParallelism(5);
        v2.setParallelism(7);
        v3.setParallelism(2);
        v4.setParallelism(11);
        v5.setParallelism(4);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3, v4, v5));
        for (JobVertex vertex : ordered) {
            vertex.setInvokableClass(AbstractInvokable.class);
        }

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v2, v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v4, v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v4, v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v5, v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v5, v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);

        verifyTestGraph(eg, v1, v2, v3, v4, v5);
    }

    /**
     * Verifies, vertex by vertex, the generated execution job vertices of the five-vertex test
     * topology using {@code ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex}.
     *
     * <p>For each vertex the two list arguments name its neighbouring vertices; {@code null} is
     * passed where the topology has none on that side.
     */
    private void verifyTestGraph(ExecutionGraph executionGraph, JobVertex jobVertex, JobVertex jobVertex2, JobVertex jobVertex3, JobVertex jobVertex4, JobVertex jobVertex5) {
        final List<JobVertex> onlyV1 = Collections.singletonList(jobVertex);
        final List<JobVertex> onlyV2 = Collections.singletonList(jobVertex2);
        final List<JobVertex> onlyV4 = Collections.singletonList(jobVertex4);
        final List<JobVertex> onlyV5 = Collections.singletonList(jobVertex5);
        final List<JobVertex> v4AndV5 = Arrays.asList(jobVertex4, jobVertex5);
        final List<JobVertex> v2AndV3 = Arrays.asList(jobVertex2, jobVertex3);
        final List<JobVertex> v4AndV3 = Arrays.asList(jobVertex4, jobVertex3);

        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(executionGraph, jobVertex, null, onlyV2);
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(executionGraph, jobVertex2, onlyV1, onlyV4);
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(executionGraph, jobVertex3, null, v4AndV5);
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(executionGraph, jobVertex4, v2AndV3, onlyV5);
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(executionGraph, jobVertex5, v4AndV3, null);
    }

    /**
     * Attaches the five-vertex topology with vertex5 listed before vertex4, although vertex5 is
     * connected to vertex4 via {@code connectNewDataSetAsInput}, and expects {@code attachJobGraph}
     * to fail with a {@link JobException}.
     */
    @Test
    void testCannotConnectWrongOrder() throws Exception {
        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");
        JobVertex v4 = new JobVertex("vertex4");
        JobVertex v5 = new JobVertex("vertex5");
        v1.setParallelism(5);
        v2.setParallelism(7);
        v3.setParallelism(2);
        v4.setParallelism(11);
        v5.setParallelism(4);

        // Deliberately out of order: v5 precedes v4.
        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3, v5, v4));
        for (JobVertex vertex : ordered) {
            vertex.setInvokableClass(AbstractInvokable.class);
        }

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v2, v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v4, v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v4, v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v5, v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v5, v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        Assertions.assertThatThrownBy(() -> eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP))
                .isInstanceOf(JobException.class);
    }

    /**
     * Sets input split sources on vertex3 and vertex5 and checks that, after the job graph is
     * attached, the corresponding {@link ExecutionJobVertex} exposes the assigner supplied by each
     * source.
     */
    @Test
    void testSetupInputSplits() throws Exception {
        final InputSplit[] emptySplits = new InputSplit[0];
        final InputSplitAssigner assigner1 = new TestingInputSplitAssigner();
        final InputSplitAssigner assigner2 = new TestingInputSplitAssigner();
        final TestingInputSplitSource<InputSplit> source1 = new TestingInputSplitSource<>(emptySplits, assigner1);
        final TestingInputSplitSource<InputSplit> source2 = new TestingInputSplitSource<>(emptySplits, assigner2);

        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");
        JobVertex v4 = new JobVertex("vertex4");
        JobVertex v5 = new JobVertex("vertex5");
        v1.setParallelism(5);
        v2.setParallelism(7);
        v3.setParallelism(2);
        v4.setParallelism(11);
        v5.setParallelism(4);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3, v4, v5));
        for (JobVertex vertex : ordered) {
            vertex.setInvokableClass(AbstractInvokable.class);
        }

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v2, v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v4, v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v4, v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v5, v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v5, v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        v3.setInputSplitSource(source1);
        v5.setInputSplitSource(source2);

        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);

        Assertions.assertThat(eg.getAllVertices().get(v3.getID()).getSplitAssigner()).isEqualTo(assigner1);
        Assertions.assertThat(eg.getAllVertices().get(v5.getID()).getSplitAssigner()).isEqualTo(assigner2);
    }

    /**
     * Connects two consumer vertices to one producer using the same {@link IntermediateDataSetID}
     * and checks that the producer exposes a single produced data set with that id, and that each
     * consumer has exactly one input and one consumed partition group referring to it.
     */
    @Test
    void testMultiConsumersForOneIntermediateResult() throws Exception {
        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");
        IntermediateDataSetID dataSetId = new IntermediateDataSetID();
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v2, v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, dataSetId, false);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v3, v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, dataSetId, false);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);

        // Producer side: one data set carrying the shared id.
        ExecutionJobVertex ejv1 = Preconditions.checkNotNull(eg.getJobVertex(v1.getID()));
        Assertions.assertThat(ejv1.getProducedDataSets()).hasSize(1);
        Assertions.assertThat(ejv1.getProducedDataSets()[0].getId()).isEqualTo(dataSetId);

        // Consumer side: each consumer sees that data set as its only input.
        ExecutionJobVertex ejv2 = Preconditions.checkNotNull(eg.getJobVertex(v2.getID()));
        Assertions.assertThat(ejv2.getInputs()).hasSize(1);
        Assertions.assertThat(ejv2.getInputs().get(0).getId()).isEqualTo(dataSetId);

        ExecutionJobVertex ejv3 = Preconditions.checkNotNull(eg.getJobVertex(v3.getID()));
        Assertions.assertThat(ejv3.getInputs()).hasSize(1);
        Assertions.assertThat(ejv3.getInputs().get(0).getId()).isEqualTo(dataSetId);

        List<ConsumedPartitionGroup> groupsOfV2 = ejv2.getTaskVertices()[0].getAllConsumedPartitionGroups();
        Assertions.assertThat(groupsOfV2).hasSize(1);
        Assertions.assertThat(groupsOfV2.get(0).getIntermediateDataSetID()).isEqualTo(dataSetId);

        List<ConsumedPartitionGroup> groupsOfV3 = ejv3.getTaskVertices()[0].getAllConsumedPartitionGroups();
        Assertions.assertThat(groupsOfV3).hasSize(1);
        Assertions.assertThat(groupsOfV3.get(0).getIntermediateDataSetID()).isEqualTo(dataSetId);
    }

    /**
     * For an all-to-all blocking edge with parallelism 2 on both sides, checks that both produced
     * partitions report an equal first consumed partition group and that this group contains
     * exactly the ids of the two partitions.
     */
    @Test
    void testRegisterConsumedPartitionGroupToEdgeManager() throws Exception {
        JobVertex source = new JobVertex("source");
        JobVertex sink = new JobVertex("sink");
        source.setParallelism(2);
        sink.setParallelism(2);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                sink, source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(source, sink));
        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);

        IntermediateResult result =
                Objects.requireNonNull(eg.getJobVertex(source.getID())).getProducedDataSets()[0];
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];

        Assertions.assertThat(partition2.getConsumedPartitionGroups().get(0))
                .isEqualTo(partition1.getConsumedPartitionGroups().get(0));

        ConsumedPartitionGroup consumedPartitionGroup = partition1.getConsumedPartitionGroups().get(0);
        // Collect the group's ids into a set so the comparison below ignores iteration order.
        HashSet<IntermediateResultPartitionID> partitionIds = new HashSet<>();
        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
            partitionIds.add(partitionId);
        }
        Assertions.assertThat(partitionIds)
                .containsExactlyInAnyOrder(partition1.getPartitionId(), partition2.getPartitionId());
    }

    /**
     * For a pointwise blocking edge from 4 producers to 2 consumers, checks that the groups
     * reached through the first and the last partition each start with two unfinished partitions,
     * and that each count reaches zero after partitions 0 and 1, respectively 2 and 3, are marked
     * finished.
     */
    @Test
    void testPointWiseConsumedPartitionGroupPartitionFinished() throws Exception {
        JobVertex source = new JobVertex("source");
        JobVertex sink = new JobVertex("sink");
        source.setParallelism(4);
        sink.setParallelism(2);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                sink, source, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(source, sink));
        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);

        IntermediateResult result =
                Objects.requireNonNull(eg.getJobVertex(source.getID())).getProducedDataSets()[0];
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        IntermediateResultPartition partition3 = result.getPartitions()[2];
        IntermediateResultPartition partition4 = result.getPartitions()[3];

        ConsumedPartitionGroup firstGroup = partition1.getConsumedPartitionGroups().get(0);
        ConsumedPartitionGroup secondGroup = partition4.getConsumedPartitionGroups().get(0);
        Assertions.assertThat(firstGroup.getNumberOfUnfinishedPartitions()).isEqualTo(2);
        Assertions.assertThat(secondGroup.getNumberOfUnfinishedPartitions()).isEqualTo(2);

        partition1.markFinished();
        partition2.markFinished();
        Assertions.assertThat(firstGroup.getNumberOfUnfinishedPartitions()).isZero();

        partition3.markFinished();
        partition4.markFinished();
        Assertions.assertThat(secondGroup.getNumberOfUnfinishedPartitions()).isZero();
    }

    /**
     * For an all-to-all blocking edge with parallelism 2 on both sides, checks that the group
     * reached through the first partition counts 2, then 1, then 0 unfinished partitions as the
     * two partitions are marked finished one after the other.
     */
    @Test
    void testAllToAllConsumedPartitionGroupPartitionFinished() throws Exception {
        JobVertex source = new JobVertex("source");
        JobVertex sink = new JobVertex("sink");
        source.setParallelism(2);
        sink.setParallelism(2);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                sink, source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(source, sink));
        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);

        IntermediateResult result =
                Objects.requireNonNull(eg.getJobVertex(source.getID())).getProducedDataSets()[0];
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];

        ConsumedPartitionGroup group = partition1.getConsumedPartitionGroups().get(0);
        Assertions.assertThat(group.getNumberOfUnfinishedPartitions()).isEqualTo(2);

        partition1.markFinished();
        Assertions.assertThat(group.getNumberOfUnfinishedPartitions()).isOne();

        partition2.markFinished();
        Assertions.assertThat(group.getNumberOfUnfinishedPartitions()).isZero();
    }

    /**
     * Dynamic-graph variant of the all-to-all case: the source vertex is initialized and both of
     * its partitions are marked finished while the partition still reports no consumed partition
     * groups. After the sink vertex is initialized, the group reached through the first partition
     * must report zero unfinished partitions.
     */
    @Test
    void testDynamicGraphAllToAllConsumedPartitionGroupPartitionFinished() throws Exception {
        JobVertex source = new JobVertex("source");
        JobVertex sink = new JobVertex("sink");
        source.setParallelism(2);
        sink.setParallelism(2);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                sink, source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(source, sink));
        ExecutionGraph eg = createDynamicExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);

        // Only the producer is initialized at this point.
        eg.initializeJobVertex(eg.getJobVertex(source.getID()), 0L);
        IntermediateResult result =
                Objects.requireNonNull(eg.getJobVertex(source.getID())).getProducedDataSets()[0];
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        partition1.markFinished();
        partition2.markFinished();
        Assertions.assertThat(partition1.getConsumedPartitionGroups()).isEmpty();

        // Initializing the consumer makes the group available; it must reflect the finished partitions.
        eg.initializeJobVertex(eg.getJobVertex(sink.getID()), 0L);
        Assertions.assertThat(partition1.getConsumedPartitionGroups().get(0).getNumberOfUnfinishedPartitions())
                .isZero();
    }

    /**
     * Dynamic-graph variant of the pointwise case (4 producers, 2 consumers): all four partitions
     * are marked finished while the first and last partition still report no consumed partition
     * groups. After the sink vertex is initialized, the groups reached through the first and the
     * last partition must both report zero unfinished partitions.
     */
    @Test
    void testDynamicGraphPointWiseConsumedPartitionGroupPartitionFinished() throws Exception {
        JobVertex source = new JobVertex("source");
        JobVertex sink = new JobVertex("sink");
        source.setParallelism(4);
        sink.setParallelism(2);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                sink, source, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(source, sink));
        ExecutionGraph eg = createDynamicExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);

        // Only the producer is initialized at this point.
        eg.initializeJobVertex(eg.getJobVertex(source.getID()), 0L);
        IntermediateResult result =
                Objects.requireNonNull(eg.getJobVertex(source.getID())).getProducedDataSets()[0];
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        IntermediateResultPartition partition3 = result.getPartitions()[2];
        IntermediateResultPartition partition4 = result.getPartitions()[3];
        partition1.markFinished();
        partition2.markFinished();
        partition3.markFinished();
        partition4.markFinished();
        Assertions.assertThat(partition1.getConsumedPartitionGroups()).isEmpty();
        Assertions.assertThat(partition4.getConsumedPartitionGroups()).isEmpty();

        // Initializing the consumer makes the groups available; they must reflect the finished partitions.
        eg.initializeJobVertex(eg.getJobVertex(sink.getID()), 0L);
        Assertions.assertThat(partition1.getConsumedPartitionGroups().get(0).getNumberOfUnfinishedPartitions())
                .isZero();
        Assertions.assertThat(partition4.getConsumedPartitionGroups().get(0).getNumberOfUnfinishedPartitions())
                .isZero();
    }

    /**
     * Attaches a two-vertex job to a dynamic graph and checks that both vertices are present and
     * that the first two vertices in topological order are not yet initialized.
     */
    @Test
    void testAttachToDynamicGraph() throws Exception {
        JobVertex source = new JobVertex("source");
        JobVertex sink = new JobVertex("sink");
        source.setParallelism(2);
        sink.setParallelism(2);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                sink, source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(source, sink));
        ExecutionGraph eg = createDynamicExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);

        Assertions.assertThat(eg.getAllVertices()).hasSize(2);
        // Two explicit next() calls (rather than a loop) so a missing vertex fails the test.
        Iterator<ExecutionJobVertex> jobVertices = eg.getVerticesTopologically().iterator();
        Assertions.assertThat(jobVertices.next().isInitialized()).isFalse();
        Assertions.assertThat(jobVertices.next().isInitialized()).isFalse();
    }
}
