package org.apache.flink.table.planner.plan.nodes.exec.processor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecDynamicFilteringDataCollector;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExecutionOrderEnforcer;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;

/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.class */
public class DynamicFilteringDependencyProcessor implements ExecNodeGraphProcessor {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor$DescendantInfo.class */
    public static class DescendantInfo {
        final int inputId;
        final ExecNode<?> descendant;

        DescendantInfo(ExecNode<?> execNode, int i) {
            this.descendant = execNode;
            this.inputId = i;
        }
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.processor.ExecNodeGraphProcessor
    public ExecNodeGraph process(ExecNodeGraph execNodeGraph, ProcessorContext processorContext) {
        return enforceDimSideBlockingExchange(addOrderEnforcer(execNodeGraph, processorContext), processorContext);
    }

    private ExecNodeGraph addOrderEnforcer(ExecNodeGraph execNodeGraph, ProcessorContext processorContext) {
        final HashMap hashMap = new HashMap();
        AbstractExecNodeExactlyOnceVisitor abstractExecNodeExactlyOnceVisitor = new AbstractExecNodeExactlyOnceVisitor() { // from class: org.apache.flink.table.planner.plan.nodes.exec.processor.DynamicFilteringDependencyProcessor.1
            @Override // org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor
            protected void visitNode(ExecNode<?> execNode) {
                for (int i = 0; i < execNode.getInputEdges().size(); i++) {
                    ExecNode<?> source = execNode.getInputEdges().get(i).getSource();
                    if ((source instanceof BatchExecTableSourceScan) && source.getInputEdges().size() > 0) {
                        ((List) hashMap.computeIfAbsent((BatchExecTableSourceScan) source, batchExecTableSourceScan -> {
                            return new ArrayList();
                        })).add(new DescendantInfo(execNode, i));
                    }
                }
                visitInputs(execNode);
            }
        };
        execNodeGraph.getRootNodes().forEach(execNode -> {
            execNode.accept(abstractExecNodeExactlyOnceVisitor);
        });
        for (Map.Entry entry : hashMap.entrySet()) {
            BatchExecTableSourceScan batchExecTableSourceScan = (BatchExecTableSourceScan) entry.getKey();
            BatchExecDynamicFilteringDataCollector dynamicFilteringDataCollector = BatchExecTableSourceScan.getDynamicFilteringDataCollector(batchExecTableSourceScan);
            BatchExecTableSourceScan copyAndRemoveInputs = batchExecTableSourceScan.copyAndRemoveInputs();
            BatchExecExchange batchExecExchange = new BatchExecExchange(processorContext.getPlanner().getTableConfig(), InputProperty.builder().requiredDistribution(InputProperty.BROADCAST_DISTRIBUTION).damBehavior(InputProperty.DamBehavior.BLOCKING).build(), dynamicFilteringDataCollector.getOutputType(), "Exchange");
            batchExecExchange.setRequiredExchangeMode(StreamExchangeMode.BATCH);
            batchExecExchange.setInputEdges(Collections.singletonList(ExecEdge.builder().source(dynamicFilteringDataCollector).target(batchExecExchange).build()));
            BatchExecExecutionOrderEnforcer batchExecExecutionOrderEnforcer = new BatchExecExecutionOrderEnforcer(processorContext.getPlanner().getTableConfig(), Arrays.asList(batchExecExchange.getInputProperties().get(0), InputProperty.DEFAULT), copyAndRemoveInputs.getOutputType(), "OrderEnforcer");
            batchExecExecutionOrderEnforcer.setInputEdges(Arrays.asList(ExecEdge.builder().source(batchExecExchange).target(batchExecExecutionOrderEnforcer).build(), ExecEdge.builder().source(copyAndRemoveInputs).target(batchExecExecutionOrderEnforcer).build()));
            ((List) entry.getValue()).forEach(descendantInfo -> {
                descendantInfo.descendant.replaceInputEdge(descendantInfo.inputId, ExecEdge.builder().source(batchExecExecutionOrderEnforcer).target(descendantInfo.descendant).build());
            });
        }
        return execNodeGraph;
    }

