package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlMatchRecognize;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.tools.RelBuilder;
import org.apache.flink.FlinkVersion;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.EventComparator;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.operator.CepOperator;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.Quantifier;
import org.apache.flink.cep.pattern.conditions.BooleanConditions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGenUtils;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.MatchCodeGenerator;
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

@ExecNodeMetadata(name = "stream-exec-match", version = 1, producedTransformations = {"timestamp-inserter", StreamExecMatch.MATCH_TRANSFORMATION}, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15)
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.class */
public class StreamExecMatch extends ExecNodeBase<RowData> implements StreamExecNode<RowData>, MultipleTransformationTranslator<RowData> {
    public static final String TIMESTAMP_INSERTER_TRANSFORMATION = "timestamp-inserter";
    public static final String MATCH_TRANSFORMATION = "match";
    public static final String FIELD_NAME_MATCH_SPEC = "matchSpec";

    @JsonProperty(FIELD_NAME_MATCH_SPEC)
    private final MatchSpec matchSpec;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch$PatternVisitor.class */
    public static class PatternVisitor extends RexDefaultVisitor<Pattern<RowData, RowData>> {
        private final TableConfig tableConfig;
        private final RelBuilder relBuilder;
        private final RowType inputRowType;
        private final MatchSpec matchSpec;
        private final LinkedHashSet<String> names = new LinkedHashSet<>();
        private Pattern<RowData, RowData> pattern;

        public PatternVisitor(TableConfig tableConfig, RelBuilder relBuilder, RowType rowType, MatchSpec matchSpec) {
            this.tableConfig = tableConfig;
            this.relBuilder = relBuilder;
            this.inputRowType = rowType;
            this.matchSpec = matchSpec;
        }

        @Override // org.apache.flink.table.planner.plan.utils.RexDefaultVisitor, org.apache.calcite.rex.RexVisitor
        public Pattern<RowData, RowData> visitLiteral(RexLiteral rexLiteral) {
            String str = (String) rexLiteral.getValueAs(String.class);
            this.pattern = translateSingleVariable(this.pattern, str);
            RexNode rexNode = this.matchSpec.getPatternDefinitions().get(str);
            if (rexNode == null) {
                return this.pattern.where(BooleanConditions.trueFunction());
            }
            MatchCodeGenerator matchCodeGenerator = new MatchCodeGenerator(new CodeGeneratorContext(this.tableConfig), this.relBuilder, false, JavaScalaConversionUtil.toScala(new ArrayList(this.names)), JavaScalaConversionUtil.toScala(Optional.of(str)), CodeGenUtils.DEFAULT_COLLECTOR_TERM());
            matchCodeGenerator.bindInput(this.inputRowType, CodeGenUtils.DEFAULT_INPUT1_TERM(), JavaScalaConversionUtil.toScala(Optional.empty()));
            return this.pattern.where(matchCodeGenerator.generateIterativeCondition(rexNode));
        }

