/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.batch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.fusion.FusionCodegenUtil;
import org.apache.flink.table.planner.plan.fusion.OpFusionCodegenSpecGenerator;
import org.apache.flink.table.planner.plan.fusion.generator.OneInputOpFusionCodegenSpecGenerator;
import org.apache.flink.table.planner.plan.fusion.spec.OutputFusionCodegenSpec;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.FusionCodegenExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecInputAdapter;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.operators.fusion.OperatorFusionCodegenFactory;
import org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory;
import org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapperGenerator;
import org.apache.flink.table.runtime.operators.multipleinput.input.InputSelectionSpec;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import scala.Tuple2;

public class BatchExecMultipleInput
extends ExecNodeBase<RowData>
implements BatchExecNode<RowData>,
SingleTransformationTranslator<RowData> {
    private final ExecNode<?> rootNode;
    private final List<ExecNode<?>> memberExecNodes;
    private final List<ExecEdge> originalEdges;

    public BatchExecMultipleInput(ReadableConfig tableConfig, List<InputProperty> inputProperties, ExecNode<?> rootNode, List<ExecNode<?>> memberExecNodes, List<ExecEdge> originalEdges, String description) {
        super(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(BatchExecMultipleInput.class), ExecNodeContext.newPersistedConfig(BatchExecMultipleInput.class, tableConfig), inputProperties, rootNode.getOutputType(), description);
        this.rootNode = rootNode;
        this.memberExecNodes = memberExecNodes;
        Preconditions.checkArgument((inputProperties.size() == originalEdges.size() ? 1 : 0) != 0);
        this.originalEdges = originalEdges;
    }

    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) {
        long memoryBytes;
        int maxParallelism;
        int parallelism;
        BatchMultipleInputStreamOperatorFactory operatorFactory;
        List<Object> inputTransforms = new ArrayList();
        for (ExecEdge inputEdge : this.getInputEdges()) {
            inputTransforms.add(inputEdge.translateToPlan(planner));
        }
        Transformation outputTransform = this.rootNode.translateToPlan(planner);
        int[] readOrders = this.getInputProperties().stream().map(InputProperty::getPriority).mapToInt(i -> i).toArray();
        ResourceSpec minResources = null;
        ResourceSpec preferredResources = null;
        boolean fusionCodegenEnabled = (Boolean)config.get(ExecutionConfigOptions.TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED);
        if (fusionCodegenEnabled && this.allSupportFusionCodegen()) {
            ArrayList<InputSelectionSpec> inputSelectionSpecs = new ArrayList<InputSelectionSpec>();
            int i2 = 0;
            for (ExecEdge inputEdge : this.originalEdges) {
                int multipleInputId = i2 + 1;
                BatchExecNode source = (BatchExecNode)inputEdge.getSource();
                BatchExecInputAdapter inputAdapter = new BatchExecInputAdapter(multipleInputId, (ReadableConfig)TableConfig.getDefault(), InputProperty.builder().priority(readOrders[i2]).build(), source.getOutputType(), "BatchInputAdapter");
                inputAdapter.setInputEdges(Collections.singletonList(ExecEdge.builder().source(source).target(inputAdapter).build()));
                BatchExecNode target = (BatchExecNode)inputEdge.getTarget();
                int edgeIdxInTargetNode = target.getInputEdges().indexOf(inputEdge);
                Preconditions.checkArgument((edgeIdxInTargetNode >= 0 ? 1 : 0) != 0);
                target.replaceInputEdge(edgeIdxInTargetNode, ExecEdge.builder().source(inputAdapter).target(target).build());
                inputSelectionSpecs.add(new InputSelectionSpec(multipleInputId, readOrders[i2]));
                ++i2;
            }
            OpFusionCodegenSpecGenerator opFusionCodegenSpecGenerator = this.rootNode.translateToFusionCodegenSpec(planner);
            OneInputOpFusionCodegenSpecGenerator outputGenerator = new OneInputOpFusionCodegenSpecGenerator(opFusionCodegenSpecGenerator, 0L, (RowType)this.getOutputType(), new OutputFusionCodegenSpec(new CodeGeneratorContext(config, planner.getFlinkContext().getClassLoader())));
            opFusionCodegenSpecGenerator.addOutput(1, outputGenerator);
            Tuple2<OperatorFusionCodegenFactory<RowData>, Object> multipleOperatorTuple = FusionCodegenUtil.generateFusionOperator(outputGenerator, inputSelectionSpecs);
            operatorFactory = (StreamOperatorFactory)multipleOperatorTuple._1;
            Pair<Integer, Integer> parallelismPair = this.getInputMaxParallelism(inputTransforms);
            parallelism = (Integer)parallelismPair.getLeft();
            maxParallelism = (Integer)parallelismPair.getRight();
            memoryBytes = (Long)multipleOperatorTuple._2;
        } else {
            TableOperatorWrapperGenerator generator = new TableOperatorWrapperGenerator(inputTransforms, outputTransform, readOrders);
            generator.generate();
            List inputTransformAndInputSpecPairs = generator.getInputTransformAndInputSpecPairs();
            operatorFactory = new BatchMultipleInputStreamOperatorFactory(inputTransformAndInputSpecPairs.stream().map(Pair::getValue).collect(Collectors.toList()), generator.getHeadWrappers(), generator.getTailWrapper());
            parallelism = generator.getParallelism();
            maxParallelism = generator.getMaxParallelism();
            int n = generator.getManagedMemoryWeight();
            memoryBytes = (long)n << 20;
            minResources = generator.getMinResources();
            preferredResources = generator.getPreferredResources();
            inputTransforms = inputTransformAndInputSpecPairs.stream().map(Pair::getKey).collect(Collectors.toList());
        }
        MultipleInputTransformation multipleInputTransform = new MultipleInputTransformation(this.createTransformationName(config), (StreamOperatorFactory)operatorFactory, (TypeInformation)InternalTypeInfo.of((LogicalType)this.getOutputType()), parallelism, false);
        multipleInputTransform.setDescription(this.createTransformationDescription(config));
        if (maxParallelism > 0) {
            multipleInputTransform.setMaxParallelism(maxParallelism);
        }
        for (Transformation transformation : inputTransforms) {
            multipleInputTransform.addInput(transformation);
        }
        if (minResources != null && preferredResources != null) {
            multipleInputTransform.setResources(minResources, preferredResources);
        }
        multipleInputTransform.setDescription(this.createTransformationDescription(config));
        ExecNodeUtil.setManagedMemoryWeight(multipleInputTransform, memoryBytes);
        multipleInputTransform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
        return multipleInputTransform;
    }

    public List<ExecEdge> getOriginalEdges() {
        return this.originalEdges;
    }

    @VisibleForTesting
    public ExecNode<?> getRootNode() {
        return this.rootNode;
    }

    private boolean allSupportFusionCodegen() {
        return this.memberExecNodes.stream().map(FusionCodegenExecNode::supportFusionCodegen).reduce(true, (n1, n2) -> n1 != false && n2 != false);
    }

    private Pair<Integer, Integer> getInputMaxParallelism(List<Transformation<?>> inputTransformations) {
        int parallelism = -1;
        int maxParallelism = -1;
        for (Transformation<?> transform : inputTransformations) {
            parallelism = Math.max(parallelism, transform.getParallelism());
            maxParallelism = Math.max(maxParallelism, transform.getMaxParallelism());
        }
        return Pair.of((Object)parallelism, (Object)maxParallelism);
    }
}

