package org.apache.flink.table.planner.plan.optimize;

import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/flink/table/planner/plan/optimize/ScanReuseTest.class */
class ScanReuseTest extends TableTestBase {
    private final boolean isStreaming;
    private final TableTestUtil util;

    ScanReuseTest(boolean z) {
        this.isStreaming = z;
        TableConfig tableConfig = TableConfig.getDefault();
        this.util = z ? streamTestUtil(tableConfig) : batchTestUtil(tableConfig);
    }

    @Parameters(name = "isStreaming: {0}")
    private static Collection<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    @BeforeEach
    void before() {
        this.util.tableEnv().executeSql(this.isStreaming ? "CREATE TABLE MyTable (\n  a int,\n  b bigint,\n  c string,\n  nested ROW<i int, j int, s string>,\n  metadata_1 int,\n  compute_metadata as metadata_1 * 2,\n  metadata_2 int,\n  rtime as TO_TIMESTAMP(c, nested.s),\n  WATERMARK FOR rtime AS rtime - INTERVAL '5' SECOND\n) WITH (\n 'connector' = 'values',\n 'bounded' = 'false',\n 'nested-projection-supported' = 'true',\n 'readable-metadata' = 'metadata_1:INT, metadata_2:STRING',\n 'enable-watermark-push-down' = 'true',\n 'disable-lookup' = 'true')" : "CREATE TABLE MyTable (\n  a int,\n  b bigint,\n  c string,\n  nested ROW<i int, j int, s string>,\n  metadata_1 int,\n  compute_metadata as metadata_1 * 2,\n  metadata_2 int\n) PARTITIONED BY (c) WITH (\n 'connector' = 'values',\n 'bounded' = 'true',\n 'nested-projection-supported' = 'true',\n 'readable-metadata' = 'metadata_1:INT, metadata_2:STRING'\n)");
    }

