package org.apache.flink.runtime.scheduler.adapter;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.IterableUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IteratorAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.class */
class DefaultExecutionTopologyTest {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private DefaultExecutionGraph executionGraph;
    private DefaultExecutionTopology adapter;

    DefaultExecutionTopologyTest() {
    }

    @BeforeEach
    void setUp() throws Exception {
        JobVertex[] jobVertexArr = {ExecutionGraphTestUtils.createNoOpVertex(3), ExecutionGraphTestUtils.createNoOpVertex(3)};
        jobVertexArr[1].connectNewDataSetAsInput(jobVertexArr[0], DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        this.executionGraph = ExecutionGraphTestUtils.createExecutionGraph((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor(), jobVertexArr);
        this.adapter = DefaultExecutionTopology.fromExecutionGraph(this.executionGraph);
    }

    @Test
    void testConstructor() {
        assertGraphEquals(this.executionGraph, this.adapter);
    }

    @Test
    void testGetResultPartition() {
        Iterator it = this.executionGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            for (Map.Entry entry : ((ExecutionVertex) it.next()).getProducedPartitions().entrySet()) {
                assertPartitionEquals((IntermediateResultPartition) entry.getValue(), this.adapter.getResultPartition((IntermediateResultPartitionID) entry.getKey()));
            }
        }
    }

    @Test
    void testResultPartitionStateSupplier() {
        IntermediateResultPartition intermediateResultPartition = (IntermediateResultPartition) IterableUtils.toStream(this.executionGraph.getAllExecutionVertices()).flatMap(executionVertex -> {
            return executionVertex.getProducedPartitions().values().stream();
        }).findAny().get();
        DefaultResultPartition resultPartition = this.adapter.getResultPartition(intermediateResultPartition.getPartitionId());
        Assertions.assertThat(resultPartition.getState()).isEqualTo(ResultPartitionState.CREATED);
        intermediateResultPartition.markDataProduced();
        Assertions.assertThat(resultPartition.getState()).isEqualTo(ResultPartitionState.CONSUMABLE);
    }

    @Test
    void testGetVertexOrThrow() {
        Assertions.assertThatThrownBy(() -> {
            this.adapter.getVertex(new ExecutionVertexID(new JobVertexID(), 0));
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testResultPartitionOrThrow() {
        Assertions.assertThatThrownBy(() -> {
            this.adapter.getResultPartition(new IntermediateResultPartitionID());
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testGetAllPipelinedRegions() {
        Assertions.assertThat(this.adapter.getAllPipelinedRegions()).hasSize(1);
    }

    @Test
    void testGetPipelinedRegionOfVertex() {
        Iterator it = this.adapter.getVertices().iterator();
        while (it.hasNext()) {
            assertRegionContainsAllVertices(this.adapter.getPipelinedRegionOfVertex(((DefaultExecutionVertex) it.next()).getId()));
        }
    }

    @Test
    void testErrorIfCoLocatedTasksAreNotInSameRegion() throws Exception {
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex(3);
        JobVertex createNoOpVertex2 = ExecutionGraphTestUtils.createNoOpVertex(3);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        createNoOpVertex.setSlotSharingGroup(slotSharingGroup);
        createNoOpVertex2.setSlotSharingGroup(slotSharingGroup);
        createNoOpVertex.setStrictlyCoLocatedWith(createNoOpVertex2);
        Assertions.assertThatThrownBy(() -> {
            ExecutionGraphTestUtils.createExecutionGraph((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor(), createNoOpVertex, createNoOpVertex2);
        }).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testUpdateTopology() throws Exception {
        JobVertex[] createJobVertices = createJobVertices(ResultPartitionType.BLOCKING);
        this.executionGraph = createDynamicGraph(createJobVertices);
        this.adapter = DefaultExecutionTopology.fromExecutionGraph(this.executionGraph);
        ExecutionJobVertex jobVertex = this.executionGraph.getJobVertex(createJobVertices[0].getID());
        ExecutionJobVertex jobVertex2 = this.executionGraph.getJobVertex(createJobVertices[1].getID());
        this.executionGraph.initializeJobVertex(jobVertex, 0L);
        this.adapter.notifyExecutionGraphUpdated(this.executionGraph, Collections.singletonList(jobVertex));
        Assertions.assertThat(this.adapter.getVertices()).hasSize(3);
        this.executionGraph.initializeJobVertex(jobVertex2, 0L);
        this.adapter.notifyExecutionGraphUpdated(this.executionGraph, Collections.singletonList(jobVertex2));
        Assertions.assertThat(this.adapter.getVertices()).hasSize(6);
        assertGraphEquals(this.executionGraph, this.adapter);
    }

    @Test
    void testErrorIfUpdateTopologyWithNewVertexPipelinedConnectedToOldOnes() throws Exception {
        JobVertex[] createJobVertices = createJobVertices(ResultPartitionType.PIPELINED);
        this.executionGraph = createDynamicGraph(createJobVertices);
        this.adapter = DefaultExecutionTopology.fromExecutionGraph(this.executionGraph);
        ExecutionJobVertex jobVertex = this.executionGraph.getJobVertex(createJobVertices[0].getID());
        ExecutionJobVertex jobVertex2 = this.executionGraph.getJobVertex(createJobVertices[1].getID());
        this.executionGraph.initializeJobVertex(jobVertex, 0L);
        this.adapter.notifyExecutionGraphUpdated(this.executionGraph, Collections.singletonList(jobVertex));
        this.executionGraph.initializeJobVertex(jobVertex2, 0L);
        Assertions.assertThatThrownBy(() -> {
            this.adapter.notifyExecutionGraphUpdated(this.executionGraph, Collections.singletonList(jobVertex2));
        }).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testExistingRegionsAreNotAffectedDuringTopologyUpdate() throws Exception {
        JobVertex[] createJobVertices = createJobVertices(ResultPartitionType.BLOCKING);
        this.executionGraph = createDynamicGraph(createJobVertices);
        this.adapter = DefaultExecutionTopology.fromExecutionGraph(this.executionGraph);
        ExecutionJobVertex jobVertex = this.executionGraph.getJobVertex(createJobVertices[0].getID());
        ExecutionJobVertex jobVertex2 = this.executionGraph.getJobVertex(createJobVertices[1].getID());
        this.executionGraph.initializeJobVertex(jobVertex, 0L);
        this.adapter.notifyExecutionGraphUpdated(this.executionGraph, Collections.singletonList(jobVertex));
        DefaultSchedulingPipelinedRegion pipelinedRegionOfVertex = this.adapter.getPipelinedRegionOfVertex(new ExecutionVertexID(jobVertex.getJobVertexId(), 0));
        this.executionGraph.initializeJobVertex(jobVertex2, 0L);
        this.adapter.notifyExecutionGraphUpdated(this.executionGraph, Collections.singletonList(jobVertex2));
        Assertions.assertThat(this.adapter.getPipelinedRegionOfVertex(new ExecutionVertexID(jobVertex.getJobVertexId(), 0))).isSameAs(pipelinedRegionOfVertex);
    }

    private JobVertex[] createJobVertices(ResultPartitionType resultPartitionType) {
        JobVertex[] jobVertexArr = {ExecutionGraphTestUtils.createNoOpVertex(3), ExecutionGraphTestUtils.createNoOpVertex(3)};
        jobVertexArr[1].connectNewDataSetAsInput(jobVertexArr[0], DistributionPattern.ALL_TO_ALL, resultPartitionType);
        return jobVertexArr;
    }

    private DefaultExecutionGraph createDynamicGraph(JobVertex... jobVertexArr) throws Exception {
        return TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(new JobGraph(new JobID(), "TestJob", jobVertexArr)).buildDynamicGraph((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
    }

    private void assertRegionContainsAllVertices(DefaultSchedulingPipelinedRegion defaultSchedulingPipelinedRegion) {
        Assertions.assertThat(Sets.newHashSet(defaultSchedulingPipelinedRegion.getVertices())).isEqualTo(Sets.newHashSet(this.adapter.getVertices()));
    }

    private static void assertGraphEquals(ExecutionGraph executionGraph, DefaultExecutionTopology defaultExecutionTopology) {
        Iterator it = defaultExecutionTopology.getVertices().iterator();
        for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
            DefaultExecutionVertex defaultExecutionVertex = (DefaultExecutionVertex) it.next();
            Assertions.assertThat(defaultExecutionVertex.getId()).isEqualTo(executionVertex.getID());
            ArrayList arrayList = new ArrayList();
            Iterator it2 = executionVertex.getAllConsumedPartitionGroups().iterator();
            while (it2.hasNext()) {
                Iterator it3 = ((ConsumedPartitionGroup) it2.next()).iterator();
                while (it3.hasNext()) {
                    arrayList.add(executionVertex.getExecutionGraphAccessor().getResultPartitionOrThrow((IntermediateResultPartitionID) it3.next()));
                }
            }
            assertPartitionsEquals(arrayList, defaultExecutionVertex.getConsumedResults());
            assertPartitionsEquals(executionVertex.getProducedPartitions().values(), defaultExecutionVertex.getProducedResults());
        }
        ((IteratorAssert) Assertions.assertThat(it).as("Number of adapted vertices exceeds number of original vertices.", new Object[0])).isExhausted();
    }

    private static void assertPartitionsEquals(Iterable<IntermediateResultPartition> iterable, Iterable<DefaultResultPartition> iterable2) {
        Assertions.assertThat(iterable).hasSameSizeAs(iterable2);
        for (IntermediateResultPartition intermediateResultPartition : iterable) {
            DefaultResultPartition defaultResultPartition = (DefaultResultPartition) IterableUtils.toStream(iterable2).filter(defaultResultPartition2 -> {
                return defaultResultPartition2.getId().equals(intermediateResultPartition.getPartitionId());
            }).findAny().orElseThrow(() -> {
                return new AssertionError("Could not find matching adapted partition for " + intermediateResultPartition);
            });
            assertPartitionEquals(intermediateResultPartition, defaultResultPartition);
            ArrayList arrayList = new ArrayList();
            Iterator it = intermediateResultPartition.getConsumerVertexGroups().iterator();
            while (it.hasNext()) {
                Iterator it2 = ((ConsumerVertexGroup) it.next()).iterator();
                while (it2.hasNext()) {
                    arrayList.add((ExecutionVertexID) it2.next());
                }
            }
            List consumerVertexGroups = defaultResultPartition.getConsumerVertexGroups();
            Assertions.assertThat(consumerVertexGroups).isNotEmpty();
            Iterator it3 = arrayList.iterator();
            while (it3.hasNext()) {
                Assertions.assertThat(consumerVertexGroups.stream().flatMap((v0) -> {
                    return IterableUtils.toStream(v0);
                })).contains(new ExecutionVertexID[]{(ExecutionVertexID) it3.next()});
            }
        }
    }

    private static void assertPartitionEquals(IntermediateResultPartition intermediateResultPartition, DefaultResultPartition defaultResultPartition) {
        Assertions.assertThat(defaultResultPartition.getId()).isEqualTo(intermediateResultPartition.getPartitionId());
        Assertions.assertThat(defaultResultPartition.getResultId()).isEqualTo(intermediateResultPartition.getIntermediateResult().getId());
        Assertions.assertThat(defaultResultPartition.getResultType()).isEqualTo(intermediateResultPartition.getResultType());
        Assertions.assertThat(defaultResultPartition.getProducer().getId()).isEqualTo(intermediateResultPartition.getProducer().getID());
    }
}
