package org.apache.flink.table.planner.plan.nodes.exec.operator;

import java.util.Optional;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.AggregatePhaseStrategy;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.table.planner.utils.Top3;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;

/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.class */
class StreamOperatorNameTest extends OperatorNameTestBase {
    private StreamTableTestUtil util;

    StreamOperatorNameTest() {
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.operator.OperatorNameTestBase
    protected TableTestUtil getTableTestUtil() {
        return streamTestUtil(TableConfig.getDefault());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.table.planner.plan.nodes.exec.operator.OperatorNameTestBase
    @BeforeEach
    public void setup() {
        super.setup();
        this.util = (StreamTableTestUtil) super.util;
    }

    @TestTemplate
    void testDropUpdateBefore() {
        this.util.getStreamEnv().setParallelism(2);
        this.tEnv.executeSql("CREATE TABLE MyTable (\n  a bigint,\n  b int not null,\n  c varchar,\n  d bigint not null,\n  primary key(a, b) NOT ENFORCED\n) with (\n  'connector' = 'values',\n  'changelog-mode' = 'I,UA,UB,D',\n  'bounded' = 'false')");
        this.tEnv.executeSql("CREATE TABLE MySink (\n  c varchar,\n  a bigint,\n  b int not null,\n  primary key(a, b) NOT ENFORCED\n) with (\n  'connector' = 'values',\n  'sink-insert-only' = 'false',\n  'sink-changelog-mode-enforced' = 'I,UA,D',  'table-sink-class' = 'DEFAULT')");
        verifyInsert("insert into MySink select c, a, b from MyTable");
    }

    @TestTemplate
    void testChangelogNormalize() throws Exception {
        this.util.getStreamEnv().setParallelism(2);
        this.tEnv.executeSql("CREATE TABLE MyTable (\n  a bigint,\n  b int not null,\n  c varchar,\n  d bigint not null,\n  primary key(a, b) NOT ENFORCED\n) with (\n  'connector' = 'values',\n  'changelog-mode' = 'I,UA,D',\n  'bounded' = 'false')");
        this.tEnv.executeSql("CREATE TABLE MySink (\n  c varchar,\n  a bigint,\n  b int not null,\n  primary key(a) NOT ENFORCED\n) with (\n  'connector' = 'values',\n  'sink-insert-only' = 'false',\n  'sink-changelog-mode-enforced' = 'I,UA,D',  'table-sink-class' = 'DEFAULT')");
        verifyInsert("insert into MySink select c, a, b from MyTable");
    }

    @TestTemplate
    void testDeduplicate() {
        createSourceWithTimeAttribute();
        verifyQuery("SELECT a, b, c FROM (SELECT *,     ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime ASC) AS rk FROM MyTable) t WHERE rk = 1");
    }

    @TestTemplate
    void testIncrementalAggregate() {
        this.util.enableMiniBatch();
        this.tEnv.getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
        createTestSource();
        verifyQuery("SELECT a, count(distinct b) as b FROM MyTable GROUP BY a");
    }

    @TestTemplate
    void testGroupAggregate() {
        testGroupAggregateInternal();
    }

    @TestTemplate
    void testTableGroupAggregate() {
        TableTestUtil.createTemporaryView(this.tEnv, "MySource", this.util.getStreamEnv().fromElements(new Integer[]{1, 2, 3, 4, 5}), JavaScalaConversionUtil.toScala(Optional.empty()), JavaScalaConversionUtil.toScala(Optional.empty()), JavaScalaConversionUtil.toScala(Optional.empty()));
        this.tEnv.createTemporaryFunction("top3", new Top3());
        this.tEnv.createTemporaryView("MyTable", this.tEnv.from("MySource").flatAggregate(Expressions.call(Top3.class, new Object[]{Expressions.$("f0")})).select(new Expression[]{Expressions.$("f0"), Expressions.$("f1")}));
        verifyQuery("SELECT * FROM MyTable");
    }

    @TestTemplate
    void testIntervalJoin() {
        createSourceWithTimeAttribute("A");
        createSourceWithTimeAttribute("B");
        verifyQuery("SELECT t1.a, t2.b FROM A t1 JOIN B t2 ON\n    t1.a = t2.a AND \n    t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR");
    }

    @TestTemplate
    void testIntervalJoinNegativeWindow() {
        createSourceWithTimeAttribute("A");
        createSourceWithTimeAttribute("B");
        verifyQuery("SELECT t1.a, t2.b FROM A t1 LEFT JOIN B t2 ON\n    t1.a = t2.a AND \n    t1.proctime BETWEEN t2.proctime + INTERVAL '2' HOUR AND t2.proctime + INTERVAL '1' HOUR");
    }

    @TestTemplate
    void testJoin() {
        testJoinInternal();
    }

    @TestTemplate
    void testMatch() {
        createSourceWithTimeAttribute();
        verifyQuery("SELECT T.aid, T.bid, T.cid\n     FROM MyTable MATCH_RECOGNIZE (\n             ORDER BY proctime\n             MEASURES\n             `A\"`.a AS aid,\n             l.a AS bid,\n             C.a AS cid\n             PATTERN (`A\"` l C)\n             DEFINE\n                 `A\"` AS a = 1,\n                 l AS b = 2,\n                 C AS c = 'c'\n     ) AS T");
    }

    @TestTemplate
    void testTemporalJoin() {
        this.tEnv.executeSql("CREATE TABLE Orders (\n amount INT,\n currency STRING,\n rowtime TIMESTAMP(3),\n proctime AS PROCTIME(),\n WATERMARK FOR rowtime AS rowtime\n) WITH (\n 'connector' = 'values'\n)");
        this.tEnv.executeSql("CREATE TABLE RatesHistory (\n currency STRING,\n rate INT,\n rowtime TIMESTAMP(3),\n WATERMARK FOR rowtime AS rowtime,\n PRIMARY KEY(currency) NOT ENFORCED\n) WITH (\n 'connector' = 'values'\n)");
        this.tEnv.createTemporarySystemFunction("Rates", this.tEnv.from("RatesHistory").createTemporalTableFunction(Expressions.$("rowtime"), Expressions.$("currency")));
        verifyQuery("SELECT amount * r.rate FROM Orders AS o,  LATERAL TABLE (Rates(o.rowtime)) AS r WHERE o.currency = r.currency ");
    }

    @TestTemplate
    void testTemporalSortOnProcTime() {
        createSourceWithTimeAttribute();
        verifyQuery("SELECT a FROM MyTable order by proctime, c");
    }

    @TestTemplate
    void testTemporalSortOnEventTime() {
        createSourceWithTimeAttribute();
        verifyQuery("SELECT a FROM MyTable order by rowtime, c");
    }

    @TestTemplate
    void testWindowAggregate() {
        this.tEnv.getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, AggregatePhaseStrategy.ONE_PHASE);
        createSourceWithTimeAttribute();
        verifyQuery("SELECT\n  b,\n  window_start,\n  window_end,\n  COUNT(*),\n  SUM(a)\nFROM TABLE(\n   TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\nGROUP BY b, window_start, window_end");
    }

    @TestTemplate
    void testLocalGlobalWindowAggregate() {
        createSourceWithTimeAttribute();
        verifyQuery("SELECT\n  b,\n  window_start,\n  window_end,\n  COUNT(*),\n  SUM(a)\nFROM TABLE(\n   TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\nGROUP BY b, window_start, window_end");
    }

    @TestTemplate
    void testWindowJoin() {
        createSourceWithTimeAttribute("MyTable");
        createSourceWithTimeAttribute("MyTable2");
        verifyQuery("select\n  L.a,\n  L.window_start,\n  L.window_end,\n  L.cnt,\n  L.uv,\n  R.a,\n  R.cnt,\n  R.uv\nFROM (\n  SELECT\n    a,\n    window_start,\n    window_end,\n    count(*) as cnt,\n    count(distinct c) AS uv\n  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n  GROUP BY a, window_start, window_end, window_time\n) L\nJOIN (\n  SELECT\n    a,\n    window_start,\n    window_end,\n    count(*) as cnt,\n    count(distinct c) AS uv\n  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n  GROUP BY a, window_start, window_end, window_time\n) R\nON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a");
    }

    @TestTemplate
    void testWindowRank() {
        createSourceWithTimeAttribute();
        verifyQuery("select\n  window_start,\n  window_end,\n  a,\n  b,\n  c\nFROM (\n  SELECT\n    *,\n   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY b DESC) as rownum\n  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)))\nWHERE rownum <= 3");
    }

    @TestTemplate
    void testWindowDeduplicate() {
        createSourceWithTimeAttribute();
        verifyQuery("select\n  window_start,\n  window_end,\n  a,\n  b,\n  c\nFROM (\n  SELECT\n    *,\n   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY rowtime DESC) as rownum\n  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)))\nWHERE rownum <= 1");
    }
}
