package org.apache.flink.table.planner.plan.nodes.exec.processor.utils;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.util.Preconditions;

/**
 * Base class that builds a {@link TopologyGraph} over a DAG of batch {@link ExecNode}s and adds
 * edges to it based on the priorities of each node's inputs.
 *
 * <p>For every visited node the inputs are grouped by {@link InputProperty#getPriority()} and the
 * groups are ordered by ascending priority value. For each pair of neighboring groups, the source
 * of every input in the earlier group is linked to the pipelined ancestors of the source of every
 * input in the following group. When {@link TopologyGraph#link} refuses an edge, {@link
 * #resolveInputPriorityConflict(ExecNode, int, int)} is invoked so that the subclass can react, and
 * the edges added so far for that pair of inputs are removed again.
 *
 * <p>NOTE(review): presumably a smaller priority value means the input must be consumed earlier,
 * and a rejected link means the edge would make the graph cyclic; confirm against {@link
 * InputProperty} and {@link TopologyGraph}.
 */
@Internal
public abstract class InputPriorityGraphGenerator {

    /** Nodes from which every traversal of the DAG starts. */
    private final List<ExecNode<?>> roots;

    /** Nodes at which traversal stops: the inputs of a boundary node are never visited. */
    private final Set<ExecNode<?>> boundaries;

    /**
     * Threshold used while searching for pipelined ancestors: an input whose dam behavior is
     * stricter than or equal to this value is not followed.
     */
    private final InputProperty.DamBehavior safeDamBehavior;

    /** Graph created by {@link #createTopologyGraph()}; not assigned before that call. */
    protected TopologyGraph graph;

    /**
     * Creates a generator for the given DAG.
     *
     * @param roots root nodes of the DAG; every one of them must be a {@link BatchExecNode}
     * @param boundaries nodes whose inputs must not be traversed
     * @param safeDamBehavior dam behavior at or above which an input is not followed when looking
     *     for pipelined ancestors
     * @throws IllegalArgumentException if any root is not a {@link BatchExecNode} (raised by
     *     {@link Preconditions#checkArgument})
     */
    public InputPriorityGraphGenerator(
            List<ExecNode<?>> roots,
            Set<ExecNode<?>> boundaries,
            InputProperty.DamBehavior safeDamBehavior) {
        Preconditions.checkArgument(
                roots.stream().allMatch(root -> root instanceof BatchExecNode),
                "InputPriorityConflictResolver can only be used for batch jobs.");
        this.roots = roots;
        this.boundaries = boundaries;
        this.safeDamBehavior = safeDamBehavior;
    }

    /**
     * Builds {@link #graph} from the roots and boundaries, then walks the DAG from every root and
     * adds the input-priority edges of each visited node.
     *
     * <p>NOTE(review): a decompiler note on the original indicated that this method was declared
     * {@code protected} upstream; it is kept {@code public} here so existing callers keep working.
     */
    public void createTopologyGraph() {
        graph = new TopologyGraph(roots, boundaries);

        AbstractExecNodeExactlyOnceVisitor inputPriorityVisitor =
                new AbstractExecNodeExactlyOnceVisitor() {
                    @Override
                    protected void visitNode(ExecNode<?> node) {
                        // Inputs are processed before the node itself; boundary nodes are
                        // processed but their inputs are not descended into.
                        if (!boundaries.contains(node)) {
                            visitInputs(node);
                        }
                        updateTopologyGraph(node);
                    }
                };
        roots.forEach(root -> root.accept(inputPriorityVisitor));
    }

    /**
     * Groups the inputs of {@code node} by priority and adds topology edges between every pair of
     * inputs taken from two neighboring priority groups.
     *
     * @param node node whose inputs are examined
     * @throws IllegalStateException if the node reports a different number of input edges and
     *     input properties (raised by {@link Preconditions#checkState})
     */
    private void updateTopologyGraph(ExecNode<?> node) {
        List<InputProperty> inputProperties = node.getInputProperties();
        Preconditions.checkState(
                node.getInputEdges().size() == inputProperties.size(),
                "Number of inputs nodes does not equal to number of input edges for node "
                        + node.getClass().getName()
                        + ". This is a bug.");

        // TreeMap keeps the groups sorted by ascending priority value; each group holds the
        // indices of the inputs sharing that priority.
        TreeMap<Integer, List<Integer>> inputsByPriority = new TreeMap<>();
        for (int i = 0; i < inputProperties.size(); i++) {
            int priority = inputProperties.get(i).getPriority();
            inputsByPriority.computeIfAbsent(priority, key -> new ArrayList<>()).add(i);
        }

        // Only neighboring groups are connected directly.
        List<List<Integer>> priorityGroups = new ArrayList<>(inputsByPriority.values());
        for (int g = 0; g + 1 < priorityGroups.size(); g++) {
            List<Integer> currentGroup = priorityGroups.get(g);
            List<Integer> nextGroup = priorityGroups.get(g + 1);
            for (int precedingInput : currentGroup) {
                for (int followingInput : nextGroup) {
                    addTopologyEdges(node, precedingInput, followingInput);
                }
            }
        }
    }

