/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.scheduler.adaptivebatch.ExecutionPlanSchedulingContext;
import org.apache.flink.streaming.api.graph.AdaptiveGraphManager;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
import org.apache.flink.util.Preconditions;

/**
 * An {@link ExecutionPlanSchedulingContext} that answers scheduling queries by delegating to an
 * {@link AdaptiveGraphManager}.
 *
 * <p>Consumer (max) parallelisms of an {@link IntermediateDataSet} are resolved from its consumer
 * {@link JobEdge}s when it has any, and otherwise from the output {@link StreamEdge}s that the
 * graph manager reports for the data set.
 */
public class AdaptiveExecutionPlanSchedulingContext implements ExecutionPlanSchedulingContext {
    private final AdaptiveGraphManager adaptiveGraphManager;
    private final int defaultMaxParallelism;

    /**
     * Creates the context.
     *
     * @param adaptiveGraphManager graph manager that all queries are delegated to; must not be
     *     {@code null}
     * @param defaultMaxParallelism value handed to {@link
     *     AdaptiveBatchScheduler#computeMaxParallelism} when a stream node reports a max
     *     parallelism of {@code -1}
     * @throws NullPointerException if {@code adaptiveGraphManager} is {@code null}
     */
    public AdaptiveExecutionPlanSchedulingContext(AdaptiveGraphManager adaptiveGraphManager, int defaultMaxParallelism) {
        this.adaptiveGraphManager = Preconditions.checkNotNull(adaptiveGraphManager);
        this.defaultMaxParallelism = defaultMaxParallelism;
    }

    /**
     * Returns the single parallelism shared by all consumers of the given data set.
     *
     * @param executionJobVertexParallelismRetriever maps the id of a consumer job vertex to its
     *     parallelism; only used when the data set has consumer job edges
     * @param intermediateDataSet the data set whose consumers are inspected
     * @return the parallelism common to all consumers
     * @throws IllegalStateException if the data set has neither consumer job edges nor output
     *     stream edges, or if the consumers do not all report the same parallelism
     */
    @Override
    public int getConsumersParallelism(Function<JobVertexID, Integer> executionJobVertexParallelismRetriever, IntermediateDataSet intermediateDataSet) {
        Set<Integer> consumerParallelisms =
                collectConsumerValues(intermediateDataSet, executionJobVertexParallelismRetriever, this::getParallelism);
        Preconditions.checkState(consumerParallelisms.size() == 1, "Consumers must have the same parallelism.");
        return consumerParallelisms.iterator().next();
    }

    /**
     * Returns the single max parallelism shared by all consumers of the given data set.
     *
     * @param executionJobVertexMaxParallelismRetriever maps the id of a consumer job vertex to its
     *     max parallelism; only used when the data set has consumer job edges
     * @param intermediateDataSet the data set whose consumers are inspected
     * @return the max parallelism common to all consumers
     * @throws IllegalStateException if the data set has neither consumer job edges nor output
     *     stream edges, or if the consumers do not all report the same max parallelism
     */
    @Override
    public int getConsumersMaxParallelism(Function<JobVertexID, Integer> executionJobVertexMaxParallelismRetriever, IntermediateDataSet intermediateDataSet) {
        Set<Integer> consumerMaxParallelisms =
                collectConsumerValues(intermediateDataSet, executionJobVertexMaxParallelismRetriever, this::getMaxParallelismOrDefault);
        Preconditions.checkState(consumerMaxParallelisms.size() == 1, "Consumers must have the same max parallelism.");
        return consumerMaxParallelisms.iterator().next();
    }

    /** Returns the pending operator count reported by the underlying graph manager. */
    @Override
    public int getPendingOperatorCount() {
        return this.adaptiveGraphManager.getPendingOperatorsCount();
    }

    /** Returns the stream graph JSON reported by the underlying graph manager. */
    @Override
    public String getStreamGraphJson() {
        return this.adaptiveGraphManager.getStreamGraphJson();
    }

    /**
     * Collects the distinct values reported for the consumers of the given data set.
     *
     * <p>When the data set has consumer job edges, each edge's target vertex id is passed to
     * {@code jobVertexValueRetriever}. Otherwise the target node id of each output stream edge is
     * passed to {@code streamNodeValueRetriever}.
     *
     * @throws IllegalStateException if there are neither consumer job edges nor output stream edges
     */
    private Set<Integer> collectConsumerValues(
            IntermediateDataSet intermediateDataSet,
            Function<JobVertexID, Integer> jobVertexValueRetriever,
            Function<Integer, Integer> streamNodeValueRetriever) {
        // The stream edges are queried up front, regardless of which branch is taken below.
        List<StreamEdge> consumerStreamEdges = this.adaptiveGraphManager.getOutputStreamEdges(intermediateDataSet.getId());
        List<JobEdge> consumerJobEdges = intermediateDataSet.getConsumers();

        if (!consumerJobEdges.isEmpty()) {
            return consumerJobEdges.stream()
                    .map(jobEdge -> jobEdge.getTarget().getID())
                    .map(jobVertexValueRetriever)
                    .collect(Collectors.toSet());
        }

        Preconditions.checkState(
                !consumerStreamEdges.isEmpty(),
                "The intermediate data set has neither consumer job edges nor output stream edges.");
        return consumerStreamEdges.stream()
                .map(StreamEdge::getTargetId)
                .map(streamNodeValueRetriever)
                .collect(Collectors.toSet());
    }

    /** Looks up the stream node with the given id and returns its parallelism. */
    private int getParallelism(int streamNodeId) {
        return this.adaptiveGraphManager.getStreamGraphContext().getStreamGraph().getStreamNode(streamNodeId).getParallelism();
    }

    /**
     * Looks up the stream node with the given id and returns its max parallelism, or a computed
     * value when the node reports {@code -1}.
     */
    private int getMaxParallelismOrDefault(int streamNodeId) {
        ImmutableStreamNode streamNode = this.adaptiveGraphManager.getStreamGraphContext().getStreamGraph().getStreamNode(streamNodeId);
        // NOTE(review): -1 is presumably the "max parallelism not configured" sentinel - confirm.
        if (streamNode.getMaxParallelism() == -1) {
            return AdaptiveBatchScheduler.computeMaxParallelism(streamNode.getParallelism(), this.defaultMaxParallelism);
        }
        return streamNode.getMaxParallelism();
    }
}

