package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.AbstractSlotSharingStrategyTest;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
import org.apache.flink.shaded.guava32.com.google.common.collect.Sets;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.class */
/**
 * Tests for {@link LocalInputPreferredSlotSharingStrategy}.
 *
 * <p>Each test builds a small topology, creates the strategy through
 * {@link #getSlotSharingStrategy}, and then checks how many execution slot sharing groups exist
 * and which execution vertices ended up together.
 *
 * <p>The {@code topology}, {@code slotSharingGroup}, {@code slotSharingGroup1},
 * {@code slotSharingGroup2}, {@code jobVertexId1} and {@code jobVertexId2} members used below are
 * not declared in this class; presumably they are fixtures of
 * {@link AbstractSlotSharingStrategyTest}.
 *
 * <p>NOTE(review): {@code m563getId()} looks like a decompiler-renamed accessor (likely
 * {@code getId()}); its declaration is not visible here, so the name is kept as found.
 */
class LocalInputPreferredSlotSharingStrategyTest extends AbstractSlotSharingStrategyTest {

    /** Supplies the executor used to build a real execution graph in one of the tests. */
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
            TestingUtils.defaultExecutorExtension();

    /** Id of a third job vertex, only used by the late-attached-vertices test. */
    private final JobVertexID jobVertexId3 = new JobVertexID();

    // Naming: ev<jobVertex><subtask>, both counted from 1 (ev23 has subtask index 2).
    private TestingSchedulingExecutionVertex ev11;
    private TestingSchedulingExecutionVertex ev12;
    private TestingSchedulingExecutionVertex ev21;
    private TestingSchedulingExecutionVertex ev22;
    private TestingSchedulingExecutionVertex ev23;

    /**
     * Creates the strategy under test.
     *
     * @param schedulingTopology topology the strategy is built for
     * @param set slot sharing groups handed to the strategy
     * @param set2 co-location groups handed to the strategy
     * @return always a new {@link LocalInputPreferredSlotSharingStrategy}
     */
    @Override
    protected SlotSharingStrategy getSlotSharingStrategy(
            SchedulingTopology schedulingTopology,
            Set<SlotSharingGroup> set,
            Set<CoLocationGroup> set2) {
        return new LocalInputPreferredSlotSharingStrategy(schedulingTopology, set, set2);
    }

    /**
     * Job vertex 1 has two subtasks and job vertex 2 has three, wired as ev11 -> {ev21, ev22} and
     * ev12 -> ev23. Expects three groups, with ev21 sharing with ev11 and ev23 sharing with ev12.
     */
    @Test
    void testInputLocalityIsRespectedWithRescaleEdge() {
        createTwoExeVerticesPerJv1AndJv2(this.slotSharingGroup);
        this.ev23 = this.topology.newExecutionVertex(this.jobVertexId2, 2);
        this.topology.connect(this.ev11, this.ev21);
        this.topology.connect(this.ev11, this.ev22);
        this.topology.connect(this.ev12, this.ev23);

        SlotSharingStrategy strategy =
                getSlotSharingStrategy(
                        this.topology,
                        slotSharingGroupsOf(this.slotSharingGroup),
                        Collections.emptySet());

        Assertions.assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(3);
        assertGroupContains(
                strategy, this.ev21.m563getId(), this.ev11.m563getId(), this.ev21.m563getId());
        assertGroupContains(strategy, this.ev22.m563getId(), this.ev22.m563getId());
        assertGroupContains(
                strategy, this.ev23.m563getId(), this.ev12.m563getId(), this.ev23.m563getId());
    }

