package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.tools.RelBuilder;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.expressions.PlannerNamedWindowProperty;
import org.apache.flink.table.planner.expressions.PlannerWindowProperty;
import org.apache.flink.table.planner.plan.logical.LogicalWindow;
import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonDeserializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.generated.GeneratedClass;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceTableAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.window.CountWindow;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.WindowOperator;
import org.apache.flink.table.runtime.operators.window.WindowOperatorBuilder;
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@JsonIgnoreProperties(ignoreUnknown = true)
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.class */
public class StreamExecGroupWindowAggregate extends StreamExecAggregateBase {
    /** JSON property name under which {@link #window} is (de)serialized. */
    public static final String FIELD_NAME_WINDOW = "window";

    /** JSON property name under which {@link #namedWindowProperties} is (de)serialized. */
    public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES = "namedWindowProperties";

    private static final Logger LOGGER =
            LoggerFactory.getLogger(StreamExecGroupWindowAggregate.class);

    /** Grouping key fields; handed to the key selector that partitions the operator state. */
    @JsonProperty(StreamExecAggregateBase.FIELD_NAME_GROUPING)
    private final int[] grouping;

    /** Aggregate calls evaluated per window. */
    @JsonProperty("aggCalls")
    private final AggregateCall[] aggCalls;

    /** Logical window definition (tumbling, sliding or session) this node aggregates over. */
    @JsonProperty(FIELD_NAME_WINDOW)
    @JsonSerialize(using = LogicalWindowJsonSerializer.class)
    @JsonDeserialize(using = LogicalWindowJsonDeserializer.class)
    private final LogicalWindow window;

    /** Window properties whose result types are appended after the aggregate value types. */
    @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
    private final PlannerNamedWindowProperty[] namedWindowProperties;

    /** When {@code true}, the generated aggregate handler is asked to support retraction. */
    @JsonProperty(StreamExecAggregateBase.FIELD_NAME_NEED_RETRACTION)
    private final boolean needRetraction;

    /**
     * Convenience constructor for a node with exactly one input: allocates a fresh node id via
     * {@code getNewNodeId()} and wraps the single input property into a singleton list before
     * delegating to the full constructor.
     *
     * @param iArr grouping key fields
     * @param aggregateCallArr aggregate calls evaluated per window
     * @param logicalWindow logical window definition
     * @param plannerNamedWindowPropertyArr named window properties
     * @param z whether retraction is needed
     * @param inputProperty property of the single input
     * @param rowType output row type of this node
     * @param str description of this node
     */
    public StreamExecGroupWindowAggregate(
            int[] iArr,
            AggregateCall[] aggregateCallArr,
            LogicalWindow logicalWindow,
            PlannerNamedWindowProperty[] plannerNamedWindowPropertyArr,
            boolean z,
            InputProperty inputProperty,
            RowType rowType,
            String str) {
        this(
                iArr,
                aggregateCallArr,
                logicalWindow,
                plannerNamedWindowPropertyArr,
                z,
                getNewNodeId(),
                Collections.singletonList(inputProperty),
                rowType,
                str);
    }

    /**
     * Full constructor, also used by Jackson when restoring the node from JSON.
     *
     * @param iArr grouping key fields; must not be null
     * @param aggregateCallArr aggregate calls evaluated per window; must not be null
     * @param logicalWindow logical window definition; must not be null
     * @param plannerNamedWindowPropertyArr named window properties; must not be null
     * @param z whether retraction is needed
     * @param i node id
     * @param list input properties; must contain exactly one element
     * @param rowType output row type of this node
     * @param str description of this node
     * @throws IllegalArgumentException if {@code list} does not contain exactly one element
     * @throws NullPointerException if any of the array arguments or the window is null
     */
    @JsonCreator
    public StreamExecGroupWindowAggregate(
            @JsonProperty("grouping") int[] iArr,
            @JsonProperty("aggCalls") AggregateCall[] aggregateCallArr,
            @JsonProperty(FIELD_NAME_WINDOW) LogicalWindow logicalWindow,
            @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
                    PlannerNamedWindowProperty[] plannerNamedWindowPropertyArr,
            @JsonProperty("needRetraction") boolean z,
            @JsonProperty("id") int i,
            @JsonProperty("inputProperties") List<InputProperty> list,
            @JsonProperty("outputType") RowType rowType,
            @JsonProperty("description") String str) {
        super(i, list, rowType, str);
        // This node consumes a single input stream.
        Preconditions.checkArgument(list.size() == 1);
        this.grouping = Preconditions.checkNotNull(iArr);
        this.aggCalls = Preconditions.checkNotNull(aggregateCallArr);
        this.window = Preconditions.checkNotNull(logicalWindow);
        this.namedWindowProperties = Preconditions.checkNotNull(plannerNamedWindowPropertyArr);
        this.needRetraction = z;
    }

