package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.SlotSharingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.class */
public class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy {
    public static final Logger LOG = LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy$Factory.class */
    static class Factory implements SlotSharingStrategy.Factory {
        Factory() {
        }

        @Override // org.apache.flink.runtime.scheduler.SlotSharingStrategy.Factory
        public TaskBalancedPreferredSlotSharingStrategy create(SchedulingTopology schedulingTopology, Set<SlotSharingGroup> set, Set<CoLocationGroup> set2) {
            return new TaskBalancedPreferredSlotSharingStrategy(schedulingTopology, set, set2);
        }

        @Override // org.apache.flink.runtime.scheduler.SlotSharingStrategy.Factory
        public /* bridge */ /* synthetic */ SlotSharingStrategy create(SchedulingTopology schedulingTopology, Set set, Set set2) {
            return create(schedulingTopology, (Set<SlotSharingGroup>) set, (Set<CoLocationGroup>) set2);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy$TaskBalancedExecutionSlotSharingGroupBuilder.class */
    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
        private final SchedulingTopology topology;
        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> paralleledExecutionSlotSharingGroupsMap;
        private final Map<SlotSharingGroup, Integer> slotSharingGroupIndexMap;
        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroupMap;
        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap = new HashMap();
        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup> constraintToExecutionSlotSharingGroupMap;

        private TaskBalancedExecutionSlotSharingGroupBuilder(SchedulingTopology schedulingTopology, Set<SlotSharingGroup> set, Set<CoLocationGroup> set2) {
            this.topology = (SchedulingTopology) Preconditions.checkNotNull(schedulingTopology);
            for (CoLocationGroup coLocationGroup : set2) {
                Iterator<JobVertexID> it = coLocationGroup.getVertexIds().iterator();
                while (it.hasNext()) {
                    this.coLocationGroupMap.put(it.next(), coLocationGroup);
                }
            }
            this.constraintToExecutionSlotSharingGroupMap = new HashMap();
            this.paralleledExecutionSlotSharingGroupsMap = new HashMap(set.size());
            this.slotSharingGroupIndexMap = new HashMap(set.size());
            this.slotSharingGroupMap = new HashMap();
            this.executionSlotSharingGroupMap = new HashMap();
            for (SlotSharingGroup slotSharingGroup : set) {
                Iterator<JobVertexID> it2 = slotSharingGroup.getJobVertexIds().iterator();
                while (it2.hasNext()) {
                    this.slotSharingGroupMap.put(it2.next(), slotSharingGroup);
                }
            }
        }

        private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
            LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> executionVertices = AbstractSlotSharingStrategy.getExecutionVertices(this.topology);
            initParalleledExecutionSlotSharingGroupsMap(executionVertices);
            for (Map.Entry<JobVertexID, List<SchedulingExecutionVertex>> entry : executionVertices.entrySet()) {
                JobVertexID key = entry.getKey();
                List<SchedulingExecutionVertex> value = entry.getValue();
                SlotSharingGroup slotSharingGroup = this.slotSharingGroupMap.get(key);
                if (this.coLocationGroupMap.containsKey(key)) {
                    allocateCoLocatedVertices(slotSharingGroup, value);
                } else {
                    allocateNonCoLocatedVertices(slotSharingGroup, value);
                }
            }
            return this.executionSlotSharingGroupMap;
        }

        private void initParalleledExecutionSlotSharingGroupsMap(LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> linkedHashMap) {
            ((Map) linkedHashMap.entrySet().stream().map(entry -> {
                return Tuple2.of(this.slotSharingGroupMap.get(entry.getKey()), Integer.valueOf(((List) entry.getValue()).size()));
            }).collect(Collectors.groupingBy(tuple2 -> {
                return (SlotSharingGroup) tuple2.f0;
            }, Collectors.summarizingInt(tuple22 -> {
                return ((Integer) tuple22.f1).intValue();
            })))).forEach((slotSharingGroup, intSummaryStatistics) -> {
                this.paralleledExecutionSlotSharingGroupsMap.put(slotSharingGroup, createExecutionSlotSharingGroups(slotSharingGroup, intSummaryStatistics.getMax()));
            });
        }

        private List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(SlotSharingGroup slotSharingGroup, int i) {
            ArrayList arrayList = new ArrayList(i);
            for (int i2 = 0; i2 < i; i2++) {
                ExecutionSlotSharingGroup executionSlotSharingGroup = new ExecutionSlotSharingGroup(slotSharingGroup);
                arrayList.add(i2, executionSlotSharingGroup);
                TaskBalancedPreferredSlotSharingStrategy.LOG.debug("Create {}th executionSlotSharingGroup {}.", Integer.valueOf(i2), executionSlotSharingGroup);
            }
            return arrayList;
        }

