package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.class */
class StateLocalitySlotAssignerTest {
    StateLocalitySlotAssignerTest() {
    }

    @Test
    void testSlotsAreNotWasted() {
        JobInformation.VertexInformation createVertex = createVertex(2);
        AllocationID allocationID = new AllocationID();
        AllocationID allocationID2 = new AllocationID();
        assign(createVertex, Arrays.asList(allocationID, allocationID2), Arrays.asList(new JobAllocationsInformation.VertexAllocationInformation(allocationID, createVertex.getJobVertexID(), KeyGroupRange.of(0, 9)), new JobAllocationsInformation.VertexAllocationInformation(allocationID2, createVertex.getJobVertexID(), KeyGroupRange.of(10, 19))));
    }

    @Test
    void testUpScaling() {
        JobInformation.VertexInformation createVertex = createVertex(7);
        List<AllocationID> createAllocationIDS = createAllocationIDS(100);
        ArrayList arrayList = new ArrayList();
        Iterator<AllocationID> it = createAllocationIDS.iterator();
        for (int i = 0; i < 3; i++) {
            arrayList.add(new JobAllocationsInformation.VertexAllocationInformation(it.next(), createVertex.getJobVertexID(), KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(createVertex.getMaxParallelism(), 3, i)));
        }
        verifyAssignments(assign(createVertex, createAllocationIDS, arrayList), 7, (AllocationID[]) arrayList.stream().map((v0) -> {
            return v0.getAllocationID();
        }).toArray(i2 -> {
            return new AllocationID[i2];
        }));
    }

    @Test
    void testDownScaling() {
        JobInformation.VertexInformation createVertex = createVertex(1);
        List<AllocationID> createAllocationIDS = createAllocationIDS(100);
        Iterator<AllocationID> it = createAllocationIDS.iterator();
        AllocationID next = it.next();
        ArrayList arrayList = new ArrayList();
        int maxParallelism = createVertex.getMaxParallelism() / 2;
        arrayList.add(new JobAllocationsInformation.VertexAllocationInformation(next, createVertex.getJobVertexID(), KeyGroupRange.of(0, maxParallelism - 1)));
        for (int i = 1; i < 5; i++) {
            int i2 = maxParallelism + i;
            arrayList.add(new JobAllocationsInformation.VertexAllocationInformation(it.next(), createVertex.getJobVertexID(), KeyGroupRange.of(i2, i2)));
        }
        verifyAssignments(assign(createVertex, createAllocationIDS, arrayList), 1, next);
    }

    private static void verifyAssignments(Collection<JobSchedulingPlan.SlotAssignment> collection, int i, AllocationID... allocationIDArr) {
        MatcherAssert.assertThat(collection, Matchers.hasSize(i));
        MatcherAssert.assertThat(collection.stream().map(slotAssignment -> {
            return slotAssignment.getSlotInfo().getAllocationId();
        }).collect(Collectors.toSet()), Matchers.hasItems(allocationIDArr));
    }

    private static Collection<JobSchedulingPlan.SlotAssignment> assign(JobInformation.VertexInformation vertexInformation, List<AllocationID> list, List<JobAllocationsInformation.VertexAllocationInformation> list2) {
        return new StateLocalitySlotAssigner().assignSlots(new TestJobInformation(Collections.singletonList(vertexInformation)), (Collection) list.stream().map(TestSlotInfo::new).collect(Collectors.toList()), new VertexParallelism(Collections.singletonMap(vertexInformation.getJobVertexID(), Integer.valueOf(vertexInformation.getParallelism()))), new JobAllocationsInformation(Collections.singletonMap(vertexInformation.getJobVertexID(), list2)));
    }

    private static JobInformation.VertexInformation createVertex(int i) {
        JobVertexID jobVertexID = new JobVertexID();
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        slotSharingGroup.addVertexToGroup(jobVertexID);
        return new TestVertexInformation(jobVertexID, i, slotSharingGroup);
    }

    private static List<AllocationID> createAllocationIDS(int i) {
        return (List) IntStream.range(0, i).mapToObj(i2 -> {
            return new AllocationID();
        }).collect(Collectors.toList());
    }
}
