package org.apache.flink.table.planner.plan.nodes.exec.processor;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.plan.nodes.exec.AdaptiveJoinExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecAdaptiveJoin;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.table.planner.plan.utils.OperatorType;
import org.apache.flink.table.planner.utils.TableConfigUtils;

/* loaded from: input_file:flink-table-planner.jar:org/apache/flink/table/planner/plan/nodes/exec/processor/AdaptiveJoinProcessor.class */
/**
 * An {@link ExecNodeGraphProcessor} that walks an exec node graph and swaps every eligible
 * {@link AdaptiveJoinExecNode} for the {@link BatchExecAdaptiveJoin} it produces via
 * {@code toAdaptiveJoinNode()}.
 *
 * <p>A node is eligible only when all of the following hold:
 *
 * <ul>
 *   <li>every one of its input properties requires a {@code HASH} distribution;
 *   <li>none of its direct {@link BatchExecExchange} inputs requires {@code KEEP_INPUT_AS_IS};
 *   <li>its consumer (if any) does not require {@code KEEP_INPUT_AS_IS} on any input;
 *   <li>it implements {@link AdaptiveJoinExecNode} and reports
 *       {@code canBeTransformedToAdaptiveJoin()}.
 * </ul>
 *
 * <p>The graph is modified in place: input edges of visited nodes are replaced, and a new
 * {@link ExecNodeGraph} wrapping the (possibly replaced) root nodes is returned.
 */
public class AdaptiveJoinProcessor implements ExecNodeGraphProcessor {

    /**
     * Rewrites the given graph so that eligible joins become adaptive joins.
     *
     * @param execNodeGraph the graph to process; its first root node decides whether the job is
     *     treated as a streaming job
     * @param processorContext gives access to the planner, its table config and its execution
     *     environment
     * @return the unchanged {@code execNodeGraph} when adaptive join is disabled, otherwise a new
     *     graph with the same Flink version and the processed root nodes
     * @throws TableException if the first root node is a {@link StreamExecNode}
     */
    @Override
    public ExecNodeGraph process(ExecNodeGraph execNodeGraph, ProcessorContext processorContext) {
        if (execNodeGraph.getRootNodes().get(0) instanceof StreamExecNode) {
            throw new TableException("AdaptiveJoin does not support streaming jobs.");
        }
        if (!isAdaptiveJoinEnabled(processorContext)) {
            return execNodeGraph;
        }

        AbstractExecNodeExactlyOnceVisitor visitor =
                new AbstractExecNodeExactlyOnceVisitor() {
                    @Override
                    protected void visitNode(ExecNode<?> node) {
                        // Handle the inputs of this node before rewiring its own edges.
                        visitInputs(node);
                        // A node that requires KEEP_INPUT_AS_IS on any input keeps its current
                        // sources, so none of its direct inputs is converted.
                        if (AdaptiveJoinProcessor.this.shouldKeepInputAsIs(
                                node.getInputProperties())) {
                            return;
                        }
                        for (int i = 0; i < node.getInputEdges().size(); i++) {
                            ExecEdge edge = node.getInputEdges().get(i);
                            ExecNode<?> newSource =
                                    AdaptiveJoinProcessor.this.tryReplaceWithAdaptiveJoinNode(
                                            edge.getSource());
                            // Rebuild the edge with identical shuffle and exchange mode; only
                            // the source may differ.
                            node.replaceInputEdge(
                                    i,
                                    ExecEdge.builder()
                                            .source(newSource)
                                            .target(node)
                                            .shuffle(edge.getShuffle())
                                            .exchangeMode(edge.getExchangeMode())
                                            .build());
                        }
                    }
                };

        List<ExecNode<?>> newRootNodes = new ArrayList<>();
        for (ExecNode<?> rootNode : execNodeGraph.getRootNodes()) {
            // A root has no consumer inside the graph, so it is converted here directly and the
            // visitor is then started from the resulting node.
            ExecNode<?> newRootNode = tryReplaceWithAdaptiveJoinNode(rootNode);
            newRootNode.accept(visitor);
            newRootNodes.add(newRootNode);
        }
        return new ExecNodeGraph(execNodeGraph.getFlinkVersion(), newRootNodes);
    }

    /**
     * Returns the adaptive join replacement for {@code execNode}, or {@code execNode} itself when
     * it is not eligible for conversion.
     *
     * <p>When a replacement is created, the input edges of {@code execNode} are re-created with
     * the replacement as their target.
     */
    private ExecNode<?> tryReplaceWithAdaptiveJoinNode(ExecNode<?> execNode) {
        if (!areAllInputsHashShuffle(execNode)
                || shouldKeepUpstreamExchangeInputAsIs(execNode.getInputEdges())) {
            return execNode;
        }
        if (!(execNode instanceof AdaptiveJoinExecNode)) {
            return execNode;
        }
        AdaptiveJoinExecNode candidate = (AdaptiveJoinExecNode) execNode;
        if (!candidate.canBeTransformedToAdaptiveJoin()) {
            return execNode;
        }
        BatchExecAdaptiveJoin adaptiveJoinNode = candidate.toAdaptiveJoinNode();
        replaceInputEdge(adaptiveJoinNode, execNode);
        return adaptiveJoinNode;
    }