    @TestTemplate
    void testProject() {
        this.util.verifyExecPlan("SELECT T1.a, T1.c, T2.c FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProject1() {
        this.util.verifyExecPlan("SELECT T1.a, T1.b, T1.c, T2.c FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProject2() {
        this.util.verifyExecPlan("SELECT T1.a, T1.b, T2.c FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectNested1() {
        this.util.verifyExecPlan("SELECT T1.a, T1.i, T2.j FROM (SELECT a, nested.i as i FROM MyTable) T1, (SELECT a, nested.j as j FROM MyTable) T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectNested2() {
        this.util.verifyExecPlan("SELECT T1.a, T1.i, T2.i FROM (SELECT a, nested.i as i FROM MyTable) T1, (SELECT a, nested.i as i FROM MyTable) T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectNestedWithWholeField() {
        this.util.verifyExecPlan("SELECT * FROM (SELECT a, nested.i FROM MyTable) T1, (SELECT a, nested FROM MyTable) T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectWithExpr() {
        this.util.verifyExecPlan("SELECT T1.a, T1.b, T2.c FROM (SELECT a, b + 1 as b FROM MyTable) T1, MyTable T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectWithFilter() {
        this.util.verifyExecPlan("SELECT T1.a, T1.b, T2.c FROM (SELECT * FROM MyTable WHERE b = 2) T1, (SELECT * FROM MyTable WHERE b = 3) T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectWithMeta1() {
        this.util.verifyExecPlan("SELECT T1.a, T1.b, T1.metadata_1, T1.metadata_2, T2.c, T2.metadata_2 FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectWithMeta2() {
        this.util.verifyExecPlan("SELECT T1.a, T1.b, T1.metadata_1, T2.c, T2.metadata_2 FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectWithMeta3() {
        this.util.verifyExecPlan("SELECT T1.a, T1.b, T1.metadata_1, T2.c, T2.metadata_1 FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectWithMetaAndCompute() {
        this.util.verifyExecPlan("SELECT T1.a, T1.b, T1.metadata_1, T1.compute_metadata, T2.c, T2.metadata_2 FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectWithHints() {
        this.util.verifyExecPlan("SELECT T1.a, T1.c, T2.c FROM MyTable /*+ OPTIONS('source.num-element-to-skip'='1') */ T1, MyTable T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectReuseWithHints() {
        this.util.verifyExecPlan("SELECT T1.a, T1.c, T2.c FROM MyTable /*+ OPTIONS('source.num-element-to-skip'='1') */ T1, MyTable /*+ OPTIONS('source.num-element-to-skip'='1') */ T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectWithDifferentHints() {
        this.util.verifyExecPlan("SELECT T1.a, T1.c, T2.c FROM MyTable /*+ OPTIONS('source.num-element-to-skip'='1') */ T1, MyTable /*+ OPTIONS('source.num-element-to-skip'='10') */ T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectWithFilterPushDown() {
        this.util.verifyExecPlan("SELECT T1.a, T1.c, T2.c FROM (SELECT * FROM MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 2) T1, (SELECT * FROM MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 1) T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectReuseWithFilterPushDown() {
        this.util.verifyExecPlan("SELECT T1.a, T1.c, T2.c FROM (SELECT * FROM MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 1) T1, (SELECT * FROM MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 1) T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectReuseWithWatermark() {
        if (this.isStreaming) {
            this.util.tableEnv().executeSql("CREATE TABLE W_T (\n  a int,\n  b bigint,\n  c string,\n  rtime timestamp(3),\n  WATERMARK FOR rtime AS rtime - INTERVAL '5' SECOND\n) WITH (\n 'connector' = 'values',\n 'bounded' = 'false',\n 'enable-watermark-push-down' = 'true',\n 'disable-lookup' = 'true')");
            this.util.verifyExecPlan("SELECT T1.a, T1.c, T2.b FROM (SELECT MIN(a) as a, MIN(c) as c FROM W_T GROUP BY TUMBLE(rtime, INTERVAL '10' SECOND)) T1, (SELECT MIN(a) as a, MIN(b) as b FROM W_T GROUP BY TUMBLE(rtime, INTERVAL '10' SECOND)) T2 WHERE T1.a = T2.a");
        }
    }

    @TestTemplate
    void testProjectWithLimitPushDown() {
        this.util.verifyExecPlan("SELECT T1.a, T1.c, T2.c FROM (SELECT * FROM MyTable LIMIT 11) T1, (SELECT * FROM MyTable LIMIT 10) T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectReuseWithLimitPushDown() {
        this.util.verifyExecPlan("SELECT T1.a, T1.c, T2.c FROM (SELECT * FROM MyTable LIMIT 10) T1, (SELECT * FROM MyTable LIMIT 10) T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectWithPartitionPushDown() {
        if (this.isStreaming) {
            return;
        }
        this.util.verifyExecPlan("SELECT T1.a, T1.c, T2.c FROM (SELECT * FROM MyTable /*+ OPTIONS('partition-list'='c:1;c:2') */ WHERE c = '1') T1, (SELECT * FROM MyTable /*+ OPTIONS('partition-list'='c:1;c:2') */ WHERE c = '2') T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testProjectReuseWithPartitionPushDown() {
        if (this.isStreaming) {
            return;
        }
        this.util.verifyExecPlan("SELECT T1.a, T1.c, T2.c FROM (SELECT * FROM MyTable /*+ OPTIONS('partition-list'='c:1;c:2') */ WHERE c = '1') T1, (SELECT * FROM MyTable /*+ OPTIONS('partition-list'='c:1;c:2') */ WHERE c = '1') T2 WHERE T1.a = T2.a");
    }

    @TestTemplate
    void testReuseWithReadMetadataAndWatermarkPushDown1() {
        Assumptions.assumeThat(this.isStreaming).isTrue();
        this.util.tableEnv().executeSql("CREATE TABLE MyTable1 (\n  metadata_0 int METADATA VIRTUAL,\n  a0 int,\n  a1 int,\n  a2 int,\n  ts STRING,\n   rowtime as TO_TIMESTAMP(`ts`),\n  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n) WITH (\n 'connector' = 'values',\n 'bounded' = 'false',\n 'readable-metadata' = 'metadata_0:int',\n 'enable-watermark-push-down' = 'true',\n 'disable-lookup' = 'true')");
        this.util.verifyExecPlan("SELECT T1.a1, T1.a2 FROM (SELECT a0, window_start, window_end, MIN(a1) as a1, MIN(a2) as a2, MIN(metadata_0) as metadata_0 FROM TABLE(   TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL '1' SECOND))  GROUP BY a0, window_start, window_end) T1, (SELECT a0, window_start, window_end, MIN(a1) as a1  FROM TABLE(   TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL '1' SECOND))  GROUP BY a0, window_start, window_end) T2 WHERE T1.a1 = T2.a1");
    }

    @TestTemplate
    void testReuseWithReadMetadataAndWatermarkPushDown2() {
        Assumptions.assumeThat(this.isStreaming).isTrue();
        this.util.tableEnv().executeSql("CREATE TABLE MyTable1 (\n  metadata_0 int METADATA VIRTUAL,\n  a0 int,\n  a1 int,\n  a2 int,\n  ts STRING,\n   rowtime as TO_TIMESTAMP(`ts`),\n  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n) WITH (\n 'connector' = 'values',\n 'bounded' = 'false',\n 'readable-metadata' = 'metadata_0:int',\n 'enable-watermark-push-down' = 'true',\n 'disable-lookup' = 'true')");
        this.util.verifyExecPlan("SELECT T1.a1, T2.a2 FROM (SELECT a0, window_start, window_end, MIN(a1) as a1  FROM TABLE(   TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL '1' SECOND))  GROUP BY a0, window_start, window_end) T1, (SELECT a0, window_start, window_end, MIN(a1) as a1, MIN(a2) as a2, MIN(metadata_0) as metadata_0 FROM TABLE(   TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL '1' SECOND))  GROUP BY a0, window_start, window_end) T2 WHERE T1.a1 = T2.a1");
    }

    @TestTemplate
    void testReuseWithReadMetadataKeepOrder() {
        Assumptions.assumeThat(this.isStreaming).isTrue();
        this.util.tableEnv().executeSql("CREATE TEMPORARY TABLE src(    `origin_ts` TIMESTAMP(3) METADATA VIRTUAL,    `partition` INT METADATA VIRTUAL,    `offset` BIGINT METADATA VIRTUAL,    `id` BIGINT,   PRIMARY KEY (`id`) NOT ENFORCED ) WITH (    'connector' = 'values',    'readable-metadata' = 'offset:bigint,origin_ts:timestamp(3),partition:int')");
        this.util.tableEnv().executeSql("CREATE TEMPORARY TABLE snk1(    `origin_ts` TIMESTAMP(3),    `partition` INT,    `offset` BIGINT,    `id` BIGINT) WITH (    'connector' = 'values' )");
        this.util.tableEnv().executeSql("CREATE TEMPORARY TABLE snk2(    `id` BIGINT) WITH (    'connector' = 'values' )");
        StatementSet createStatementSet = this.util.tableEnv().createStatementSet();
        createStatementSet.addInsertSql("INSERT INTO snk1 select `origin_ts`, `partition`, `offset`, `id` from src");
        createStatementSet.addInsertSql("INSERT INTO snk2 select `id` from src");
        this.util.verifyExecPlan(createStatementSet);
    }
}