    /**
     * Translates this node into a keyed {@link OneInputTransformation} that runs a group-window
     * aggregate operator on top of the single input edge.
     *
     * <p>The method resolves the rowtime field index (or {@code -1} when the window is not defined
     * on a rowtime attribute), builds the aggregate info list, generates the aggregate handler and
     * the record equaliser, creates the window operator, and finally keys the transformation by
     * the grouping fields.
     *
     * @param plannerBase planner supplying the table config and the rel builder
     * @return the transformation wrapping the window operator
     * @throws TableException if the window is defined on a rowtime attribute that cannot be found
     *     in the input row type
     */
    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase) {
        // Only tumbling and sliding windows can be sized by a row interval (count windows).
        final boolean isCountWindow;
        if (window instanceof TumblingGroupWindow) {
            isCountWindow =
                    AggregateUtil.hasRowIntervalType(((TumblingGroupWindow) window).size());
        } else if (window instanceof SlidingGroupWindow) {
            isCountWindow = AggregateUtil.hasRowIntervalType(((SlidingGroupWindow) window).size());
        } else {
            isCountWindow = false;
        }

        final TableConfig tableConfig = plannerBase.getTableConfig();
        if (isCountWindow
                && grouping.length > 0
                && tableConfig.getMinIdleStateRetentionTime() < 0) {
            LOGGER.warn(
                    "No state retention interval configured for a query which accumulates state. "
                            + "Please provide a query configuration with valid retention interval to prevent "
                            + "excessive state size. You may specify a retention time of 0 to not clean up the state.");
        }

        final ExecEdge inputEdge = getInputEdges().get(0);
        // The edge API is wildcard-typed; the rest of this method treats the input as RowData.
        @SuppressWarnings("unchecked")
        final Transformation<RowData> inputTransform =
                (Transformation<RowData>) inputEdge.translateToPlan(plannerBase);
        final RowType inputRowType = (RowType) inputEdge.getOutputType();

        // Index of the rowtime field in the input, or -1 when the window is not on rowtime.
        final int inputTimeFieldIndex;
        if (AggregateUtil.isRowtimeAttribute(window.timeAttribute())) {
            inputTimeFieldIndex =
                    AggregateUtil.timeFieldIndex(
                            FlinkTypeFactory.INSTANCE().buildRelNodeRowType(inputRowType),
                            plannerBase.getRelBuilder(),
                            window.timeAttribute());
            if (inputTimeFieldIndex < 0) {
                throw new TableException(
                        "Group window must defined on a time attribute, "
                                + "but the time attribute can't be found.\n"
                                + "This should never happen. Please file an issue.");
            }
        } else {
            inputTimeFieldIndex = -1;
        }

        final ZoneId shiftTimeZone =
                TimeWindowUtil.getShiftTimeZone(
                        window.timeAttribute().getOutputDataType().getLogicalType(), tableConfig);

        // Every aggregate call shares the node-level retraction requirement.
        final boolean[] aggCallNeedRetractions = new boolean[aggCalls.length];
        Arrays.fill(aggCallNeedRetractions, needRetraction);
        // NOTE(review): the two trailing `true` flags are positional options of
        // AggregateUtil.transformToStreamAggregateInfoList — confirm their meaning there.
        final AggregateInfoList aggInfoList =
                AggregateUtil.transformToStreamAggregateInfoList(
                        inputRowType,
                        JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
                        aggCallNeedRetractions,
                        needRetraction,
                        true,
                        true);

        final GeneratedClass<?> aggsHandler =
                createAggsHandler(
                        aggInfoList,
                        tableConfig,
                        plannerBase.getRelBuilder(),
                        inputRowType.getChildren(),
                        shiftTimeZone);

        // Computed once and reused: ArrayUtils.addAll below copies into a new array, so sharing
        // this array with the operator is safe.
        final LogicalType[] aggValueTypes =
                extractLogicalTypes(aggInfoList.getActualValueTypes());
        final LogicalType[] windowPropertyTypes =
                Arrays.stream(namedWindowProperties)
                        .map(property -> property.getProperty().getResultType())
                        .toArray(LogicalType[]::new);

        // The equaliser compares emitted rows: aggregate values followed by window properties.
        final GeneratedRecordEqualiser equaliser =
                new EqualiserCodeGenerator(ArrayUtils.addAll(aggValueTypes, windowPropertyTypes))
                        .generateRecordEqualiser("WindowValueEqualiser");

        final LogicalType[] accTypes = extractLogicalTypes(aggInfoList.getAccTypes());
        final LogicalType[] inputFieldTypes =
                inputRowType.getChildren().toArray(new LogicalType[0]);

        final WindowOperator<?, ?> operator =
                createWindowOperator(
                        tableConfig,
                        aggsHandler,
                        equaliser,
                        accTypes,
                        windowPropertyTypes,
                        aggValueTypes,
                        inputFieldTypes,
                        inputTimeFieldIndex,
                        shiftTimeZone);

        final OneInputTransformation<RowData, RowData> transform =
                new OneInputTransformation<>(
                        inputTransform,
                        getDescription(),
                        operator,
                        InternalTypeInfo.of(getOutputType()),
                        inputTransform.getParallelism());

        // Key the operator state by the grouping fields.
        final RowDataKeySelector keySelector =
                KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType));
        transform.setStateKeySelector(keySelector);
        // NOTE(review): `mo6135getProducedType` looks like a decompiler-generated alias; confirm
        // against RowDataKeySelector's declaration before renaming it.
        transform.setStateKeyType(keySelector.mo6135getProducedType());
        return transform;
    }

    /**
     * Converts each {@link DataType} to its {@link LogicalType} via {@link
     * LogicalTypeDataTypeConverter#fromDataTypeToLogicalType}, preserving order.
     *
     * @param dataTypeArr data types to convert
     * @return a new array of the same length holding the converted logical types
     */
    private LogicalType[] extractLogicalTypes(DataType[] dataTypeArr) {
        final LogicalType[] logicalTypes = new LogicalType[dataTypeArr.length];
        for (int idx = 0; idx < dataTypeArr.length; idx++) {
            logicalTypes[idx] =
                    LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(dataTypeArr[idx]);
        }
        return logicalTypes;
    }

    /**
     * Generates the namespace aggregate handler (or table-aggregate handler) for this node's
     * window.
     *
     * <p>The window type decides two things: the window class the handler is generated for
     * ({@link CountWindow} for row-interval sizes, {@link TimeWindow} otherwise) and whether the
     * generator must also emit merge logic (session windows, and sliding windows with a time
     * interval size).
     *
     * @param aggregateInfoList aggregate metadata the handler is generated from
     * @param tableConfig table config used to create the code generator context
     * @param relBuilder rel builder handed to the code generator
     * @param list logical types of the input fields
     * @param zoneId shift time zone passed through to the generated handler
     * @return the generated handler class
     * @throws TableException if the window is neither sliding, tumbling nor session
     */
    private GeneratedClass<?> createAggsHandler(AggregateInfoList aggregateInfoList, TableConfig tableConfig, RelBuilder relBuilder, List<LogicalType> list, ZoneId zoneId) {
        final boolean needMerge;
        // Typed as Class<?> so both generator calls below accept it without raw casts.
        final Class<?> windowClass;
        if (window instanceof SlidingGroupWindow) {
            final ValueLiteralExpression size = ((SlidingGroupWindow) window).size();
            needMerge = AggregateUtil.hasTimeIntervalType(size);
            windowClass =
                    AggregateUtil.hasRowIntervalType(size) ? CountWindow.class : TimeWindow.class;
        } else if (window instanceof TumblingGroupWindow) {
            final ValueLiteralExpression size = ((TumblingGroupWindow) window).size();
            needMerge = false;
            windowClass =
                    AggregateUtil.hasRowIntervalType(size) ? CountWindow.class : TimeWindow.class;
        } else if (window instanceof SessionGroupWindow) {
            needMerge = true;
            windowClass = TimeWindow.class;
        } else {
            throw new TableException("Unsupported window: " + window.toString());
        }

        final AggsHandlerCodeGenerator generator =
                new AggsHandlerCodeGenerator(
                                new CodeGeneratorContext(tableConfig),
                                relBuilder,
                                JavaScalaConversionUtil.toScala(list),
                                false)
                        .needAccumulate();
        if (needMerge) {
            generator.needMerge(0, false, null);
        }
        if (needRetraction) {
            generator.needRetract();
        }

        final List<PlannerWindowProperty> windowProperties =
                Arrays.asList(
                        Arrays.stream(namedWindowProperties)
                                .map(PlannerNamedWindowProperty::getProperty)
                                .toArray(PlannerWindowProperty[]::new));

        final boolean isTableAggregate =
                AggregateUtil.isTableAggregate(
                        Arrays.asList(aggregateInfoList.getActualAggregateCalls()));
        if (isTableAggregate) {
            return generator.generateNamespaceTableAggsHandler(
                    "GroupingWindowTableAggsHandler",
                    aggregateInfoList,
                    JavaScalaConversionUtil.toScala(windowProperties),
                    windowClass,
                    zoneId);
        }
        return generator.generateNamespaceAggsHandler(
                "GroupingWindowAggsHandler",
                aggregateInfoList,
                JavaScalaConversionUtil.toScala(windowProperties),
                windowClass,
                zoneId);
    }

    /**
     * Builds the window operator for this node's window definition.
     *
     * <p>The builder is first configured with the input field types and shift time zone, then
     * with the window assigner and time domain matching the window type, then (if the emit
     * strategy produces updates) with the update trigger and allowed lateness, and finally with
     * the generated aggregate handler.
     *
     * @param tableConfig table config the emit strategy is derived from
     * @param generatedClass generated aggregate or table-aggregate handler
     * @param generatedRecordEqualiser equaliser used by the (non-table) aggregate operator
     * @param logicalTypeArr accumulator types
     * @param logicalTypeArr2 window property types
     * @param logicalTypeArr3 aggregate value types
     * @param logicalTypeArr4 input field types
     * @param i index of the time field in the input, used for event-time windows
     * @param zoneId shift time zone
     * @return the configured window operator
     * @throws TableException if the window type or the handler class is not supported
     * @throws UnsupportedOperationException if the time attribute / interval combination is not
     *     supported
     */
    private WindowOperator<?, ?> createWindowOperator(TableConfig tableConfig, GeneratedClass<?> generatedClass, GeneratedRecordEqualiser generatedRecordEqualiser, LogicalType[] logicalTypeArr, LogicalType[] logicalTypeArr2, LogicalType[] logicalTypeArr3, LogicalType[] logicalTypeArr4, int i, ZoneId zoneId) {
        final WindowOperatorBuilder baseBuilder =
                WindowOperatorBuilder.builder()
                        .withInputFields(logicalTypeArr4)
                        .withShiftTimezone(zoneId);

        final WindowOperatorBuilder builder;
        if (window instanceof TumblingGroupWindow) {
            builder = tumblingBuilder(baseBuilder, (TumblingGroupWindow) window, i);
        } else if (window instanceof SlidingGroupWindow) {
            builder = slidingBuilder(baseBuilder, (SlidingGroupWindow) window, i);
        } else if (window instanceof SessionGroupWindow) {
            builder = sessionBuilder(baseBuilder, (SessionGroupWindow) window, i);
        } else {
            throw new TableException("Unsupported window: " + window.toString());
        }

        final WindowEmitStrategy emitStrategy = WindowEmitStrategy.apply(tableConfig, window);
        if (emitStrategy.produceUpdates()) {
            // The operator emits updates, so install the strategy's trigger and lateness.
            builder.produceUpdates()
                    .triggering(emitStrategy.getTrigger())
                    .withAllowedLateness(
                            Duration.ofMillis(emitStrategy.getAllowLateness().longValue()));
        }

        // Note the argument order expected by the builder: acc types, agg value types, then
        // window property types.
        if (generatedClass instanceof GeneratedNamespaceAggsHandleFunction) {
            final GeneratedNamespaceAggsHandleFunction<?> aggsFunction =
                    (GeneratedNamespaceAggsHandleFunction<?>) generatedClass;
            return builder.aggregate(
                            aggsFunction,
                            generatedRecordEqualiser,
                            logicalTypeArr,
                            logicalTypeArr3,
                            logicalTypeArr2)
                    .build();
        }
        if (generatedClass instanceof GeneratedNamespaceTableAggsHandleFunction) {
            final GeneratedNamespaceTableAggsHandleFunction<?> tableAggsFunction =
                    (GeneratedNamespaceTableAggsHandleFunction<?>) generatedClass;
            return builder.aggregate(
                            tableAggsFunction, logicalTypeArr, logicalTypeArr3, logicalTypeArr2)
                    .build();
        }
        throw new TableException(
                "Unsupported agg handler class: " + generatedClass.getClass().getSimpleName());
    }

    /** Configures the builder for a tumbling window (time-based or processing-time count). */
    private static WindowOperatorBuilder tumblingBuilder(
            WindowOperatorBuilder builder, TumblingGroupWindow tumblingWindow, int timeFieldIndex) {
        final FieldReferenceExpression timeField = tumblingWindow.timeField();
        final ValueLiteralExpression size = tumblingWindow.size();
        if (AggregateUtil.isProctimeAttribute(timeField)
                && AggregateUtil.hasTimeIntervalType(size)) {
            return builder.tumble(AggregateUtil.toDuration(size)).withProcessingTime();
        }
        if (AggregateUtil.isRowtimeAttribute(timeField)
                && AggregateUtil.hasTimeIntervalType(size)) {
            return builder.tumble(AggregateUtil.toDuration(size)).withEventTime(timeFieldIndex);
        }
        if (AggregateUtil.isProctimeAttribute(timeField)
                && AggregateUtil.hasRowIntervalType(size)) {
            return builder.countWindow(AggregateUtil.toLong(size).longValue());
        }
        throw new UnsupportedOperationException(
                "Event-time grouping windows on row intervals are currently not supported.");
    }

    /** Configures the builder for a sliding window (time-based or processing-time count). */
    private static WindowOperatorBuilder slidingBuilder(
            WindowOperatorBuilder builder, SlidingGroupWindow slidingWindow, int timeFieldIndex) {
        final FieldReferenceExpression timeField = slidingWindow.timeField();
        final ValueLiteralExpression size = slidingWindow.size();
        final ValueLiteralExpression slide = slidingWindow.slide();
        if (AggregateUtil.isProctimeAttribute(timeField)
                && AggregateUtil.hasTimeIntervalType(size)) {
            return builder.sliding(AggregateUtil.toDuration(size), AggregateUtil.toDuration(slide))
                    .withProcessingTime();
        }
        if (AggregateUtil.isRowtimeAttribute(timeField)
                && AggregateUtil.hasTimeIntervalType(size)) {
            return builder.sliding(AggregateUtil.toDuration(size), AggregateUtil.toDuration(slide))
                    .withEventTime(timeFieldIndex);
        }
        if (AggregateUtil.isProctimeAttribute(timeField)
                && AggregateUtil.hasRowIntervalType(size)) {
            return builder.countWindow(
                    AggregateUtil.toLong(size).longValue(),
                    AggregateUtil.toLong(slide).longValue());
        }
        throw new UnsupportedOperationException(
                "Event-time grouping windows on row intervals are currently not supported.");
    }

    /** Configures the builder for a session window on processing time or event time. */
    private static WindowOperatorBuilder sessionBuilder(
            WindowOperatorBuilder builder, SessionGroupWindow sessionWindow, int timeFieldIndex) {
        final FieldReferenceExpression timeField = sessionWindow.timeField();
        final ValueLiteralExpression gap = sessionWindow.gap();
        if (AggregateUtil.isProctimeAttribute(timeField)) {
            return builder.session(AggregateUtil.toDuration(gap)).withProcessingTime();
        }
        if (AggregateUtil.isRowtimeAttribute(timeField)) {
            return builder.session(AggregateUtil.toDuration(gap)).withEventTime(timeFieldIndex);
        }
        throw new UnsupportedOperationException("This should not happen.");
    }
}