        @Override // org.apache.flink.table.planner.plan.utils.RexDefaultVisitor, org.apache.calcite.rex.RexVisitor
        /* renamed from: visitCall */
        public Pattern<RowData, RowData> mo4442visitCall(RexCall rexCall) {
            SqlOperator operator = rexCall.getOperator();
            if (operator == SqlStdOperatorTable.PATTERN_CONCAT) {
                this.pattern = (Pattern) rexCall.operands.get(0).accept(this);
                this.pattern = (Pattern) rexCall.operands.get(1).accept(this);
                return this.pattern;
            }
            if (operator == SqlStdOperatorTable.PATTERN_QUANTIFIER) {
                if (!(rexCall.operands.get(0) instanceof RexLiteral)) {
                    throw new TableException(String.format("Expression not supported: %s Group patterns are not supported yet.", rexCall.operands.get(0)));
                }
                this.pattern = (Pattern) ((RexLiteral) rexCall.operands.get(0)).accept(this);
                return applyQuantifier(this.pattern, MathUtils.checkedDownCast(((Long) ((RexLiteral) rexCall.operands.get(1)).getValueAs(Long.class)).longValue()), MathUtils.checkedDownCast(((Long) ((RexLiteral) rexCall.operands.get(2)).getValueAs(Long.class)).longValue()), !((Boolean) ((RexLiteral) rexCall.operands.get(3)).getValueAs(Boolean.class)).booleanValue());
            }
            if (operator == SqlStdOperatorTable.PATTERN_ALTER) {
                throw new TableException(String.format("Expression not supported: %s. Currently, CEP doesn't support branching patterns.", rexCall));
            }
            if (operator == SqlStdOperatorTable.PATTERN_PERMUTE) {
                throw new TableException(String.format("Expression not supported: %s. Currently, CEP doesn't support PERMUTE patterns.", rexCall));
            }
            if (operator == SqlStdOperatorTable.PATTERN_EXCLUDE) {
                throw new TableException(String.format("Expression not supported: %s. Currently, CEP doesn't support '{-' '-}' patterns.", rexCall));
            }
            throw new TableException("This should not happen.");
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.flink.table.planner.plan.utils.RexDefaultVisitor
        /* renamed from: visitNode */
        public Pattern<RowData, RowData> mo5115visitNode(RexNode rexNode) {
            throw new TableException(String.format("Unsupported expression within Pattern: [%s]", rexNode));
        }

        private Pattern<RowData, RowData> translateSingleVariable(Pattern<RowData, RowData> pattern, String str) {
            if (this.names.contains(str)) {
                throw new TableException("Pattern variables must be unique. That might change in the future.");
            }
            this.names.add(str);
            return pattern != null ? pattern.next(str) : Pattern.begin(str, translateSkipStrategy());
        }

        private AfterMatchSkipStrategy translateSkipStrategy() {
            switch (this.matchSpec.getAfter().getKind()) {
                case LITERAL:
                    switch ((SqlMatchRecognize.AfterOption) ((RexLiteral) this.matchSpec.getAfter()).getValueAs(SqlMatchRecognize.AfterOption.class)) {
                        case SKIP_PAST_LAST_ROW:
                            return AfterMatchSkipStrategy.skipPastLastEvent();
                        case SKIP_TO_NEXT_ROW:
                            return AfterMatchSkipStrategy.skipToNext();
                        default:
                            throw new TableException("This should not happen.");
                    }
                case SKIP_TO_FIRST:
                    return AfterMatchSkipStrategy.skipToFirst(getPatternTarget()).throwExceptionOnMiss();
                case SKIP_TO_LAST:
                    return AfterMatchSkipStrategy.skipToLast(getPatternTarget()).throwExceptionOnMiss();
                default:
                    throw new TableException(String.format("Corrupted query tree. Unexpected %s for after match strategy.", this.matchSpec.getAfter()));
            }
        }

        private String getPatternTarget() {
            return (String) ((RexLiteral) ((RexCall) this.matchSpec.getAfter()).getOperands().get(0)).getValueAs(String.class);
        }

        private Pattern<RowData, RowData> applyQuantifier(Pattern<RowData, RowData> pattern, int i, int i2, boolean z) {
            boolean z2 = i == 0 && i2 == 1;
            Pattern<RowData, RowData> consecutive = (i == 0 && i2 == -1) ? pattern.oneOrMore().optional().consecutive() : (i == 1 && i2 == -1) ? pattern.oneOrMore().consecutive() : z2 ? pattern.optional() : i2 != -1 ? pattern.times(i, i2).consecutive() : pattern.timesOrMore(i).consecutive();
            if (z && (z2 || i == i2)) {
                return consecutive;
            }
            if (z) {
                return consecutive.greedy();
            }
            if (z2) {
                throw new TableException("Reluctant optional variables are not supported yet.");
            }
            return consecutive;
        }
    }

