package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.MinibatchUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.operators.aggregate.MiniBatchLocalGroupAggFunction;
import org.apache.flink.table.runtime.operators.bundle.MapBundleOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/**
 * Stream exec node that translates a local group aggregate into a {@link MapBundleOperator}
 * wrapping a {@link MiniBatchLocalGroupAggFunction}.
 *
 * <p>Input rows are keyed by the {@code grouping} fields and bundled until the trigger created by
 * {@link MinibatchUtil#createMiniBatchTrigger} fires. The aggregation logic itself is code
 * generated by an {@link AggsHandlerCodeGenerator} from the configured aggregate calls.
 *
 * <p>Instances are created either by the planner (first constructor) or by Jackson when a
 * serialized plan is restored (the {@link JsonCreator} constructor).
 */
@ExecNodeMetadata(
        name = "stream-exec-local-group-aggregate",
        version = 1,
        consumedOptions = {"table.exec.mini-batch.enabled", "table.exec.mini-batch.size"},
        producedTransformations = {
            StreamExecLocalGroupAggregate.LOCAL_GROUP_AGGREGATE_TRANSFORMATION
        },
        minPlanVersion = FlinkVersion.v1_15,
        minStateVersion = FlinkVersion.v1_15)
public class StreamExecLocalGroupAggregate extends StreamExecAggregateBase {

    /** Name of the single transformation produced by this node. */
    public static final String LOCAL_GROUP_AGGREGATE_TRANSFORMATION = "local-group-aggregate";

    /** Field indices handed to the key selector to derive the grouping key of an input row. */
    @JsonProperty("grouping")
    private final int[] grouping;

    /** Aggregate calls evaluated by the generated aggregation handler. */
    @JsonProperty("aggCalls")
    private final AggregateCall[] aggCalls;

    /**
     * One flag per entry of {@link #aggCalls}; the constructor enforces that both arrays have the
     * same length.
     */
    @JsonProperty(StreamExecAggregateBase.FIELD_NAME_AGG_CALL_NEED_RETRACTIONS)
    private final boolean[] aggCallNeedRetractions;

    /** When {@code true}, the generated aggregation handler is asked to support retraction. */
    @JsonProperty(StreamExecAggregateBase.FIELD_NAME_NEED_RETRACTION)
    private final boolean needRetraction;

    /**
     * Creates a new node with a freshly allocated id, context and persisted configuration.
     *
     * @param tableConfig configuration from which the persisted node configuration is derived
     * @param grouping grouping field indices; must not be {@code null}
     * @param aggCalls aggregate calls; must not be {@code null}
     * @param aggCallNeedRetractions per-call retraction flags; must not be {@code null} and must
     *     have the same length as {@code aggCalls}
     * @param needRetraction whether the generated handler must support retraction
     * @param inputProperty property of the single input of this node
     * @param outputType output row type of this node
     * @param description human-readable description of this node
     */
    public StreamExecLocalGroupAggregate(
            ReadableConfig tableConfig,
            int[] grouping,
            AggregateCall[] aggCalls,
            boolean[] aggCallNeedRetractions,
            boolean needRetraction,
            InputProperty inputProperty,
            RowType outputType,
            String description) {
        this(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(StreamExecLocalGroupAggregate.class),
                ExecNodeContext.newPersistedConfig(
                        StreamExecLocalGroupAggregate.class, tableConfig),
                grouping,
                aggCalls,
                aggCallNeedRetractions,
                needRetraction,
                Collections.singletonList(inputProperty),
                outputType,
                description);
    }