    /** Returns whether any of the given input properties requires {@code KEEP_INPUT_AS_IS}. */
    private boolean shouldKeepInputAsIs(List<InputProperty> inputProperties) {
        return inputProperties.stream()
                .anyMatch(
                        property ->
                                property.getRequiredDistribution().getType()
                                        == InputProperty.DistributionType.KEEP_INPUT_AS_IS);
    }

    /**
     * Returns whether any edge whose source is a {@link BatchExecExchange} leads to an exchange
     * that requires {@code KEEP_INPUT_AS_IS} on one of its inputs. Edges with other kinds of
     * source are ignored.
     */
    private boolean shouldKeepUpstreamExchangeInputAsIs(List<ExecEdge> inputEdges) {
        return inputEdges.stream()
                .filter(edge -> edge.getSource() instanceof BatchExecExchange)
                .map(edge -> (BatchExecExchange) edge.getSource())
                .anyMatch(exchange -> shouldKeepInputAsIs(exchange.getInputProperties()));
    }

    /**
     * Decides from configuration whether this processor should rewrite the graph.
     *
     * <p>Adaptive join is enabled only when all three conditions hold, checked in this order:
     *
     * <ol>
     *   <li>the adaptive broadcast join strategy is not {@code NONE} and the
     *       {@code BroadcastHashJoin} operator is not disabled, or the adaptive skewed join
     *       optimization strategy is not {@code NONE};
     *   <li>the scheduler type is {@code AdaptiveBatch} (also assumed when none is configured);
     *   <li>job recovery is not enabled.
     * </ol>
     */
    private boolean isAdaptiveJoinEnabled(ProcessorContext processorContext) {
        TableConfig tableConfig = processorContext.getPlanner().getTableConfig();

        boolean anyAdaptiveStrategyEnabled =
                (tableConfig.get(
                                                OptimizerConfigOptions
                                                        .TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY)
                                        != OptimizerConfigOptions.AdaptiveBroadcastJoinStrategy.NONE
                                && !TableConfigUtils.isOperatorDisabled(
                                        tableConfig, OperatorType.BroadcastHashJoin))
                        || tableConfig.get(
                                        OptimizerConfigOptions
                                                .TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_STRATEGY)
                                != OptimizerConfigOptions.AdaptiveSkewedJoinOptimizationStrategy
                                        .NONE;
        if (!anyAdaptiveStrategyEnabled) {
            return false;
        }

        JobManagerOptions.SchedulerType schedulerType =
                processorContext
                        .getPlanner()
                        .getExecEnv()
                        .getConfig()
                        .getSchedulerType()
                        .orElse(JobManagerOptions.SchedulerType.AdaptiveBatch);
        if (schedulerType != JobManagerOptions.SchedulerType.AdaptiveBatch) {
            return false;
        }

        boolean jobRecoveryEnabled = tableConfig.get(BatchExecutionOptions.JOB_RECOVERY_ENABLED);
        return !jobRecoveryEnabled;
    }

    /**
     * Returns whether every input property of {@code execNode} requires a {@code HASH}
     * distribution. A node without input properties yields {@code true}.
     */
    private boolean areAllInputsHashShuffle(ExecNode<?> execNode) {
        return execNode.getInputProperties().stream()
                .allMatch(
                        property ->
                                property.getRequiredDistribution().getType()
                                        == InputProperty.DistributionType.HASH);
    }

    /**
     * Gives {@code newNode} the same inputs as {@code originalNode}: for each input edge of
     * {@code originalNode} a new edge with the same source, shuffle and exchange mode is built
     * with {@code newNode} as target, and the resulting list is set on {@code newNode}.
     */
    private void replaceInputEdge(ExecNode<?> newNode, ExecNode<?> originalNode) {
        List<ExecEdge> newInputEdges = new ArrayList<>();
        for (ExecEdge originalEdge : originalNode.getInputEdges()) {
            newInputEdges.add(
                    ExecEdge.builder()
                            .source(originalEdge.getSource())
                            .target(newNode)
                            .shuffle(originalEdge.getShuffle())
                            .exchangeMode(originalEdge.getExchangeMode())
                            .build());
        }
        newNode.setInputEdges(newInputEdges);
    }
}
