/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.ArrayList;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.planner.runtime.utils.InMemoryLookupableTableSource;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.types.Row;
import org.junit.Before;
import org.junit.Test;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.collection.immutable.List;

public class LookupJoinJsonPlanTest
extends TableTestBase {
    private StreamTableTestUtil util;
    private TableEnvironment tEnv;

    @Before
    public void setup() {
        this.util = this.streamTestUtil(TableConfig.getDefault());
        this.tEnv = this.util.getTableEnv();
        String srcTableA = "CREATE TABLE MyTable (\n  a int,\n  b varchar,\n  c bigint,\n  proctime as PROCTIME(),\n  rowtime as TO_TIMESTAMP(FROM_UNIXTIME(c)),\n  watermark for rowtime as rowtime - INTERVAL '1' second \n) with (\n  'connector' = 'values',\n  'bounded' = 'false')";
        String srcTableB = "CREATE TABLE LookupTable (\n  id int,\n  name varchar,\n  age int \n) with (\n  'connector' = 'values',\n  'bounded' = 'false')";
        this.tEnv.executeSql(srcTableA);
        this.tEnv.executeSql(srcTableB);
    }

    @Test
    public void testJoinTemporalTable() {
        String sinkTableDdl = "CREATE TABLE MySink (\n  a int,\n  b varchar,  c bigint,  proctime timestamp(3),  rowtime timestamp(3),  id int,  name varchar,  age int) with (\n  'connector' = 'values',\n  'table-sink-class' = 'DEFAULT')";
        this.tEnv.executeSql(sinkTableDdl);
        this.util.verifyJsonPlan("INSERT INTO MySink SELECT * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
    }

    @Test
    public void testJoinTemporalTableWithProjectionPushDown() {
        String sinkTableDdl = "CREATE TABLE MySink (\n  a int,\n  b varchar,  c bigint,  proctime timestamp(3),  rowtime timestamp(3),  id int) with (\n  'connector' = 'values',\n  'table-sink-class' = 'DEFAULT')";
        this.tEnv.executeSql(sinkTableDdl);
        this.util.verifyJsonPlan("INSERT INTO MySink \nSELECT T.*, D.id \nFROM MyTable AS T \nJOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D \nON T.a = D.id\n");
    }

    @Test
    public void testLegacyTableSourceException() {
        this.expectedException().expectMessage("TemporalTableSourceSpec can not be serialized.");
        TableSchema tableSchema = TableSchema.builder().field("id", Types.INT).field("name", Types.STRING).field("age", Types.INT).build();
        InMemoryLookupableTableSource.createTemporaryTable(this.tEnv, false, (List<Row>)((Iterator)JavaConverters.asScalaIteratorConverter(new ArrayList().iterator()).asScala()).toList(), tableSchema, "LookupTable", true);
        String sinkTableDdl = "CREATE TABLE MySink (\n  a int,\n  b varchar,  c bigint,  proctime timestamp(3),  rowtime timestamp(3),  id int,  name varchar,  age int) with (\n  'connector' = 'values',\n  'table-sink-class' = 'DEFAULT')";
        this.tEnv.executeSql(sinkTableDdl);
        this.util.verifyJsonPlan("INSERT INTO MySink SELECT * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
    }
}