    public StreamExecMatch(ReadableConfig readableConfig, MatchSpec matchSpec, InputProperty inputProperty, RowType rowType, String str) {
        this(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(StreamExecMatch.class), ExecNodeContext.newPersistedConfig(StreamExecMatch.class, readableConfig), matchSpec, Collections.singletonList(inputProperty), rowType, str);
    }

    @JsonCreator
    public StreamExecMatch(@JsonProperty("id") int i, @JsonProperty("type") ExecNodeContext execNodeContext, @JsonProperty("configuration") ReadableConfig readableConfig, @JsonProperty("matchSpec") MatchSpec matchSpec, @JsonProperty("inputProperties") List<InputProperty> list, @JsonProperty("outputType") RowType rowType, @JsonProperty("description") String str) {
        super(i, execNodeContext, readableConfig, list, rowType, str);
        Preconditions.checkArgument(list.size() == 1);
        this.matchSpec = (MatchSpec) Preconditions.checkNotNull(matchSpec);
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        ExecEdge execEdge = getInputEdges().get(0);
        Transformation<?> translateToPlan = execEdge.translateToPlan(plannerBase);
        LogicalType logicalType = (RowType) execEdge.getOutputType();
        checkOrderKeys(logicalType);
        EventComparator<RowData> createEventComparator = createEventComparator(execNodeConfig, logicalType);
        Transformation<RowData> translateOrder = translateOrder(translateToPlan, logicalType, execNodeConfig);
        Tuple2<Pattern<RowData, RowData>, List<String>> translatePattern = translatePattern(this.matchSpec, execNodeConfig.getTableConfig(), plannerBase.getRelBuilder(), logicalType);
        Pattern pattern = (Pattern) translatePattern.f0;
        if (NFACompiler.canProduceEmptyMatches(pattern)) {
            throw new TableException("Patterns that can produce empty matches are not supported. There must be at least one non-optional state.");
        }
        if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
            throw new TableException("Greedy quantifiers are not allowed as the last element of a Pattern yet. Finish your pattern with either a simple variable or reluctant quantifier.");
        }
        if (this.matchSpec.isAllRows()) {
            throw new TableException("All rows per match mode is not supported yet.");
        }
        int[] fieldIndices = this.matchSpec.getPartition().getFieldIndices();
        boolean isProcTime = TypeCheckUtils.isProcTime(logicalType.getTypeAt(this.matchSpec.getOrderKeys().getFieldSpec(0).getFieldIndex()));
        InternalTypeInfo outputType = translateToPlan.getOutputType();
        TypeSerializer createSerializer = outputType.createSerializer(plannerBase.getExecEnv().getConfig());
        NFACompiler.NFAFactory compileFactory = NFACompiler.compileFactory(pattern, false);
        MatchCodeGenerator matchCodeGenerator = new MatchCodeGenerator(new CodeGeneratorContext(execNodeConfig.getTableConfig()), plannerBase.getRelBuilder(), false, JavaScalaConversionUtil.toScala((List) translatePattern.f1), JavaScalaConversionUtil.toScala(Optional.empty()), CodeGenUtils.DEFAULT_COLLECTOR_TERM());
        matchCodeGenerator.bindInput(logicalType, CodeGenUtils.DEFAULT_INPUT1_TERM(), JavaScalaConversionUtil.toScala(Optional.empty()));
        OneInputTransformation createOneInputTransformation = ExecNodeUtil.createOneInputTransformation((Transformation) translateOrder, createTransformationMeta(MATCH_TRANSFORMATION, execNodeConfig), (StreamOperator) new CepOperator(createSerializer, isProcTime, compileFactory, createEventComparator, pattern.getAfterMatchSkipStrategy(), matchCodeGenerator.generateOneRowPerMatchExpression((RowType) getOutputType(), fieldIndices, this.matchSpec.getMeasures()), (OutputTag) null), (TypeInformation) InternalTypeInfo.of(getOutputType()), translateOrder.getParallelism());
        RowDataKeySelector rowDataSelector = KeySelectorUtil.getRowDataSelector(fieldIndices, outputType);
        createOneInputTransformation.setStateKeySelector(rowDataSelector);
        createOneInputTransformation.setStateKeyType(rowDataSelector.getProducedType());
        if (inputsContainSingleton()) {
            createOneInputTransformation.setParallelism(1);
            createOneInputTransformation.setMaxParallelism(1);
        }
        return createOneInputTransformation;
    }