        private void allocateCoLocatedVertices(SlotSharingGroup slotSharingGroup, List<SchedulingExecutionVertex> list) {
            List<ExecutionSlotSharingGroup> list2 = this.paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
            for (SchedulingExecutionVertex schedulingExecutionVertex : list) {
                CoLocationConstraint coLocationConstraint = getCoLocationConstraint(schedulingExecutionVertex);
                ExecutionSlotSharingGroup executionSlotSharingGroup = this.constraintToExecutionSlotSharingGroupMap.get(coLocationConstraint);
                if (Objects.isNull(executionSlotSharingGroup)) {
                    executionSlotSharingGroup = list2.get(getLeastUtilizeSlotIndex(list2, schedulingExecutionVertex));
                    this.constraintToExecutionSlotSharingGroupMap.put(coLocationConstraint, executionSlotSharingGroup);
                }
                addVertexToExecutionSlotSharingGroup(executionSlotSharingGroup, schedulingExecutionVertex);
            }
            int size = list.size();
            if (isMaxParallelism(size, slotSharingGroup)) {
                return;
            }
            updateSlotRoundRobinIndexIfNeeded(size, slotSharingGroup, getLeastUtilizeSlotIndex(list2, null));
        }

        private void allocateNonCoLocatedVertices(SlotSharingGroup slotSharingGroup, List<SchedulingExecutionVertex> list) {
            int slotRoundRobinIndex = getSlotRoundRobinIndex(list.size(), slotSharingGroup);
            List<ExecutionSlotSharingGroup> list2 = this.paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
            Iterator<SchedulingExecutionVertex> it = list.iterator();
            while (it.hasNext()) {
                addVertexToExecutionSlotSharingGroup(list2.get(slotRoundRobinIndex), it.next());
                slotRoundRobinIndex = (slotRoundRobinIndex + 1) % list2.size();
            }
            updateSlotRoundRobinIndexIfNeeded(list.size(), slotSharingGroup, slotRoundRobinIndex);
        }

        private void addVertexToExecutionSlotSharingGroup(ExecutionSlotSharingGroup executionSlotSharingGroup, SchedulingExecutionVertex schedulingExecutionVertex) {
            ExecutionVertexID id = schedulingExecutionVertex.getId();
            executionSlotSharingGroup.addVertex(id);
            this.executionSlotSharingGroupMap.put(id, executionSlotSharingGroup);
        }

        private CoLocationConstraint getCoLocationConstraint(SchedulingExecutionVertex schedulingExecutionVertex) {
            JobVertexID jobVertexId = schedulingExecutionVertex.getId().getJobVertexId();
            return this.coLocationGroupMap.get(jobVertexId).getLocationConstraint(schedulingExecutionVertex.getId().getSubtaskIndex());
        }

        private int getSlotRoundRobinIndex(int i, SlotSharingGroup slotSharingGroup) {
            if (isMaxParallelism(i, slotSharingGroup)) {
                return 0;
            }
            return this.slotSharingGroupIndexMap.getOrDefault(slotSharingGroup, 0).intValue();
        }

        private void updateSlotRoundRobinIndexIfNeeded(int i, SlotSharingGroup slotSharingGroup, int i2) {
            if (isMaxParallelism(i, slotSharingGroup)) {
                return;
            }
            this.slotSharingGroupIndexMap.put(slotSharingGroup, Integer.valueOf(i2));
        }

        private boolean isMaxParallelism(int i, SlotSharingGroup slotSharingGroup) {
            return i == this.paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup).size();
        }

        private int getLeastUtilizeSlotIndex(List<ExecutionSlotSharingGroup> list, @Nullable SchedulingExecutionVertex schedulingExecutionVertex) {
            int i = 0;
            int i2 = Integer.MAX_VALUE;
            for (int i3 = 0; i3 < list.size(); i3++) {
                ExecutionSlotSharingGroup executionSlotSharingGroup = list.get(i3);
                int size = executionSlotSharingGroup.getExecutionVertexIds().size();
                if (i2 > size && (Objects.isNull(schedulingExecutionVertex) || allocatable(executionSlotSharingGroup, schedulingExecutionVertex))) {
                    i = i3;
                    i2 = size;
                }
            }
            return i;
        }

        private boolean allocatable(ExecutionSlotSharingGroup executionSlotSharingGroup, @Nonnull SchedulingExecutionVertex schedulingExecutionVertex) {
            return !((Set) executionSlotSharingGroup.getExecutionVertexIds().stream().map((v0) -> {
                return v0.getJobVertexId();
            }).collect(Collectors.toSet())).contains(schedulingExecutionVertex.getId().getJobVertexId());
        }
    }

    TaskBalancedPreferredSlotSharingStrategy(SchedulingTopology schedulingTopology, Set<SlotSharingGroup> set, Set<CoLocationGroup> set2) {
        super(schedulingTopology, set, set2);
    }

    @Override // org.apache.flink.runtime.scheduler.AbstractSlotSharingStrategy
    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> computeExecutionSlotSharingGroups(SchedulingTopology schedulingTopology) {
        return new TaskBalancedExecutionSlotSharingGroupBuilder(schedulingTopology, this.logicalSlotSharingGroups, this.coLocationGroups).build();
    }
}
