package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.ArrayList;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.runtime.utils.InMemoryLookupableTableSource;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.Before;
import org.junit.Test;
import scala.collection.Iterator;
import scala.collection.JavaConverters;

/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.class */
public class LookupJoinJsonPlanTest extends TableTestBase {
    private StreamTableTestUtil util;
    private TableEnvironment tEnv;

    @Before
    public void setup() {
        this.util = streamTestUtil(TableConfig.getDefault());
        this.tEnv = this.util.getTableEnv();
        this.tEnv.executeSql("CREATE TABLE MyTable (\n  a int,\n  b varchar,\n  c bigint,\n  proctime as PROCTIME(),\n  rowtime as TO_TIMESTAMP(FROM_UNIXTIME(c)),\n  watermark for rowtime as rowtime - INTERVAL '1' second \n) with (\n  'connector' = 'values',\n  'bounded' = 'false')");
        this.tEnv.executeSql("CREATE TABLE LookupTable (\n  id int,\n  name varchar,\n  age int \n) with (\n  'connector' = 'values',\n  'bounded' = 'false')");
        this.tEnv.executeSql("CREATE TABLE Sink1 (\n  a int,\n  name varchar,  age int) with (\n  'connector' = 'values',\n  'sink-insert-only' = 'false')");
        this.tEnv.executeSql("CREATE TABLE MySink1 (\n  a int,\n  b varchar,  c bigint,  proctime timestamp(3),  rowtime timestamp(3),  id int,  name varchar,  age int) with (\n  'connector' = 'values',\n  'table-sink-class' = 'DEFAULT')");
    }

    @Test
    public void testJoinTemporalTable() {
        this.tEnv.executeSql("CREATE TABLE MySink (\n  a int,\n  b varchar,  c bigint,  proctime timestamp(3),  rowtime timestamp(3),  id int,  name varchar,  age int) with (\n  'connector' = 'values',\n  'table-sink-class' = 'DEFAULT')");
        this.util.verifyJsonPlan("INSERT INTO MySink SELECT * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
    }

    @Test
    public void testJoinTemporalTableWithProjectionPushDown() {
        this.tEnv.executeSql("CREATE TABLE MySink (\n  a int,\n  b varchar,  c bigint,  proctime timestamp(3),  rowtime timestamp(3),  id int) with (\n  'connector' = 'values',\n  'table-sink-class' = 'DEFAULT')");
        this.util.verifyJsonPlan("INSERT INTO MySink \nSELECT T.*, D.id \nFROM MyTable AS T \nJOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D \nON T.a = D.id\n");
    }

    @Test
    public void testLegacyTableSourceException() {
        InMemoryLookupableTableSource.createTemporaryTable(this.tEnv, false, ((Iterator) JavaConverters.asScalaIteratorConverter(new ArrayList().iterator()).asScala()).toList(), TableSchema.builder().field("id", Types.INT).field("name", Types.STRING).field("age", Types.INT).build(), "LookupTable", true);
        this.tEnv.executeSql("CREATE TABLE MySink (\n  a int,\n  b varchar,  c bigint,  proctime timestamp(3),  rowtime timestamp(3),  id int,  name varchar,  age int) with (\n  'connector' = 'values',\n  'table-sink-class' = 'DEFAULT')");
        Assertions.assertThatThrownBy(() -> {
            this.util.verifyJsonPlan("INSERT INTO MySink SELECT * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(ValidationException.class, "TemporalTableSourceSpec can not be serialized.")});
    }

    @Test
    public void testAggAndLeftJoinWithTryResolveMode() {
        this.tEnv.getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY, OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE);
        this.util.verifyJsonPlan("INSERT INTO Sink1 SELECT T.a, D.name, D.age FROM (SELECT max(a) a, count(c) c, PROCTIME() proctime FROM MyTable GROUP BY b) T LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
    }

    @Test
    public void testJoinTemporalTableWithAsyncHint() {
        this.util.verifyJsonPlan("INSERT INTO MySink1 SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true', 'output-mode'='allow_unordered') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
    }

    @Test
    public void testJoinTemporalTableWithAsyncHint2() {
        this.util.verifyJsonPlan("INSERT INTO MySink1 SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true', 'timeout'='600s', 'capacity'='1000') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
    }

    @Test
    public void testJoinTemporalTableWithRetryHint() {
        this.util.verifyJsonPlan("INSERT INTO MySink1 SELECT /*+ LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
    }

    @Test
    public void testJoinTemporalTableWithAsyncRetryHint() {
        this.util.verifyJsonPlan("INSERT INTO MySink1 SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
    }

    @Test
    public void testJoinTemporalTableWithAsyncRetryHint2() {
        this.util.verifyJsonPlan("INSERT INTO MySink1 SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true', 'timeout'='600s', 'capacity'='1000', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
    }
}
