/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.processor.utils;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.processor.utils.TopologyGraph;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.util.Preconditions;

@Internal
public abstract class InputPriorityGraphGenerator {
    private final List<ExecNode<?>> roots;
    private final Set<ExecNode<?>> boundaries;
    private final InputProperty.DamBehavior safeDamBehavior;
    protected TopologyGraph graph;

    public InputPriorityGraphGenerator(List<ExecNode<?>> roots, Set<ExecNode<?>> boundaries, InputProperty.DamBehavior safeDamBehavior) {
        Preconditions.checkArgument((boolean)roots.stream().allMatch(r -> r instanceof BatchExecNode), (Object)"InputPriorityConflictResolver can only be used for batch jobs.");
        this.roots = roots;
        this.boundaries = boundaries;
        this.safeDamBehavior = safeDamBehavior;
    }

    protected void createTopologyGraph() {
        this.graph = new TopologyGraph(this.roots, this.boundaries);
        AbstractExecNodeExactlyOnceVisitor inputPriorityVisitor = new AbstractExecNodeExactlyOnceVisitor(){

            @Override
            protected void visitNode(ExecNode<?> node) {
                if (!InputPriorityGraphGenerator.this.boundaries.contains(node)) {
                    this.visitInputs(node);
                }
                InputPriorityGraphGenerator.this.updateTopologyGraph(node);
            }
        };
        this.roots.forEach(n -> n.accept(inputPriorityVisitor));
    }

    private void updateTopologyGraph(ExecNode<?> node) {
        TreeMap<Integer, List> inputPriorityGroupMap = new TreeMap<Integer, List>();
        Preconditions.checkState((node.getInputEdges().size() == node.getInputProperties().size() ? 1 : 0) != 0, (Object)("Number of inputs nodes does not equal to number of input edges for node " + node.getClass().getName() + ". This is a bug."));
        for (int i = 0; i < node.getInputProperties().size(); ++i) {
            int priority = node.getInputProperties().get(i).getPriority();
            inputPriorityGroupMap.computeIfAbsent(priority, k -> new ArrayList()).add(i);
        }
        ArrayList inputPriorityGroups = new ArrayList(inputPriorityGroupMap.values());
        int i = 0;
        while (i + 1 < inputPriorityGroups.size()) {
            List higherGroup = (List)inputPriorityGroups.get(i);
            List lowerGroup = (List)inputPriorityGroups.get(i + 1);
            Iterator iterator = higherGroup.iterator();
            while (iterator.hasNext()) {
                int higher = (Integer)iterator.next();
                Iterator iterator2 = lowerGroup.iterator();
                while (iterator2.hasNext()) {
                    int lower = (Integer)iterator2.next();
                    this.addTopologyEdges(node, higher, lower);
                }
            }
            ++i;
        }
    }

    private void addTopologyEdges(ExecNode<?> node, int higherInput, int lowerInput) {
        ExecNode<?> higherNode = node.getInputEdges().get(higherInput).getSource();
        ExecNode<?> lowerNode = node.getInputEdges().get(lowerInput).getSource();
        List<ExecNode<?>> lowerAncestors = this.calculatePipelinedAncestors(lowerNode);
        ArrayList<Tuple2> linkedEdges = new ArrayList<Tuple2>();
        for (ExecNode<?> ancestor : lowerAncestors) {
            if (this.graph.link(higherNode, ancestor)) {
                linkedEdges.add(Tuple2.of(higherNode, ancestor));
                continue;
            }
            this.resolveInputPriorityConflict(node, higherInput, lowerInput);
            for (Tuple2 linkedEdge : linkedEdges) {
                this.graph.unlink((ExecNode)linkedEdge.f0, (ExecNode)linkedEdge.f1);
            }
            return;
        }
    }

    @VisibleForTesting
    List<ExecNode<?>> calculatePipelinedAncestors(ExecNode<?> node) {
        final ArrayList ret = new ArrayList();
        AbstractExecNodeExactlyOnceVisitor ancestorVisitor = new AbstractExecNodeExactlyOnceVisitor(){

            @Override
            protected void visitNode(ExecNode<?> node) {
                boolean hasAncestor = false;
                if (!InputPriorityGraphGenerator.this.boundaries.contains(node)) {
                    List<InputProperty> inputProperties = node.getInputProperties();
                    for (int i = 0; i < inputProperties.size(); ++i) {
                        if (inputProperties.get(i).getDamBehavior().stricterOrEqual(InputPriorityGraphGenerator.this.safeDamBehavior)) continue;
                        hasAncestor = true;
                        node.getInputEdges().get(i).getSource().accept(this);
                    }
                }
                if (!hasAncestor) {
                    ret.add(node);
                }
            }
        };
        node.accept(ancestorVisitor);
        return ret;
    }

    protected abstract void resolveInputPriorityConflict(ExecNode<?> var1, int var2, int var3);
}