    /**
     * Creates a node from its serialized JSON properties.
     *
     * @param id node id
     * @param context node context (serialized as {@code type})
     * @param persistedConfig persisted node configuration
     * @param grouping grouping field indices; must not be {@code null}
     * @param aggCalls aggregate calls; must not be {@code null}
     * @param aggCallNeedRetractions per-call retraction flags; must not be {@code null} and must
     *     have the same length as {@code aggCalls}
     * @param needRetraction whether the generated handler must support retraction
     * @param inputProperties input properties of this node
     * @param outputType output row type of this node
     * @param description human-readable description of this node
     * @throws NullPointerException if any of the three arrays is {@code null}
     * @throws IllegalArgumentException if {@code aggCalls} and {@code aggCallNeedRetractions}
     *     differ in length
     */
    @JsonCreator
    public StreamExecLocalGroupAggregate(
            @JsonProperty("id") int id,
            @JsonProperty("type") ExecNodeContext context,
            @JsonProperty("configuration") ReadableConfig persistedConfig,
            @JsonProperty("grouping") int[] grouping,
            @JsonProperty("aggCalls") AggregateCall[] aggCalls,
            @JsonProperty("aggCallNeedRetractions") boolean[] aggCallNeedRetractions,
            @JsonProperty("needRetraction") boolean needRetraction,
            @JsonProperty("inputProperties") List<InputProperty> inputProperties,
            @JsonProperty("outputType") RowType outputType,
            @JsonProperty("description") String description) {
        super(id, context, persistedConfig, inputProperties, outputType, description);
        this.grouping = Preconditions.checkNotNull(grouping);
        this.aggCalls = Preconditions.checkNotNull(aggCalls);
        this.aggCallNeedRetractions = Preconditions.checkNotNull(aggCallNeedRetractions);
        Preconditions.checkArgument(
                aggCalls.length == aggCallNeedRetractions.length,
                "aggCalls and aggCallNeedRetractions must have the same length.");
        this.needRetraction = needRetraction;
    }

    /**
     * Builds the one-input transformation for this node: the input transformation followed by a
     * {@link MapBundleOperator} that runs the generated aggregation handler per bundle.
     *
     * @param planner planner providing the class loader, rel builder and type factory
     * @param config node configuration used for code generation, the mini-batch trigger and the
     *     transformation metadata
     * @return transformation emitting rows of this node's output type, with the same parallelism
     *     as the input transformation
     */
    @Override
    protected Transformation<RowData> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        final ExecEdge inputEdge = getInputEdges().get(0);
        // The edge only exposes Transformation<?>; narrowed once here so everything below is
        // type-checked instead of relying on raw types.
        @SuppressWarnings("unchecked")
        final Transformation<RowData> inputTransform =
                (Transformation<RowData>) inputEdge.translateToPlan(planner);
        final RowType inputRowType = (RowType) inputEdge.getOutputType();
        final ClassLoader classLoader = planner.getFlinkContext().getClassLoader();

        // NOTE(review): the trailing `true` is presumably the "copy input field" flag of the
        // code generator; confirm against AggsHandlerCodeGenerator.
        final AggsHandlerCodeGenerator generator =
                new AggsHandlerCodeGenerator(
                        new CodeGeneratorContext(config, classLoader),
                        planner.createRelBuilder(),
                        JavaScalaConversionUtil.toScala(inputRowType.getChildren()),
                        true);
        generator.needAccumulate().needMerge(0, true, null);
        if (needRetraction) {
            generator.needRetract();
        }

        // NOTE(review): the two trailing booleans (false, true) are positional flags of
        // transformToStreamAggregateInfoList; their meaning is not visible here, see AggregateUtil.
        final MiniBatchLocalGroupAggFunction aggFunction =
                new MiniBatchLocalGroupAggFunction(
                        generator.generateAggsHandler(
                                "GroupAggsHandler",
                                AggregateUtil.transformToStreamAggregateInfoList(
                                        planner.getTypeFactory(),
                                        inputRowType,
                                        JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
                                        aggCallNeedRetractions,
                                        needRetraction,
                                        false,
                                        true)));

        // Rows are bundled per grouping key until the mini-batch trigger fires.
        final MapBundleOperator<RowData, RowData, RowData, RowData> operator =
                new MapBundleOperator<>(
                        aggFunction,
                        MinibatchUtil.createMiniBatchTrigger(config),
                        KeySelectorUtil.getRowDataSelector(
                                classLoader,
                                grouping,
                                (InternalTypeInfo<RowData>) inputTransform.getOutputType()));

        // NOTE(review): the final `false` is presumably the "parallelism configured" flag of
        // createOneInputTransformation; confirm against ExecNodeUtil.
        return ExecNodeUtil.createOneInputTransformation(
                inputTransform,
                createTransformationMeta(LOCAL_GROUP_AGGREGATE_TRANSFORMATION, config),
                operator,
                InternalTypeInfo.of(getOutputType()),
                inputTransform.getParallelism(),
                false);
    }
}
