package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Collections;
import java.util.List;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecMatch;
import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;

/**
 * Streaming-mode specialization of {@link CommonExecMatch}.
 *
 * <p>On top of the common behavior this class contributes two things:
 *
 * <ul>
 *   <li>{@link #checkOrderKeys(RowType)} rejects order keys whose first field is not an
 *       ascending rowtime or proctime attribute;
 *   <li>{@link #translateOrder} wraps the input transformation in a
 *       {@link StreamRecordTimestampInserter} when the first order key is a rowtime attribute.
 * </ul>
 */
@ExecNodeMetadata(name = "stream-exec-match", version = 1, producedTransformations = {"timestamp-inserter", CommonExecMatch.MATCH_TRANSFORMATION}, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15)
public class StreamExecMatch extends CommonExecMatch implements StreamExecNode<RowData>, MultipleTransformationTranslator<RowData> {

    /**
     * Creates a node with a freshly generated id, context and persisted configuration.
     *
     * @param readableConfig configuration from which the persisted node configuration is derived
     * @param matchSpec specification handed to the common match implementation
     * @param inputProperty property of the single input; wrapped into a singleton list
     * @param rowType output type of this node
     * @param str description of this node
     */
    public StreamExecMatch(ReadableConfig readableConfig, MatchSpec matchSpec, InputProperty inputProperty, RowType rowType, String str) {
        this(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(StreamExecMatch.class),
                ExecNodeContext.newPersistedConfig(StreamExecMatch.class, readableConfig),
                matchSpec,
                Collections.singletonList(inputProperty),
                rowType,
                str);
    }

    /**
     * Constructor used for JSON deserialization; every argument is forwarded unchanged to
     * {@link CommonExecMatch}.
     *
     * @param i value of the {@code id} property
     * @param execNodeContext value of the {@code type} property
     * @param readableConfig value of the {@code configuration} property
     * @param matchSpec value of the {@code matchSpec} property
     * @param list value of the {@code inputProperties} property
     * @param rowType value of the {@code outputType} property
     * @param str value of the {@code description} property
     */
    @JsonCreator
    public StreamExecMatch(
            @JsonProperty("id") int i,
            @JsonProperty("type") ExecNodeContext execNodeContext,
            @JsonProperty("configuration") ReadableConfig readableConfig,
            @JsonProperty("matchSpec") MatchSpec matchSpec,
            @JsonProperty("inputProperties") List<InputProperty> list,
            @JsonProperty("outputType") RowType rowType,
            @JsonProperty("description") String str) {
        super(i, execNodeContext, readableConfig, matchSpec, list, rowType, str);
    }

    /**
     * Validates the order keys of the match specification against the given row type.
     *
     * @param rowType row type used to look up the type of the first order key
     * @throws TableException if there is no order key, if the first order key is neither a
     *     rowtime nor a proctime attribute, or if the first order key is not ascending
     */
    @Override
    public void checkOrderKeys(RowType rowType) {
        final SortSpec sortSpec = this.matchSpec.getOrderKeys();
        if (sortSpec.getFieldSize() == 0) {
            throw new TableException("You must specify either rowtime or proctime for order by.");
        }

        // Only the leading order key is inspected: it has to be a time attribute.
        final int leadingIndex = sortSpec.getFieldSpec(0).getFieldIndex();
        final LogicalType leadingType = rowType.getTypeAt(leadingIndex);
        final boolean isTimeAttribute =
                TypeCheckUtils.isRowTime(leadingType) || TypeCheckUtils.isProcTime(leadingType);
        if (!isTimeAttribute) {
            throw new TableException("You must specify either rowtime or proctime for order by as the first one.");
        }

        // The leading (time) key must be sorted in ascending direction.
        final boolean leadingAscending = sortSpec.getAscendingOrders()[0];
        if (!leadingAscending) {
            throw new TableException("Primary sort order of a streaming table must be ascending on time.");
        }
    }

    /**
     * Returns the transformation that the ordering step contributes for the given input.
     *
     * <p>If the first order key is a rowtime attribute, the input is wrapped in a
     * {@link StreamRecordTimestampInserter} built from that field's index and precision, reusing
     * the input's output type and parallelism. When {@code inputsContainSingleton()} holds, both
     * parallelism and max parallelism of the new transformation are set to 1. For any other type
     * of first order key the input transformation is returned as is.
     *
     * @param plannerBase not used by this implementation
     * @param transformation input transformation
     * @param rowType row type used to look up the type of the first order key
     * @param execEdge not used by this implementation
     * @param execNodeConfig configuration passed on when creating the transformation metadata
     * @return the timestamp-inserting transformation, or {@code transformation} itself
     */
    @Override
    public Transformation<RowData> translateOrder(PlannerBase plannerBase, Transformation<RowData> transformation, RowType rowType, ExecEdge execEdge, ExecNodeConfig execNodeConfig) {
        final int timeFieldIndex = this.matchSpec.getOrderKeys().getFieldSpec(0).getFieldIndex();
        final LogicalType timeFieldType = rowType.getTypeAt(timeFieldIndex);

        if (TypeCheckUtils.isRowTime(timeFieldType)) {
            final String description =
                    String.format("StreamRecordTimestampInserter(rowtime field: %s)", timeFieldIndex);
            final int precision = LogicalTypeChecks.getPrecision(timeFieldType);

            final Transformation<RowData> timestampInserter =
                    ExecNodeUtil.createOneInputTransformation(
                            transformation,
                            createTransformationMeta(
                                    "timestamp-inserter",
                                    description,
                                    "StreamRecordTimestampInserter",
                                    execNodeConfig),
                            new StreamRecordTimestampInserter(timeFieldIndex, precision),
                            transformation.getOutputType(),
                            transformation.getParallelism(),
                            false);

            if (inputsContainSingleton()) {
                timestampInserter.setParallelism(1);
                timestampInserter.setMaxParallelism(1);
            }
            return timestampInserter;
        }

        // Not a rowtime attribute: hand the input back untouched.
        return transformation;
    }
}
