/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.strategy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyUtils;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

/**
 * {@link SchedulingStrategy} that schedules tasks at the granularity of pipelined regions.
 *
 * <p>A region is passed to {@code SchedulerOperations#allocateSlotsAndDeploy} once it has not been
 * scheduled yet and all of its non-pipelined inputs are considered consumable (see
 * {@link #areRegionInputsAllConsumable}). Regions without such inputs are started by
 * {@link #startScheduling()}; further regions are picked up when a vertex reaches
 * {@link ExecutionState#FINISHED} or when tasks are restarted.
 */
public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {

    private final SchedulerOperations schedulerOperations;

    private final SchedulingTopology schedulingTopology;

    /**
     * Consumer regions registered per consumed partition group. Only groups that are cross-region
     * or produced outside of the consuming region are recorded. Keys are compared by identity.
     */
    private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>> partitionGroupConsumerRegions =
            new IdentityHashMap<>();

    /** Vertex ids of every region, in the iteration order of {@code schedulingTopology.getVertices()}. */
    private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>> regionVerticesSorted =
            new IdentityHashMap<>();

    /** All consumed partition groups attached to the result partitions produced by each region. */
    private final Map<SchedulingPipelinedRegion, Set<ConsumedPartitionGroup>> producedPartitionGroupsOfRegion =
            new IdentityHashMap<>();

    /**
     * Consumed partition groups whose partitions are produced by more than one region, one of which
     * is also a consumer of the group. Membership is identity-based.
     */
    private final Set<ConsumedPartitionGroup> crossRegionConsumedPartitionGroups =
            Collections.newSetFromMap(new IdentityHashMap<>());

    /** Regions already handed to the scheduler and not restarted since. Membership is identity-based. */
    private final Set<SchedulingPipelinedRegion> scheduledRegions =
            Collections.newSetFromMap(new IdentityHashMap<>());

    /**
     * Creates the strategy and eagerly builds all lookup structures from the topology.
     *
     * @param schedulerOperations used to deploy the vertices of a region; must not be null
     * @param schedulingTopology topology to schedule; must not be null
     */
    public PipelinedRegionSchedulingStrategy(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
        this.schedulerOperations = Preconditions.checkNotNull(schedulerOperations);
        this.schedulingTopology = Preconditions.checkNotNull(schedulingTopology);
        init();
    }

    /** Populates the lookup maps. The cross-region set must be built first because the consumer-region map reads it. */
    private void init() {
        initCrossRegionConsumedPartitionGroups();
        initPartitionGroupConsumerRegions();
        initProducedPartitionGroupsOfRegion();

        for (SchedulingExecutionVertex vertex : schedulingTopology.getVertices()) {
            SchedulingPipelinedRegion region = schedulingTopology.getPipelinedRegionOfVertex(vertex.getId());
            regionVerticesSorted.computeIfAbsent(region, r -> new ArrayList<>()).add(vertex.getId());
        }
    }

    /** Records, for every region, the consumed partition groups of all result partitions its vertices produce. */
    private void initProducedPartitionGroupsOfRegion() {
        for (SchedulingPipelinedRegion region : schedulingTopology.getAllPipelinedRegions()) {
            Set<ConsumedPartitionGroup> producedGroups = new HashSet<>();
            for (SchedulingExecutionVertex executionVertex : region.getVertices()) {
                IterableUtils.toStream(executionVertex.getProducedResults())
                        .flatMap(partition -> partition.getConsumedPartitionGroups().stream())
                        .forEach(producedGroups::add);
            }
            producedPartitionGroupsOfRegion.put(region, producedGroups);
        }
    }

    /**
     * Fills {@link #crossRegionConsumedPartitionGroups}: a group qualifies when its partitions come
     * from more than one producer region and the consuming region is one of those producers.
     */
    private void initCrossRegionConsumedPartitionGroups() {
        Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>> producerRegionsByConsumedPartitionGroup =
                new IdentityHashMap<>();

        // First pass: resolve the producer regions of every non-pipelined consumed group once.
        for (SchedulingPipelinedRegion pipelinedRegion : schedulingTopology.getAllPipelinedRegions()) {
            for (ConsumedPartitionGroup consumedPartitionGroup : pipelinedRegion.getAllNonPipelinedConsumedPartitionGroups()) {
                producerRegionsByConsumedPartitionGroup.computeIfAbsent(
                        consumedPartitionGroup, this::getProducerRegionsForConsumedPartitionGroup);
            }
        }

        // Second pass: classify each group from the point of view of its consuming region.
        for (SchedulingPipelinedRegion pipelinedRegion : schedulingTopology.getAllPipelinedRegions()) {
            for (ConsumedPartitionGroup consumedPartitionGroup : pipelinedRegion.getAllNonPipelinedConsumedPartitionGroups()) {
                Set<SchedulingPipelinedRegion> producerRegions =
                        producerRegionsByConsumedPartitionGroup.get(consumedPartitionGroup);
                if (producerRegions.size() > 1 && producerRegions.contains(pipelinedRegion)) {
                    crossRegionConsumedPartitionGroups.add(consumedPartitionGroup);
                }
            }
        }
    }

    /**
     * Collects the (identity-distinct) regions that produce the partitions of the given group.
     *
     * @param consumedPartitionGroup group whose partitions are inspected
     * @return producer regions of all partitions in the group
     */
    private Set<SchedulingPipelinedRegion> getProducerRegionsForConsumedPartitionGroup(ConsumedPartitionGroup consumedPartitionGroup) {
        Set<SchedulingPipelinedRegion> producerRegions = Collections.newSetFromMap(new IdentityHashMap<>());
        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
            producerRegions.add(getProducerRegion(partitionId));
        }
        return producerRegions;
    }

    /** Returns the region containing the vertex that produces the given partition. */
    private SchedulingPipelinedRegion getProducerRegion(IntermediateResultPartitionID partitionId) {
        return schedulingTopology.getPipelinedRegionOfVertex(
                schedulingTopology.getResultPartition(partitionId).getProducer().getId());
    }

    /**
     * Fills {@link #partitionGroupConsumerRegions} with every region that consumes a group which is
     * either cross-region or produced outside of that region.
     */
    private void initPartitionGroupConsumerRegions() {
        for (SchedulingPipelinedRegion region : schedulingTopology.getAllPipelinedRegions()) {
            for (ConsumedPartitionGroup consumedPartitionGroup : region.getAllNonPipelinedConsumedPartitionGroups()) {
                if (crossRegionConsumedPartitionGroups.contains(consumedPartitionGroup)
                        || isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
                    partitionGroupConsumerRegions
                            .computeIfAbsent(consumedPartitionGroup, group -> new HashSet<>())
                            .add(region);
                }
            }
        }
    }

    /**
     * Finds the registered consumer regions of the vertex's results that cannot be consumed in a
     * pipelined fashion. A consumed group is followed only if it is cross-region or reports that
     * all of its partitions are finished.
     *
     * @param executionVertex the producing vertex
     * @return candidate downstream regions; possibly empty
     */
    private Set<SchedulingPipelinedRegion> getBlockingDownstreamRegionsOfVertex(SchedulingExecutionVertex executionVertex) {
        return IterableUtils.toStream(executionVertex.getProducedResults())
                .filter(partition -> !partition.getResultType().canBePipelinedConsumed())
                .flatMap(partition -> partition.getConsumedPartitionGroups().stream())
                .filter(group -> crossRegionConsumedPartitionGroups.contains(group) || group.areAllPartitionsFinished())
                .flatMap(partitionGroup ->
                        partitionGroupConsumerRegions.getOrDefault(partitionGroup, Collections.emptySet()).stream())
                .collect(Collectors.toSet());
    }

    /** Starts scheduling from all source regions (see {@link #isSourceRegion}). */
    @Override
    public void startScheduling() {
        Set<SchedulingPipelinedRegion> sourceRegions =
                IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
                        .filter(this::isSourceRegion)
                        .collect(Collectors.toSet());
        maybeScheduleRegions(sourceRegions);
    }

    /**
     * A region is a source region when none of its non-pipelined consumed groups is cross-region
     * or produced outside of the region.
     */
    private boolean isSourceRegion(SchedulingPipelinedRegion region) {
        for (ConsumedPartitionGroup consumedPartitionGroup : region.getAllNonPipelinedConsumedPartitionGroups()) {
            if (crossRegionConsumedPartitionGroups.contains(consumedPartitionGroup)
                    || isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Forgets that the regions of the given vertices were scheduled and tries to schedule them again.
     *
     * @param verticesToRestart ids of the vertices being restarted
     */
    @Override
    public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
        Set<SchedulingPipelinedRegion> regionsToRestart =
                verticesToRestart.stream()
                        .map(schedulingTopology::getPipelinedRegionOfVertex)
                        .collect(Collectors.toSet());
        scheduledRegions.removeAll(regionsToRestart);
        maybeScheduleRegions(regionsToRestart);
    }

    /** Reacts only to {@link ExecutionState#FINISHED}: tries to schedule the vertex's blocking downstream regions. */
    @Override
    public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) {
        if (executionState == ExecutionState.FINISHED) {
            maybeScheduleRegions(
                    getBlockingDownstreamRegionsOfVertex(schedulingTopology.getVertex(executionVertexId)));
        }
    }

    /** Intentionally a no-op in this strategy. */
    @Override
    public void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId) {
    }

    /**
     * Expands the candidate set round by round until no further regions are discovered, then
     * schedules every schedulable region in the order returned by
     * {@code SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder}.
     */
    private void maybeScheduleRegions(Set<SchedulingPipelinedRegion> regions) {
        Set<SchedulingPipelinedRegion> regionsToSchedule = new HashSet<>();
        Set<SchedulingPipelinedRegion> nextRegions = regions;
        while (!nextRegions.isEmpty()) {
            nextRegions = addSchedulableAndGetNextRegions(nextRegions, regionsToSchedule);
        }
        SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(schedulingTopology, regionsToSchedule)
                .forEach(this::scheduleRegion);
    }

    /**
     * Adds every schedulable region of {@code currentRegions} to {@code regionsToSchedule} and
     * returns the consumer regions reachable from them through produced groups that can be
     * consumed in a pipelined fashion.
     *
     * @param currentRegions candidates examined in this round
     * @param regionsToSchedule accumulator of regions selected so far; mutated by this call
     * @return candidates for the next round
     */
    private Set<SchedulingPipelinedRegion> addSchedulableAndGetNextRegions(Set<SchedulingPipelinedRegion> currentRegions, Set<SchedulingPipelinedRegion> regionsToSchedule) {
        Set<SchedulingPipelinedRegion> nextRegions = new HashSet<>();
        // Per-round cache so the consumable status of a group is computed at most once.
        Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>();
        Set<ConsumedPartitionGroup> visitedConsumedPartitionGroups = new HashSet<>();

        for (SchedulingPipelinedRegion currentRegion : currentRegions) {
            if (!isRegionSchedulable(currentRegion, consumableStatusCache, regionsToSchedule)) {
                continue;
            }
            regionsToSchedule.add(currentRegion);

            for (ConsumedPartitionGroup producedPartitionGroup :
                    producedPartitionGroupsOfRegion.getOrDefault(currentRegion, Collections.emptySet())) {
                if (!producedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) {
                    continue;
                }
                // add() returns false when the group was already handled in this round.
                if (!visitedConsumedPartitionGroups.add(producedPartitionGroup)) {
                    continue;
                }
                nextRegions.addAll(
                        partitionGroupConsumerRegions.getOrDefault(producedPartitionGroup, Collections.emptySet()));
            }
        }
        return nextRegions;
    }

    /** A region is schedulable if it is neither selected nor scheduled already and all its inputs are consumable. */
    private boolean isRegionSchedulable(SchedulingPipelinedRegion region, Map<ConsumedPartitionGroup, Boolean> consumableStatusCache, Set<SchedulingPipelinedRegion> regionToSchedule) {
        return !regionToSchedule.contains(region)
                && !scheduledRegions.contains(region)
                && areRegionInputsAllConsumable(region, consumableStatusCache, regionToSchedule);
    }

    /**
     * Marks the region as scheduled and deploys its vertices.
     *
     * @throws IllegalStateException if any vertex of the region is not in {@link ExecutionState#CREATED}
     */
    private void scheduleRegion(SchedulingPipelinedRegion region) {
        Preconditions.checkState(
                areRegionVerticesAllInCreatedState(region),
                "BUG: trying to schedule a region which is not in CREATED state");
        scheduledRegions.add(region);
        schedulerOperations.allocateSlotsAndDeploy(regionVerticesSorted.get(region));
    }

    /**
     * Checks every non-pipelined consumed group of the region. Cross-region groups are checked
     * partition by partition; other groups are checked only when produced outside of the region,
     * using {@code consumableStatusCache} to avoid recomputation. Groups that are neither are
     * ignored.
     */
    private boolean areRegionInputsAllConsumable(SchedulingPipelinedRegion region, Map<ConsumedPartitionGroup, Boolean> consumableStatusCache, Set<SchedulingPipelinedRegion> regionToSchedule) {
        for (ConsumedPartitionGroup consumedPartitionGroup : region.getAllNonPipelinedConsumedPartitionGroups()) {
            if (crossRegionConsumedPartitionGroups.contains(consumedPartitionGroup)) {
                if (!isDownstreamOfCrossRegionConsumedPartitionSchedulable(consumedPartitionGroup, region, regionToSchedule)) {
                    return false;
                }
            } else if (isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
                boolean consumable = consumableStatusCache.computeIfAbsent(
                        consumedPartitionGroup,
                        group -> isDownstreamConsumedPartitionGroupSchedulable(group, regionToSchedule));
                if (!consumable) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Decides whether consumers of the group may be scheduled. For groups that can be consumed in a
     * pipelined fashion, every producer region must be scheduled or selected for scheduling;
     * otherwise every partition must be in state {@link ResultPartitionState#ALL_DATA_PRODUCED}.
     */
    private boolean isDownstreamConsumedPartitionGroupSchedulable(ConsumedPartitionGroup consumedPartitionGroup, Set<SchedulingPipelinedRegion> regionToSchedule) {
        if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) {
            for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
                SchedulingPipelinedRegion producerRegion = getProducerRegion(partitionId);
                if (!scheduledRegions.contains(producerRegion) && !regionToSchedule.contains(producerRegion)) {
                    return false;
                }
            }
        } else {
            for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
                if (schedulingTopology.getResultPartition(partitionId).getState() != ResultPartitionState.ALL_DATA_PRODUCED) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Same decision as {@link #isDownstreamConsumedPartitionGroupSchedulable} but for a cross-region
     * group: partitions produced inside {@code pipelinedRegion} itself are skipped, only the
     * externally produced ones have to satisfy the condition.
     */
    private boolean isDownstreamOfCrossRegionConsumedPartitionSchedulable(ConsumedPartitionGroup consumedPartitionGroup, SchedulingPipelinedRegion pipelinedRegion, Set<SchedulingPipelinedRegion> regionToSchedule) {
        if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) {
            for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
                if (!isExternalConsumedPartition(partitionId, pipelinedRegion)) {
                    continue;
                }
                SchedulingPipelinedRegion producerRegion = getProducerRegion(partitionId);
                if (!regionToSchedule.contains(producerRegion) && !scheduledRegions.contains(producerRegion)) {
                    return false;
                }
            }
        } else {
            for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
                if (isExternalConsumedPartition(partitionId, pipelinedRegion)
                        && schedulingTopology.getResultPartition(partitionId).getState() != ResultPartitionState.ALL_DATA_PRODUCED) {
                    return false;
                }
            }
        }
        return true;
    }

    /** Returns true only if every vertex of the region is in {@link ExecutionState#CREATED}. */
    private boolean areRegionVerticesAllInCreatedState(SchedulingPipelinedRegion region) {
        for (SchedulingExecutionVertex vertex : region.getVertices()) {
            if (vertex.getState() != ExecutionState.CREATED) {
                return false;
            }
        }
        return true;
    }

    /**
     * Treats the whole group as external when its first partition is produced outside of the
     * region; the remaining partitions are not inspected.
     */
    private boolean isExternalConsumedPartitionGroup(ConsumedPartitionGroup consumedPartitionGroup, SchedulingPipelinedRegion pipelinedRegion) {
        return isExternalConsumedPartition(consumedPartitionGroup.getFirst(), pipelinedRegion);
    }

    /** A partition is external to a region when the region does not contain the partition's producer vertex. */
    private boolean isExternalConsumedPartition(IntermediateResultPartitionID partitionId, SchedulingPipelinedRegion pipelinedRegion) {
        return !pipelinedRegion.contains(schedulingTopology.getResultPartition(partitionId).getProducer().getId());
    }

    /** Read-only view of the cross-region consumed partition groups, exposed for tests. */
    @VisibleForTesting
    Set<ConsumedPartitionGroup> getCrossRegionConsumedPartitionGroups() {
        return Collections.unmodifiableSet(crossRegionConsumedPartitionGroups);
    }

    /** Factory producing {@link PipelinedRegionSchedulingStrategy} instances. */
    public static class Factory implements SchedulingStrategyFactory {
        @Override
        public SchedulingStrategy createInstance(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
            return new PipelinedRegionSchedulingStrategy(schedulerOperations, schedulingTopology);
        }
    }
}

