package org.apache.flink.runtime.scheduler;

import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.IterableUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapterTest.class */
class ExecutionGraphToInputsLocationsRetrieverAdapterTest {

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();

    ExecutionGraphToInputsLocationsRetrieverAdapterTest() {
    }

    @Test
    void testGetConsumedPartitionGroupsAndProducers() throws Exception {
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        JobVertex createNoOpVertex2 = ExecutionGraphTestUtils.createNoOpVertex(1);
        JobVertex createNoOpVertex3 = ExecutionGraphTestUtils.createNoOpVertex(1);
        IntermediateDataSet source = JobVertexConnectionUtils.connectNewDataSetAsInput(createNoOpVertex3, createNoOpVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED).getSource();
        IntermediateDataSet source2 = JobVertexConnectionUtils.connectNewDataSetAsInput(createNoOpVertex3, createNoOpVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED).getSource();
        ExecutionGraphToInputsLocationsRetrieverAdapter executionGraphToInputsLocationsRetrieverAdapter = new ExecutionGraphToInputsLocationsRetrieverAdapter(ExecutionGraphTestUtils.createExecutionGraph((ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor(), createNoOpVertex, createNoOpVertex2, createNoOpVertex3));
        ExecutionVertexID executionVertexID = new ExecutionVertexID(createNoOpVertex.getID(), 0);
        ExecutionVertexID executionVertexID2 = new ExecutionVertexID(createNoOpVertex2.getID(), 0);
        ExecutionVertexID executionVertexID3 = new ExecutionVertexID(createNoOpVertex3.getID(), 0);
        Collection consumedPartitionGroups = executionGraphToInputsLocationsRetrieverAdapter.getConsumedPartitionGroups(executionVertexID);
        Collection consumedPartitionGroups2 = executionGraphToInputsLocationsRetrieverAdapter.getConsumedPartitionGroups(executionVertexID2);
        Collection<ConsumedPartitionGroup> consumedPartitionGroups3 = executionGraphToInputsLocationsRetrieverAdapter.getConsumedPartitionGroups(executionVertexID3);
        IntermediateResultPartitionID intermediateResultPartitionID = new IntermediateResultPartitionID(source.getId(), 0);
        IntermediateResultPartitionID intermediateResultPartitionID2 = new IntermediateResultPartitionID(source2.getId(), 0);
        Assertions.assertThat(consumedPartitionGroups).isEmpty();
        Assertions.assertThat(consumedPartitionGroups2).isEmpty();
        Assertions.assertThat(consumedPartitionGroups3).hasSize(2);
        Assertions.assertThat((Collection) consumedPartitionGroups3.stream().flatMap((v0) -> {
            return IterableUtils.toStream(v0);
        }).collect(Collectors.toSet())).containsExactlyInAnyOrder(new IntermediateResultPartitionID[]{intermediateResultPartitionID, intermediateResultPartitionID2});
        for (ConsumedPartitionGroup consumedPartitionGroup : consumedPartitionGroups3) {
            if (consumedPartitionGroup.getFirst().equals(intermediateResultPartitionID)) {
                Assertions.assertThat(executionGraphToInputsLocationsRetrieverAdapter.getProducersOfConsumedPartitionGroup(consumedPartitionGroup)).containsExactly(new ExecutionVertexID[]{executionVertexID});
            } else {
                Assertions.assertThat(executionGraphToInputsLocationsRetrieverAdapter.getProducersOfConsumedPartitionGroup(consumedPartitionGroup)).containsExactly(new ExecutionVertexID[]{executionVertexID2});
            }
        }
    }

    @Test
    void testGetEmptyTaskManagerLocationIfVertexNotScheduled() throws Exception {
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        Assertions.assertThat(new ExecutionGraphToInputsLocationsRetrieverAdapter(ExecutionGraphTestUtils.createExecutionGraph((ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor(), createNoOpVertex)).getTaskManagerLocation(new ExecutionVertexID(createNoOpVertex.getID(), 0))).isNotPresent();
    }

    @Test
    void testGetTaskManagerLocationWhenScheduled() throws Exception {
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        TestingLogicalSlot createTestingLogicalSlot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
        DefaultExecutionGraph createExecutionGraph = ExecutionGraphTestUtils.createExecutionGraph((ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor(), createNoOpVertex);
        ExecutionGraphToInputsLocationsRetrieverAdapter executionGraphToInputsLocationsRetrieverAdapter = new ExecutionGraphToInputsLocationsRetrieverAdapter(createExecutionGraph);
        ExecutionVertex executionVertex = (ExecutionVertex) createExecutionGraph.getAllExecutionVertices().iterator().next();
        executionVertex.getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED);
        executionVertex.deployToSlot(createTestingLogicalSlot);
        Optional taskManagerLocation = executionGraphToInputsLocationsRetrieverAdapter.getTaskManagerLocation(new ExecutionVertexID(createNoOpVertex.getID(), 0));
        Assertions.assertThat(taskManagerLocation).isPresent();
        Assertions.assertThat((TaskManagerLocation) ((CompletableFuture) taskManagerLocation.get()).get()).isEqualTo(createTestingLogicalSlot.getTaskManagerLocation());
    }

    @Test
    void testGetNonExistingExecutionVertexWillThrowException() throws Exception {
        ExecutionGraphToInputsLocationsRetrieverAdapter executionGraphToInputsLocationsRetrieverAdapter = new ExecutionGraphToInputsLocationsRetrieverAdapter(ExecutionGraphTestUtils.createExecutionGraph((ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor(), ExecutionGraphTestUtils.createNoOpVertex(1)));
        ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
        Assertions.assertThatThrownBy(() -> {
            executionGraphToInputsLocationsRetrieverAdapter.getTaskManagerLocation(executionVertexID);
        }, "Should throw exception if execution vertex doesn't exist!", new Object[0]).isInstanceOf(IllegalStateException.class);
    }
}