    private void checkOrderKeys(RowType rowType) {
        SortSpec orderKeys = this.matchSpec.getOrderKeys();
        if (orderKeys.getFieldSize() == 0) {
            throw new TableException("You must specify either rowtime or proctime for order by.");
        }
        LogicalType typeAt = rowType.getTypeAt(orderKeys.getFieldSpec(0).getFieldIndex());
        if (!TypeCheckUtils.isRowTime(typeAt) && !TypeCheckUtils.isProcTime(typeAt)) {
            throw new TableException("You must specify either rowtime or proctime for order by as the first one.");
        }
        if (!orderKeys.getAscendingOrders()[0]) {
            throw new TableException("Primary sort order of a streaming table must be ascending on time.");
        }
    }

    private EventComparator<RowData> createEventComparator(ExecNodeConfig execNodeConfig, RowType rowType) {
        SortSpec orderKeys = this.matchSpec.getOrderKeys();
        if (orderKeys.getFieldIndices().length > 1) {
            return new RowDataEventComparator(ComparatorCodeGenerator.gen(execNodeConfig.getTableConfig(), "RowDataComparator", rowType, orderKeys));
        }
        return null;
    }

    private Transformation<RowData> translateOrder(Transformation<RowData> transformation, RowType rowType, ReadableConfig readableConfig) {
        int fieldIndex = this.matchSpec.getOrderKeys().getFieldSpec(0).getFieldIndex();
        LogicalType typeAt = rowType.getTypeAt(fieldIndex);
        if (!TypeCheckUtils.isRowTime(typeAt)) {
            return transformation;
        }
        OneInputTransformation createOneInputTransformation = ExecNodeUtil.createOneInputTransformation((Transformation) transformation, createTransformationMeta("timestamp-inserter", String.format("StreamRecordTimestampInserter(rowtime field: %s)", Integer.valueOf(fieldIndex)), "StreamRecordTimestampInserter", readableConfig), (StreamOperator) new StreamRecordTimestampInserter(fieldIndex, LogicalTypeChecks.getPrecision(typeAt)), transformation.getOutputType(), transformation.getParallelism());
        if (inputsContainSingleton()) {
            createOneInputTransformation.setParallelism(1);
            createOneInputTransformation.setMaxParallelism(1);
        }
        return createOneInputTransformation;
    }

    @VisibleForTesting
    public static Tuple2<Pattern<RowData, RowData>, List<String>> translatePattern(MatchSpec matchSpec, TableConfig tableConfig, RelBuilder relBuilder, RowType rowType) {
        Pattern pattern;
        PatternVisitor patternVisitor = new PatternVisitor(tableConfig, relBuilder, rowType, matchSpec);
        if (matchSpec.getInterval().isPresent()) {
            pattern = ((Pattern) matchSpec.getPattern().accept(patternVisitor)).within(translateTimeBound(matchSpec.getInterval().get()));
        } else {
            pattern = (Pattern) matchSpec.getPattern().accept(patternVisitor);
        }
        return new Tuple2<>(pattern, new ArrayList(patternVisitor.names));
    }

    private static Time translateTimeBound(RexNode rexNode) {
        if (rexNode instanceof RexLiteral) {
            RexLiteral rexLiteral = (RexLiteral) rexNode;
            if (rexLiteral.getTypeName().getFamily() == SqlTypeFamily.INTERVAL_DAY_TIME) {
                return Time.milliseconds(((Long) rexLiteral.getValueAs(Long.class)).longValue());
            }
        }
        throw new TableException("Only constant intervals with millisecond resolution are supported as time constraints of patterns.");
    }
}