    /**
     * Two job vertices with two subtasks each, connected all-to-all through a blocking result.
     * Expects two groups that pair subtasks with the same index.
     */
    @Test
    void testInputLocalityIsRespectedWithAllToAllEdge() {
        this.slotSharingGroup.addVertexToGroup(this.jobVertexId1);
        this.slotSharingGroup.addVertexToGroup(this.jobVertexId2);

        List<TestingSchedulingExecutionVertex> producers =
                this.topology
                        .addExecutionVertices()
                        .withParallelism(2)
                        .withJobVertexID(this.jobVertexId1)
                        .finish();
        List<TestingSchedulingExecutionVertex> consumers =
                this.topology
                        .addExecutionVertices()
                        .withParallelism(2)
                        .withJobVertexID(this.jobVertexId2)
                        .finish();
        this.topology
                .connectAllToAll(producers, consumers)
                .withResultPartitionType(ResultPartitionType.BLOCKING)
                .finish();

        this.ev11 = producers.get(0);
        this.ev12 = producers.get(1);
        this.ev21 = consumers.get(0);
        this.ev22 = consumers.get(1);

        SlotSharingStrategy strategy =
                getSlotSharingStrategy(
                        this.topology,
                        slotSharingGroupsOf(this.slotSharingGroup),
                        Collections.emptySet());

        Assertions.assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2);
        assertGroupContains(
                strategy, this.ev21.m563getId(), this.ev11.m563getId(), this.ev21.m563getId());
        assertGroupContains(
                strategy, this.ev22.m563getId(), this.ev12.m563getId(), this.ev22.m563getId());
    }

    /**
     * Renders five job vertices, where entries 1 and 2 share one co-location group and entries 3
     * and 4 share another. Expects same-index subtasks of co-located job vertices to resolve to
     * equal execution slot sharing groups.
     */
    @Test
    void testCoLocationConstraintIsRespected() {
        // NOTE(review): the raw list is kept exactly as decompiled. The `.f1` accesses below
        // cannot resolve against a raw element type; presumably every entry is a tuple whose
        // `f1` holds the execution vertices of one job vertex. Restore the generic element type
        // from renderTopology's signature, which is not visible in this file.
        ArrayList jobVertexInfos = new ArrayList();
        CoLocationGroup firstCoLocationGroup = new CoLocationGroupImpl(new JobVertex[0]);
        CoLocationGroup secondCoLocationGroup = new CoLocationGroupImpl(new JobVertex[0]);

        renderTopology(
                this.topology,
                Lists.newArrayList(
                        new AbstractSlotSharingStrategyTest.TestingJobVertexInfo(
                                1, this.slotSharingGroup, null),
                        new AbstractSlotSharingStrategyTest.TestingJobVertexInfo(
                                2, this.slotSharingGroup, firstCoLocationGroup),
                        new AbstractSlotSharingStrategyTest.TestingJobVertexInfo(
                                2, this.slotSharingGroup, firstCoLocationGroup),
                        new AbstractSlotSharingStrategyTest.TestingJobVertexInfo(
                                3, this.slotSharingGroup, secondCoLocationGroup),
                        new AbstractSlotSharingStrategyTest.TestingJobVertexInfo(
                                3, this.slotSharingGroup, secondCoLocationGroup)),
                jobVertexInfos);

        SlotSharingStrategy strategy =
                getSlotSharingStrategy(
                        this.topology,
                        slotSharingGroupsOf(this.slotSharingGroup),
                        Sets.newHashSet(firstCoLocationGroup, secondCoLocationGroup));

        List list = (List) jobVertexInfos.get(1).f1;
        List list2 = (List) jobVertexInfos.get(2).f1;
        assertPairwiseSameGroup(strategy, list, list2);

        List list3 = (List) jobVertexInfos.get(3).f1;
        List list4 = (List) jobVertexInfos.get(4).f1;
        assertPairwiseSameGroup(strategy, list3, list4);
    }

    /**
     * Two job vertices with two subtasks each and no edges between them, all in one slot sharing
     * group. Expects two groups that pair subtasks with the same index.
     */
    @Test
    void testDisjointVerticesInOneGroup() {
        createTwoExeVerticesPerJv1AndJv2(this.slotSharingGroup);

        SlotSharingStrategy strategy =
                getSlotSharingStrategy(
                        this.topology,
                        slotSharingGroupsOf(this.slotSharingGroup),
                        Collections.emptySet());

        Assertions.assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2);
        assertGroupContains(
                strategy, this.ev11.m563getId(), this.ev11.m563getId(), this.ev21.m563getId());
        assertGroupContains(
                strategy, this.ev12.m563getId(), this.ev12.m563getId(), this.ev22.m563getId());
    }

    /**
     * The two job vertices are registered in different slot sharing groups. Expects four groups,
     * one per execution vertex.
     */
    @Test
    void testVerticesInDifferentSlotSharingGroups() {
        this.ev11 = this.topology.newExecutionVertex(this.jobVertexId1, 0);
        this.ev12 = this.topology.newExecutionVertex(this.jobVertexId1, 1);
        this.ev21 = this.topology.newExecutionVertex(this.jobVertexId2, 0);
        this.ev22 = this.topology.newExecutionVertex(this.jobVertexId2, 1);
        this.slotSharingGroup1.addVertexToGroup(this.jobVertexId1);
        this.slotSharingGroup2.addVertexToGroup(this.jobVertexId2);

        SlotSharingStrategy strategy =
                getSlotSharingStrategy(
                        this.topology,
                        slotSharingGroupsOf(this.slotSharingGroup1, this.slotSharingGroup2),
                        Collections.emptySet());

        Assertions.assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(4);
        assertGroupContains(strategy, this.ev11.m563getId(), this.ev11.m563getId());
        assertGroupContains(strategy, this.ev12.m563getId(), this.ev12.m563getId());
        assertGroupContains(strategy, this.ev21.m563getId(), this.ev21.m563getId());
        assertGroupContains(strategy, this.ev22.m563getId(), this.ev22.m563getId());
    }

    /**
     * Builds a real execution graph in which the consumer reads two blocking all-to-all data sets
     * from the same producer (parallelism 4 each). Expects four groups that pair subtasks with
     * the same index.
     */
    @Test
    void testInputLocalityIsRespectedWithTwoEdgesBetweenTwoVertices() throws Exception {
        // Registers both job vertex ids in the slot sharing group; the execution vertices this
        // call adds to the testing topology are not used, the strategy is built from the graph.
        createTwoExeVerticesPerJv1AndJv2(this.slotSharingGroup);

        int parallelism = 4;
        JobVertex producer = createJobVertex("v1", this.jobVertexId1, parallelism);
        JobVertex consumer = createJobVertex("v2", this.jobVertexId2, parallelism);
        // Connecting twice is intentional: it creates two separate edges between the vertices.
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                consumer, producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                consumer, producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        Assertions.assertThat(producer.getProducedDataSets()).hasSize(2);
        Assertions.assertThat(consumer.getInputs()).hasSize(2);

        DefaultExecutionGraph executionGraph =
                TestingDefaultExecutionGraphBuilder.newBuilder()
                        .setJobGraph(JobGraphTestUtils.batchJobGraph(producer, consumer))
                        .build((ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor());

        SlotSharingStrategy strategy =
                getSlotSharingStrategy(
                        executionGraph.getSchedulingTopology(),
                        slotSharingGroupsOf(this.slotSharingGroup),
                        Collections.emptySet());

        Assertions.assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(4);

        ExecutionVertex[] producerTasks =
                ((ExecutionJobVertex)
                                Objects.requireNonNull(
                                        executionGraph.getJobVertex(this.jobVertexId1)))
                        .getTaskVertices();
        ExecutionVertex[] consumerTasks =
                ((ExecutionJobVertex)
                                Objects.requireNonNull(
                                        executionGraph.getJobVertex(this.jobVertexId2)))
                        .getTaskVertices();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            assertGroupContains(
                    strategy,
                    producerTasks[subtask].getID(),
                    producerTasks[subtask].getID(),
                    consumerTasks[subtask].getID());
        }
    }

    /**
     * Builds the strategy from two connected vertices, then attaches a vertex of a third job
     * vertex (registered in another slot sharing group) and notifies the strategy. Expects the
     * existing group to stay intact and the late vertex to get a group of its own.
     */
    @Test
    void testGetExecutionSlotSharingGroupOfLateAttachedVertices() {
        this.slotSharingGroup1.addVertexToGroup(this.jobVertexId1);
        this.slotSharingGroup1.addVertexToGroup(this.jobVertexId2);
        this.slotSharingGroup2.addVertexToGroup(this.jobVertexId3);

        TestingSchedulingExecutionVertex first =
                this.topology.newExecutionVertex(this.jobVertexId1, 0);
        TestingSchedulingExecutionVertex second =
                this.topology.newExecutionVertex(this.jobVertexId2, 0);
        this.topology.connect(first, second);

        // The factory method is declared to return the SlotSharingStrategy interface, so an
        // explicit downcast is required to hold the result in the concrete type that
        // notifySchedulingTopologyUpdated is invoked on below.
        LocalInputPreferredSlotSharingStrategy strategy =
                (LocalInputPreferredSlotSharingStrategy)
                        getSlotSharingStrategy(
                                this.topology,
                                new HashSet<>(
                                        Arrays.asList(
                                                this.slotSharingGroup1, this.slotSharingGroup2)),
                                Collections.emptySet());

        Assertions.assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(1);
        assertGroupContains(strategy, first.m563getId(), first.m563getId(), second.m563getId());
        assertGroupContains(strategy, second.m563getId(), first.m563getId(), second.m563getId());

        TestingSchedulingExecutionVertex lateAttached =
                this.topology.newExecutionVertex(this.jobVertexId3, 0);
        this.topology.connect(second, lateAttached, ResultPartitionType.BLOCKING);
        strategy.notifySchedulingTopologyUpdated(
                this.topology, Collections.singletonList(lateAttached.m563getId()));

        Assertions.assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2);
        assertGroupContains(strategy, first.m563getId(), first.m563getId(), second.m563getId());
        assertGroupContains(strategy, second.m563getId(), first.m563getId(), second.m563getId());
        assertGroupContains(strategy, lateAttached.m563getId(), lateAttached.m563getId());
    }

    /**
     * Adds subtasks 0 and 1 for both job vertex 1 and job vertex 2 to the topology, stores them
     * in {@code ev11}, {@code ev12}, {@code ev21} and {@code ev22}, and registers both job vertex
     * ids in the given group. No edges are created.
     *
     * @param group slot sharing group that both job vertices are added to
     */
    private void createTwoExeVerticesPerJv1AndJv2(SlotSharingGroup group) {
        this.ev11 = this.topology.newExecutionVertex(this.jobVertexId1, 0);
        this.ev12 = this.topology.newExecutionVertex(this.jobVertexId1, 1);
        this.ev21 = this.topology.newExecutionVertex(this.jobVertexId2, 0);
        this.ev22 = this.topology.newExecutionVertex(this.jobVertexId2, 1);
        group.addVertexToGroup(this.jobVertexId1);
        group.addVertexToGroup(this.jobVertexId2);
    }

    /**
     * Creates a job vertex with the given name, id and parallelism that runs
     * {@link NoOpInvokable}.
     */
    private static JobVertex createJobVertex(String name, JobVertexID id, int parallelism) {
        JobVertex vertex = new JobVertex(name, id);
        vertex.setParallelism(parallelism);
        vertex.setInvokableClass(NoOpInvokable.class);
        return vertex;
    }

    /** Collects the given slot sharing groups into a new hash set. */
    private static Set<SlotSharingGroup> slotSharingGroupsOf(SlotSharingGroup... groups) {
        return Sets.newHashSet(groups);
    }

    /**
     * Asserts that the execution slot sharing group resolved for {@code member} contains every
     * id in {@code expectedVertexIds}. Additional members are tolerated.
     */
    private static void assertGroupContains(
            SlotSharingStrategy strategy,
            ExecutionVertexID member,
            ExecutionVertexID... expectedVertexIds) {
        Assertions.assertThat(
                        strategy.getExecutionSlotSharingGroup(member).getExecutionVertexIds())
                .contains(expectedVertexIds);
    }

    /**
     * Asserts that both lists have the same size and that the vertices at each index resolve to
     * equal execution slot sharing groups. Elements are cast to
     * {@link TestingSchedulingExecutionVertex}.
     */
    private static void assertPairwiseSameGroup(
            SlotSharingStrategy strategy, List<?> first, List<?> second) {
        Assertions.assertThat(first).hasSameSizeAs(second);
        for (int i = 0; i < first.size(); i++) {
            TestingSchedulingExecutionVertex left =
                    (TestingSchedulingExecutionVertex) first.get(i);
            TestingSchedulingExecutionVertex right =
                    (TestingSchedulingExecutionVertex) second.get(i);
            Assertions.assertThat(strategy.getExecutionSlotSharingGroup(left.m563getId()))
                    .isEqualTo(strategy.getExecutionSlotSharingGroup(right.m563getId()));
        }
    }
}