    /**
     * Links the source of input {@code precedingInput} to every pipelined ancestor of the source
     * of input {@code followingInput}.
     *
     * <p>If any link is rejected by the graph, the conflict is handed to {@link
     * #resolveInputPriorityConflict(ExecNode, int, int)}, the edges already added by this call are
     * unlinked, and the method returns without processing the remaining ancestors.
     *
     * @param node node owning both inputs
     * @param precedingInput index of the input from the group with the smaller priority value
     * @param followingInput index of the input from the group with the next larger priority value
     */
    private void addTopologyEdges(ExecNode<?> node, int precedingInput, int followingInput) {
        ExecNode<?> precedingSource = node.getInputEdges().get(precedingInput).getSource();
        ExecNode<?> followingSource = node.getInputEdges().get(followingInput).getSource();
        List<ExecNode<?>> ancestors = calculatePipelinedAncestors(followingSource);

        // Remember what was added so it can be rolled back on conflict.
        List<Tuple2<ExecNode<?>, ExecNode<?>>> linkedEdges = new ArrayList<>();
        for (ExecNode<?> ancestor : ancestors) {
            if (!graph.link(precedingSource, ancestor)) {
                resolveInputPriorityConflict(node, precedingInput, followingInput);
                for (Tuple2<ExecNode<?>, ExecNode<?>> edge : linkedEdges) {
                    graph.unlink(edge.f0, edge.f1);
                }
                return;
            }
            linkedEdges.add(Tuple2.of(precedingSource, ancestor));
        }
    }

    /**
     * Collects the ancestors of {@code execNode} that are reached by following only inputs whose
     * dam behavior is <em>not</em> stricter than or equal to {@link #safeDamBehavior}.
     *
     * <p>A visited node is added to the result when none of its inputs was followed, i.e. when it
     * is a boundary node, has no inputs, or all of its inputs are at least as strict as the safe
     * dam behavior. The starting node itself is therefore returned if none of its inputs is
     * followed.
     *
     * @param execNode node at which the search starts
     * @return the collected nodes, in the order the visitor finished them
     */
    @VisibleForTesting
    List<ExecNode<?>> calculatePipelinedAncestors(ExecNode<?> execNode) {
        List<ExecNode<?>> ancestors = new ArrayList<>();
        execNode.accept(
                new AbstractExecNodeExactlyOnceVisitor() {
                    @Override
                    protected void visitNode(ExecNode<?> current) {
                        boolean followedAnyInput = false;
                        if (!boundaries.contains(current)) {
                            List<InputProperty> inputProperties = current.getInputProperties();
                            for (int i = 0; i < inputProperties.size(); i++) {
                                // Inputs that are already "safe" end the search on that path.
                                if (inputProperties
                                        .get(i)
                                        .getDamBehavior()
                                        .stricterOrEqual(safeDamBehavior)) {
                                    continue;
                                }
                                followedAnyInput = true;
                                current.getInputEdges().get(i).getSource().accept(this);
                            }
                        }
                        if (!followedAnyInput) {
                            ancestors.add(current);
                        }
                    }
                });
        return ancestors;
    }

    /**
     * Hook invoked when linking the two given inputs of {@code node} was rejected by the topology
     * graph. Subclasses decide how the conflict is handled.
     *
     * @param node node owning the two conflicting inputs
     * @param precedingInput index of the input from the group with the smaller priority value
     * @param followingInput index of the input from the group with the next larger priority value
     */
    protected abstract void resolveInputPriorityConflict(
            ExecNode<?> node, int precedingInput, int followingInput);
}