    private ExecNodeGraph enforceDimSideBlockingExchange(ExecNodeGraph execNodeGraph, final ProcessorContext processorContext) {
        if (processorContext.getPlanner().getTableConfig().getConfiguration().get(ExecutionOptions.BATCH_SHUFFLE_MODE) == BatchShuffleMode.ALL_EXCHANGES_BLOCKING) {
            return execNodeGraph;
        }
        final HashSet hashSet = new HashSet();
        AbstractExecNodeExactlyOnceVisitor abstractExecNodeExactlyOnceVisitor = new AbstractExecNodeExactlyOnceVisitor() { // from class: org.apache.flink.table.planner.plan.nodes.exec.processor.DynamicFilteringDependencyProcessor.2
            @Override // org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor
            protected void visitNode(ExecNode<?> execNode) {
                if (execNode instanceof BatchExecDynamicFilteringDataCollector) {
                    hashSet.add(execNode);
                }
                if (hashSet.contains(execNode)) {
                    Stream<R> map = execNode.getInputEdges().stream().map((v0) -> {
                        return v0.getSource();
                    });
                    Set set = hashSet;
                    Objects.requireNonNull(set);
                    map.forEach((v1) -> {
                        r1.add(v1);
                    });
                }
                visitInputs(execNode);
            }
        };
        execNodeGraph.getRootNodes().forEach(execNode -> {
            execNode.accept(abstractExecNodeExactlyOnceVisitor);
        });
        AbstractExecNodeExactlyOnceVisitor abstractExecNodeExactlyOnceVisitor2 = new AbstractExecNodeExactlyOnceVisitor() { // from class: org.apache.flink.table.planner.plan.nodes.exec.processor.DynamicFilteringDependencyProcessor.3
            @Override // org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor
            protected void visitNode(ExecNode<?> execNode2) {
                visitInputs(execNode2);
                if (hashSet.contains(execNode2)) {
                    return;
                }
                for (int i = 0; i < execNode2.getInputEdges().size(); i++) {
                    ExecNode<?> source = execNode2.getInputEdges().get(i).getSource();
                    if (hashSet.contains(source)) {
                        if (source instanceof BatchExecExchange) {
                            ((BatchExecExchange) source).setRequiredExchangeMode(StreamExchangeMode.BATCH);
                        } else if (execNode2 instanceof BatchExecExchange) {
                            ((BatchExecExchange) execNode2).setRequiredExchangeMode(StreamExchangeMode.BATCH);
                        } else {
                            execNode2.replaceInputEdge(i, ExecEdge.builder().source(DynamicFilteringDependencyProcessor.this.createExchange(source, execNode2.getInputProperties().get(i), processorContext.getPlanner().getTableConfig())).target(execNode2).build());
                        }
                    }
                }
            }
        };
        execNodeGraph.getRootNodes().forEach(execNode2 -> {
            execNode2.accept(abstractExecNodeExactlyOnceVisitor2);
        });
        return execNodeGraph;
    }

    private BatchExecExchange createExchange(ExecNode<?> execNode, InputProperty inputProperty, TableConfig tableConfig) {
        BatchExecExchange batchExecExchange = new BatchExecExchange(tableConfig, InputProperty.builder().requiredDistribution(inputProperty.getRequiredDistribution() == InputProperty.UNKNOWN_DISTRIBUTION ? InputProperty.ANY_DISTRIBUTION : inputProperty.getRequiredDistribution()).damBehavior(inputProperty.getDamBehavior()).priority(inputProperty.getPriority()).build(), execNode.getOutputType(), "Exchange");
        batchExecExchange.setRequiredExchangeMode(StreamExchangeMode.BATCH);
        batchExecExchange.setInputEdges(Collections.singletonList(ExecEdge.builder().source(execNode).target(batchExecExchange).build()));
        return batchExecExchange;
    }
}
