package org.apache.flink.table.planner.plan.nodes.exec.processor.utils;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.table.planner.utils.StreamExchangeModeUtils;

@Internal
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.class */
public class InputPriorityConflictResolver extends InputPriorityGraphGenerator {
    private final StreamExchangeMode exchangeMode;
    private final ReadableConfig tableConfig;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver$ConflictCausedByExchangeChecker.class */
    public static class ConflictCausedByExchangeChecker extends AbstractExecNodeExactlyOnceVisitor {
        private final BatchExecExchange exchange;
        private boolean found;

        private ConflictCausedByExchangeChecker(BatchExecExchange batchExecExchange) {
            this.exchange = batchExecExchange;
        }

        @Override // org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor
        protected void visitNode(ExecNode<?> execNode) {
            if (execNode == this.exchange) {
                this.found = true;
            }
            Iterator<ExecEdge> it = execNode.getInputEdges().iterator();
            while (it.hasNext()) {
                visit(it.next().getSource());
                if (this.found) {
                    return;
                }
            }
        }
    }

    public InputPriorityConflictResolver(List<ExecNode<?>> list, InputProperty.DamBehavior damBehavior, StreamExchangeMode streamExchangeMode, ReadableConfig readableConfig) {
        super(list, getDynamicFilteringSourceNodes(list), damBehavior);
        this.exchangeMode = streamExchangeMode;
        this.tableConfig = readableConfig;
    }

    public void detectAndResolve() {
        createTopologyGraph();
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.processor.utils.InputPriorityGraphGenerator
    protected void resolveInputPriorityConflict(ExecNode<?> execNode, int i, int i2) {
        BatchExecExchange createExchange;
        ExecNode<?> source = execNode.getInputEdges().get(i).getSource();
        ExecNode<?> source2 = execNode.getInputEdges().get(i2).getSource();
        if (source2 instanceof BatchExecExchange) {
            BatchExecExchange batchExecExchange = (BatchExecExchange) source2;
            InputProperty inputProperty = batchExecExchange.getInputProperties().get(0);
            InputProperty build = InputProperty.builder().requiredDistribution(inputProperty.getRequiredDistribution()).priority(inputProperty.getPriority()).damBehavior(getDamBehavior()).build();
            if (isConflictCausedByExchange(source, batchExecExchange)) {
                BatchExecExchange batchExecExchange2 = new BatchExecExchange(this.tableConfig, build, batchExecExchange.getOutputType(), "Exchange");
                batchExecExchange2.setRequiredExchangeMode(this.exchangeMode);
                batchExecExchange2.setInputEdges(batchExecExchange.getInputEdges());
                createExchange = batchExecExchange2;
            } else {
                BatchExecExchange batchExecExchange3 = new BatchExecExchange(this.tableConfig, build, batchExecExchange.getOutputType(), batchExecExchange.getDescription());
                batchExecExchange3.setRequiredExchangeMode(this.exchangeMode);
                batchExecExchange3.setInputEdges(batchExecExchange.getInputEdges());
                createExchange = batchExecExchange3;
            }
        } else {
            createExchange = createExchange(execNode, i2);
        }
        execNode.replaceInputEdge(i2, ExecEdge.builder().source(createExchange).target(execNode).build());
    }

    private boolean isConflictCausedByExchange(ExecNode<?> execNode, BatchExecExchange batchExecExchange) {
        ConflictCausedByExchangeChecker conflictCausedByExchangeChecker = new ConflictCausedByExchangeChecker(batchExecExchange);
        conflictCausedByExchangeChecker.visit(execNode);
        return conflictCausedByExchangeChecker.found;
    }

    private BatchExecExchange createExchange(ExecNode<?> execNode, int i) {
        ExecNode<?> source = execNode.getInputEdges().get(i).getSource();
        InputProperty inputProperty = execNode.getInputProperties().get(i);
        InputProperty.RequiredDistribution requiredDistribution = inputProperty.getRequiredDistribution();
        if (requiredDistribution.getType() == InputProperty.DistributionType.BROADCAST) {
            throw new IllegalStateException("Trying to resolve input priority conflict on broadcast side. This is not expected.");
        }
        BatchExecExchange batchExecExchange = new BatchExecExchange(this.tableConfig, InputProperty.builder().requiredDistribution(requiredDistribution).priority(inputProperty.getPriority()).damBehavior(getDamBehavior()).build(), source.getOutputType(), "Exchange");
        batchExecExchange.setRequiredExchangeMode(this.exchangeMode);
        batchExecExchange.setInputEdges(Collections.singletonList(ExecEdge.builder().source(source).target(batchExecExchange).build()));
        return batchExecExchange;
    }

    private static Set<ExecNode<?>> getDynamicFilteringSourceNodes(List<ExecNode<?>> list) {
        final HashSet hashSet = new HashSet();
        AbstractExecNodeExactlyOnceVisitor abstractExecNodeExactlyOnceVisitor = new AbstractExecNodeExactlyOnceVisitor() { // from class: org.apache.flink.table.planner.plan.nodes.exec.processor.utils.InputPriorityConflictResolver.1
            @Override // org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor
            protected void visitNode(ExecNode<?> execNode) {
                if (!(execNode instanceof CommonExecTableSourceScan) || execNode.getInputEdges().size() <= 0) {
                    visitInputs(execNode);
                } else {
                    hashSet.add(execNode);
                }
            }
        };
        list.forEach(execNode -> {
            execNode.accept(abstractExecNodeExactlyOnceVisitor);
        });
        return hashSet;
    }

    private InputProperty.DamBehavior getDamBehavior() {
        return StreamExchangeModeUtils.getBatchStreamExchangeMode(this.tableConfig, this.exchangeMode) == StreamExchangeMode.BATCH ? InputProperty.DamBehavior.BLOCKING : InputProperty.DamBehavior.PIPELINED;
    }
}
