package org.apache.flink.table.planner.plan.nodes.exec.batch;

import java.util.Collections;
import java.util.List;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.operators.sort.SortLimitOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

/**
 * Batch exec node that translates a sort-with-limit into a {@link SortLimitOperator}.
 *
 * <p>The node has exactly one input. At translation time a record comparator is generated from
 * {@link #sortSpec} and handed to the operator together with the limit bounds and the
 * {@code isGlobal} flag.
 *
 * <p>The node is (de)serializable with Jackson: {@code sortSpec} and the limit-related fields are
 * written under the property names declared on the fields, and the {@link JsonCreator}
 * constructor restores them.
 */
@ExecNodeMetadata(
        name = "batch-exec-sort-limit",
        version = 1,
        producedTransformations = {BatchExecSortLimit.SORT_LIMIT_TRANSFORMATION},
        minPlanVersion = FlinkVersion.v2_0,
        minStateVersion = FlinkVersion.v2_0)
public class BatchExecSortLimit extends ExecNodeBase<RowData>
        implements BatchExecNode<RowData>, SingleTransformationTranslator<RowData> {

    /** Name of the single transformation produced by this node. */
    public static final String SORT_LIMIT_TRANSFORMATION = "sort-limit";

    /** Sort specification the record comparator is generated from. */
    @JsonProperty("sortSpec")
    private final SortSpec sortSpec;

    /** Lower limit bound, forwarded unchanged to {@link SortLimitOperator}. */
    @JsonProperty(BatchExecLimit.FIELD_NAME_LIMIT_START)
    private final long limitStart;

    /**
     * Upper limit bound, forwarded unchanged to {@link SortLimitOperator}. {@link Long#MAX_VALUE}
     * is rejected at translation time.
     */
    @JsonProperty(BatchExecLimit.FIELD_NAME_LIMIT_END)
    private final long limitEnd;

    /** Flag forwarded unchanged to {@link SortLimitOperator}. */
    @JsonProperty(BatchExecLimit.FIELD_NAME_IS_GLOBAL)
    private final boolean isGlobal;

    /**
     * Creates a new node with a freshly allocated node id and context.
     *
     * @param tableConfig configuration from which the persisted node configuration is derived
     * @param sortSpec sort specification used to generate the comparator
     * @param limitStart lower limit bound passed to the operator
     * @param limitEnd upper limit bound passed to the operator
     * @param isGlobal flag passed to the operator
     * @param inputProperty property of the single input
     * @param outputType output row type of this node
     * @param description human-readable description of this node
     */
    public BatchExecSortLimit(
            ReadableConfig tableConfig,
            SortSpec sortSpec,
            long limitStart,
            long limitEnd,
            boolean isGlobal,
            InputProperty inputProperty,
            RowType outputType,
            String description) {
        super(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(BatchExecSortLimit.class),
                ExecNodeContext.newPersistedConfig(BatchExecSortLimit.class, tableConfig),
                Collections.singletonList(inputProperty),
                outputType,
                description);
        this.sortSpec = sortSpec;
        this.limitStart = limitStart;
        this.limitEnd = limitEnd;
        this.isGlobal = isGlobal;
    }

    /**
     * Jackson creator: restores a node from its JSON properties.
     *
     * @param id node id
     * @param context node context, read from the {@code type} property
     * @param persistedConfig node configuration, read from the {@code configuration} property
     * @param sortSpec sort specification used to generate the comparator
     * @param limitStart lower limit bound passed to the operator
     * @param limitEnd upper limit bound passed to the operator
     * @param isGlobal flag passed to the operator
     * @param inputProperties input properties of this node
     * @param outputType output row type of this node
     * @param description human-readable description of this node
     */
    @JsonCreator
    public BatchExecSortLimit(
            @JsonProperty("id") int id,
            @JsonProperty("type") ExecNodeContext context,
            @JsonProperty("configuration") ReadableConfig persistedConfig,
            @JsonProperty("sortSpec") SortSpec sortSpec,
            @JsonProperty("limitStart") long limitStart,
            @JsonProperty("limitEnd") long limitEnd,
            @JsonProperty("isGlobal") boolean isGlobal,
            @JsonProperty("inputProperties") List<InputProperty> inputProperties,
            @JsonProperty("outputType") RowType outputType,
            @JsonProperty("description") String description) {
        super(id, context, persistedConfig, inputProperties, outputType, description);
        this.sortSpec = sortSpec;
        this.limitStart = limitStart;
        this.limitEnd = limitEnd;
        this.isGlobal = isGlobal;
    }

    /**
     * Translates this node into a one-input transformation wrapping a {@link SortLimitOperator}.
     *
     * <p>The resulting transformation reuses the input row type as its output type information
     * and takes its parallelism from the input transformation.
     *
     * @param planner planner used to translate the input edge and to obtain the class loader for
     *     code generation
     * @param config node configuration used for code generation and transformation metadata
     * @return the transformation producing the sorted and limited rows
     * @throws TableException if {@code limitEnd} equals {@link Long#MAX_VALUE}
     */
    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        if (limitEnd == Long.MAX_VALUE) {
            throw new TableException("Not support limitEnd is max value now!");
        }

        final ExecEdge inputEdge = getInputEdges().get(0);

        // translateToPlan is declared with a wildcard; the input of this node carries RowData.
        @SuppressWarnings("unchecked")
        final Transformation<RowData> inputTransform =
                (Transformation<RowData>) inputEdge.translateToPlan(planner);

        // The edge exposes its type as a general logical type; comparator generation and
        // InternalTypeInfo need the row type, hence the explicit narrowing cast.
        final RowType inputType = (RowType) inputEdge.getOutputType();

        final SortLimitOperator operator =
                new SortLimitOperator(
                        isGlobal,
                        limitStart,
                        limitEnd,
                        ComparatorCodeGenerator.gen(
                                config,
                                planner.getFlinkContext().getClassLoader(),
                                "SortLimitComparator",
                                inputType,
                                sortSpec));

        final StreamOperatorFactory<RowData> operatorFactory = SimpleOperatorFactory.of(operator);
        final TypeInformation<RowData> outputTypeInfo = InternalTypeInfo.of(inputType);

        // NOTE(review): the trailing `false` is presumably the "parallelism configured" flag of
        // ExecNodeUtil.createOneInputTransformation - confirm against that helper.
        return ExecNodeUtil.createOneInputTransformation(
                inputTransform,
                createTransformationMeta(SORT_LIMIT_TRANSFORMATION, config),
                operatorFactory,
                outputTypeInfo,
                inputTransform.getParallelism(),
                false);
    }
}
