package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.codegen.AsyncCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AsyncUtil;
import org.apache.flink.table.runtime.operators.calc.async.AsyncFunctionRunner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecAsyncCalc.class */
/**
 * Base exec node that evaluates a projection through an asynchronous operator.
 *
 * <p>The node has exactly one input. Its {@code projection} is handed to
 * {@link AsyncCodeGenerator} to generate the async function, which is then wrapped in an
 * {@link AsyncWaitOperatorFactory} configured from {@link AsyncUtil#getAsyncOptions}.
 */
public abstract class CommonExecAsyncCalc extends ExecNodeBase<RowData> implements SingleTransformationTranslator<RowData> {
    /** Name passed to {@code createTransformationMeta} for the produced transformation. */
    public static final String ASYNC_CALC_TRANSFORMATION = "async-calc";

    /** JSON property name under which {@link #projection} is (de)serialized. */
    public static final String FIELD_NAME_PROJECTION = "projection";

    /** Projection expressions evaluated by this node; never {@code null}. */
    @JsonProperty(FIELD_NAME_PROJECTION)
    private final List<RexNode> projection;

    /**
     * Creates the node.
     *
     * @param i node id, forwarded to {@link ExecNodeBase}
     * @param execNodeContext node context, forwarded to {@link ExecNodeBase}
     * @param readableConfig persisted configuration, forwarded to {@link ExecNodeBase}
     * @param list projection expressions; must not be {@code null}
     * @param list2 input properties; must contain exactly one element
     * @param rowType output row type, forwarded to {@link ExecNodeBase}
     * @param str node description, forwarded to {@link ExecNodeBase}
     * @throws IllegalArgumentException if {@code list2} does not have exactly one element
     * @throws NullPointerException if {@code list} is {@code null}
     */
    public CommonExecAsyncCalc(int i, ExecNodeContext execNodeContext, ReadableConfig readableConfig, List<RexNode> list, List<InputProperty> list2, RowType rowType, String str) {
        super(i, execNodeContext, readableConfig, list2, rowType, str);
        Preconditions.checkArgument(list2.size() == 1);
        this.projection = Preconditions.checkNotNull(list);
    }

    /**
     * Translates the single input edge and wraps it in the async calc transformation.
     *
     * @param plannerBase planner used to translate the input and to obtain the class loader
     * @param execNodeConfig configuration used for transformation metadata, code generation
     *     and async options
     * @return the transformation applying the async operator to the input
     */
    @SuppressWarnings("unchecked")
    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        // NOTE(review): the edge is expected to hand back a wildcard-typed transformation, hence
        // the unchecked narrowing to RowData — confirm against ExecEdge#translateToPlan.
        final Transformation<RowData> inputTransform =
                (Transformation<RowData>) getInputEdges().get(0).translateToPlan(plannerBase);
        final ClassLoader classLoader = plannerBase.getFlinkContext().getClassLoader();
        return createAsyncOneInputTransformation(inputTransform, execNodeConfig, classLoader);
    }

    /**
     * Builds the one-input transformation that hosts the async operator.
     *
     * <p>The output type information is assembled from the input field types referenced by the
     * {@link RexInputRef} entries of the projection, followed by the result types of its
     * {@link RexCall} entries. Projection entries of any other kind do not contribute to it.
     *
     * @param inputTransform already translated input transformation
     * @param config configuration used for transformation metadata and operator creation
     * @param classLoader class loader handed to the code generator
     * @return transformation with the same parallelism as {@code inputTransform}
     */
    private OneInputTransformation<RowData, RowData> createAsyncOneInputTransformation(Transformation<RowData> inputTransform, ExecNodeConfig config, ClassLoader classLoader) {
        final List<RexCall> calls =
                projection.stream()
                        .filter(node -> node instanceof RexCall)
                        .map(node -> (RexCall) node)
                        .collect(Collectors.toList());
        final List<Integer> forwardedIndexes =
                projection.stream()
                        .filter(node -> node instanceof RexInputRef)
                        .map(node -> ((RexInputRef) node).getIndex())
                        .collect(Collectors.toList());

        // Rebuild the input row type from the field types only.
        final RowType inputRowType =
                RowType.of(getInputEdges().get(0).getOutputType().getChildren().toArray(new LogicalType[0]));
        final List<LogicalType> inputFieldTypes = inputRowType.getChildren();

        final List<LogicalType> forwardedTypes =
                forwardedIndexes.stream().map(inputFieldTypes::get).collect(Collectors.toList());
        final List<LogicalType> callResultTypes =
                calls.stream()
                        .map(call -> FlinkTypeFactory.toLogicalType(call.getType()))
                        .collect(Collectors.toList());

        // Forwarded input fields come first, call results second.
        final List<LogicalType> outputFieldTypes =
                new ArrayList<>(forwardedTypes.size() + callResultTypes.size());
        outputFieldTypes.addAll(forwardedTypes);
        outputFieldTypes.addAll(callResultTypes);
        final InternalTypeInfo<RowData> outputTypeInfo =
                InternalTypeInfo.ofFields(outputFieldTypes.toArray(new LogicalType[0]));

        final OneInputStreamOperatorFactory<RowData, RowData> operatorFactory =
                getAsyncFunctionOperator(config, classLoader, inputRowType);

        return ExecNodeUtil.createOneInputTransformation(
                inputTransform,
                createTransformationMeta(ASYNC_CALC_TRANSFORMATION, config),
                operatorFactory,
                outputTypeInfo,
                inputTransform.getParallelism(),
                false);
    }

    /**
     * Generates the async function for the projection and wraps it in an operator factory.
     *
     * @param config configuration used for code generation and for reading the async options
     * @param classLoader class loader handed to the code generator
     * @param inputRowType row type of the records entering the operator
     * @return factory configured with the timeout, buffer capacity, output mode and retry
     *     strategy obtained from {@link AsyncUtil#getAsyncOptions}
     */
    private OneInputStreamOperatorFactory<RowData, RowData> getAsyncFunctionOperator(ExecNodeConfig config, ClassLoader classLoader, RowType inputRowType) {
        // The generated function's result row holds one field per projection entry, in order.
        final RowType resultRowType =
                RowType.of(
                        projection.stream()
                                .map(node -> FlinkTypeFactory.toLogicalType(node.getType()))
                                .toArray(LogicalType[]::new));
        final AsyncFunctionRunner asyncFunction =
                new AsyncFunctionRunner(
                        AsyncCodeGenerator.generateFunction(
                                "AsyncScalarFunction",
                                inputRowType,
                                resultRowType,
                                projection,
                                true,
                                config,
                                classLoader));
        final AsyncUtil.Options options = AsyncUtil.getAsyncOptions(config);
        return new AsyncWaitOperatorFactory<>(
                asyncFunction,
                options.asyncTimeout,
                options.asyncBufferCapacity,
                options.asyncOutputMode,
                options.asyncRetryStrategy);
    }
}
