package org.apache.flink.table.planner.plan.nodes.exec.batch;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecOverAggregateBase;
import org.apache.flink.table.planner.plan.nodes.exec.spec.OverSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.PartitionSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.OverAggregateUtil;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.class */
public class BatchExecPythonOverAggregate extends BatchExecOverAggregateBase {
    private static final String ARROW_PYTHON_OVER_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME = "org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonOverWindowAggregateFunctionOperator";
    private final List<Long> lowerBoundary;
    private final List<Long> upperBoundary;
    private final List<AggregateCall> aggCalls;
    private final List<Integer> aggWindowIndex;

    public BatchExecPythonOverAggregate(ReadableConfig readableConfig, OverSpec overSpec, InputProperty inputProperty, RowType rowType, String str) {
        super(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(BatchExecPythonOverAggregate.class), ExecNodeContext.newPersistedConfig(BatchExecPythonOverAggregate.class, readableConfig), overSpec, inputProperty, rowType, str);
        this.lowerBoundary = new ArrayList();
        this.upperBoundary = new ArrayList();
        this.aggCalls = new ArrayList();
        this.aggWindowIndex = new ArrayList();
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        ExecEdge execEdge = getInputEdges().get(0);
        Transformation<?> translateToPlan = execEdge.translateToPlan(plannerBase);
        RowType rowType = (RowType) execEdge.getOutputType();
        List<OverSpec.GroupSpec> groups = this.overSpec.getGroups();
        boolean[] zArr = new boolean[groups.size()];
        for (int i = 0; i < groups.size(); i++) {
            OverSpec.GroupSpec groupSpec = groups.get(i);
            List<AggregateCall> aggCalls = groupSpec.getAggCalls();
            this.aggCalls.addAll(aggCalls);
            for (int i2 = 0; i2 < aggCalls.size(); i2++) {
                this.aggWindowIndex.add(Integer.valueOf(i));
            }
            if (inferGroupMode(groupSpec) == BatchExecOverAggregateBase.OverWindowMode.ROW) {
                zArr[i] = false;
                if (isUnboundedWindow(groupSpec)) {
                    this.lowerBoundary.add(Long.MIN_VALUE);
                    this.upperBoundary.add(Long.MAX_VALUE);
                } else if (isUnboundedPrecedingWindow(groupSpec)) {
                    this.lowerBoundary.add(Long.MIN_VALUE);
                    this.upperBoundary.add(Long.valueOf(OverAggregateUtil.getLongBoundary(this.overSpec, groupSpec.getUpperBound())));
                } else if (isUnboundedFollowingWindow(groupSpec)) {
                    this.lowerBoundary.add(Long.valueOf(OverAggregateUtil.getLongBoundary(this.overSpec, groupSpec.getLowerBound())));
                    this.upperBoundary.add(Long.MAX_VALUE);
                } else {
                    if (!isSlidingWindow(groupSpec)) {
                        throw new TableException("Unsupported row window group spec " + groupSpec);
                    }
                    this.lowerBoundary.add(Long.valueOf(OverAggregateUtil.getLongBoundary(this.overSpec, groupSpec.getLowerBound())));
                    this.upperBoundary.add(Long.valueOf(OverAggregateUtil.getLongBoundary(this.overSpec, groupSpec.getUpperBound())));
                }
            } else {
                zArr[i] = true;
                if (isUnboundedWindow(groupSpec)) {
                    this.lowerBoundary.add(Long.MIN_VALUE);
                    this.upperBoundary.add(Long.MAX_VALUE);
                } else if (isUnboundedPrecedingWindow(groupSpec)) {
                    this.lowerBoundary.add(Long.MIN_VALUE);
                    this.upperBoundary.add(Long.valueOf(OverAggregateUtil.getLongBoundary(this.overSpec, groupSpec.getUpperBound())));
                } else if (isUnboundedFollowingWindow(groupSpec)) {
                    this.lowerBoundary.add(Long.valueOf(OverAggregateUtil.getLongBoundary(this.overSpec, groupSpec.getLowerBound())));
                    this.upperBoundary.add(Long.MAX_VALUE);
                } else {
                    if (!isSlidingWindow(groupSpec)) {
                        throw new TableException("Unsupported range window group spec " + groupSpec);
                    }
                    this.lowerBoundary.add(Long.valueOf(OverAggregateUtil.getLongBoundary(this.overSpec, groupSpec.getLowerBound())));
                    this.upperBoundary.add(Long.valueOf(OverAggregateUtil.getLongBoundary(this.overSpec, groupSpec.getUpperBound())));
                }
            }
        }
        Configuration extractPythonConfiguration = CommonPythonUtil.extractPythonConfiguration(plannerBase.getExecEnv(), execNodeConfig, plannerBase.getFlinkContext().getClassLoader());
        OneInputTransformation<RowData, RowData> createPythonOneInputTransformation = createPythonOneInputTransformation(translateToPlan, rowType, InternalTypeInfo.of(getOutputType()).toRowType(), zArr, extractPythonConfiguration, execNodeConfig, plannerBase.getFlinkContext().getClassLoader());
        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(extractPythonConfiguration, plannerBase.getFlinkContext().getClassLoader())) {
            createPythonOneInputTransformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
        }
        return createPythonOneInputTransformation;
    }

    private OneInputTransformation<RowData, RowData> createPythonOneInputTransformation(Transformation<RowData> transformation, RowType rowType, RowType rowType2, boolean[] zArr, Configuration configuration, ExecNodeConfig execNodeConfig, ClassLoader classLoader) {
        Tuple2<int[], PythonFunctionInfo[]> extractPythonAggregateFunctionInfosFromAggregateCall = CommonPythonUtil.extractPythonAggregateFunctionInfosFromAggregateCall((AggregateCall[]) this.aggCalls.toArray(new AggregateCall[0]));
        return ExecNodeUtil.createOneInputTransformation((Transformation) transformation, createTransformationName(configuration), createTransformationDescription(configuration), (StreamOperator) getPythonOverWindowAggregateFunctionOperator(execNodeConfig, classLoader, configuration, rowType, rowType2, zArr, (int[]) extractPythonAggregateFunctionInfosFromAggregateCall.f0, (PythonFunctionInfo[]) extractPythonAggregateFunctionInfosFromAggregateCall.f1), (TypeInformation) InternalTypeInfo.of(rowType2), transformation.getParallelism());
    }

    private OneInputStreamOperator<RowData, RowData> getPythonOverWindowAggregateFunctionOperator(ExecNodeConfig execNodeConfig, ClassLoader classLoader, Configuration configuration, RowType rowType, RowType rowType2, boolean[] zArr, int[] iArr, PythonFunctionInfo[] pythonFunctionInfoArr) {
        Class<?> loadClass = CommonPythonUtil.loadClass(ARROW_PYTHON_OVER_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME, classLoader);
        RowType project = Projection.of(iArr).project(rowType);
        RowType project2 = Projection.range(rowType.getFieldCount(), rowType2.getFieldCount()).project(rowType2);
        PartitionSpec partition = this.overSpec.getPartition();
        List<OverSpec.GroupSpec> groups = this.overSpec.getGroups();
        SortSpec sort = groups.get(groups.size() - 1).getSort();
        try {
            return (OneInputStreamOperator) loadClass.getConstructor(Configuration.class, PythonFunctionInfo[].class, RowType.class, RowType.class, RowType.class, long[].class, long[].class, boolean[].class, int[].class, Integer.TYPE, Boolean.TYPE, GeneratedProjection.class, GeneratedProjection.class, GeneratedProjection.class).newInstance(configuration, pythonFunctionInfoArr, rowType, project, project2, this.lowerBoundary.stream().mapToLong(l -> {
                return l.longValue();
            }).toArray(), this.upperBoundary.stream().mapToLong(l2 -> {
                return l2.longValue();
            }).toArray(), zArr, this.aggWindowIndex.stream().mapToInt(num -> {
                return num.intValue();
            }).toArray(), Integer.valueOf(sort.getFieldIndices()[0]), Boolean.valueOf(sort.getAscendingOrders()[0]), ProjectionCodeGenerator.generateProjection(new CodeGeneratorContext(execNodeConfig, classLoader), "UdafInputProjection", rowType, project, iArr), ProjectionCodeGenerator.generateProjection(new CodeGeneratorContext(execNodeConfig, classLoader), "GroupKey", rowType, Projection.of(partition.getFieldIndices()).project(rowType), partition.getFieldIndices()), ProjectionCodeGenerator.generateProjection(new CodeGeneratorContext(execNodeConfig, classLoader), "GroupSet", rowType, Projection.of(partition.getFieldIndices()).project(rowType), partition.getFieldIndices()));
        } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new TableException("Python BatchArrowPythonOverWindowAggregateFunctionOperator constructed failed.", e);
        }
    }
}
