/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.planner.expressions.utils.Func0$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.stream.sql.TableScanTest$;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Test;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Symbol;
import scala.Symbol$;
import scala.Tuple3;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u00055c\u0001B\u0001\u0003\u0001M\u0011Q\u0002V1cY\u0016\u001c6-\u00198UKN$(BA\u0002\u0005\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u000b\u0019\taa\u001d;sK\u0006l'BA\u0004\t\u0003\u0011\u0001H.\u00198\u000b\u0005%Q\u0011a\u00029mC:tWM\u001d\u0006\u0003\u00171\tQ\u0001^1cY\u0016T!!\u0004\b\u0002\u000b\u0019d\u0017N\\6\u000b\u0005=\u0001\u0012AB1qC\u000eDWMC\u0001\u0012\u0003\ry'oZ\u0002\u0001'\t\u0001A\u0003\u0005\u0002\u001615\taC\u0003\u0002\u0018\u0011\u0005)Q\u000f^5mg&\u0011\u0011D\u0006\u0002\u000e)\u0006\u0014G.\u001a+fgR\u0014\u0015m]3\t\u000bm\u0001A\u0011\u0001\u000f\u0002\rqJg.\u001b;?)\u0005i\u0002C\u0001\u0010\u0001\u001b\u0005\u0011\u0001b\u0002\u0011\u0001\u0005\u0004%I!I\u0001\u0005kRLG.F\u0001#!\t)2%\u0003\u0002%-\t\u00192\u000b\u001e:fC6$\u0016M\u00197f)\u0016\u001cH/\u0016;jY\"1a\u0005\u0001Q\u0001\n\t\nQ!\u001e;jY\u0002BQ\u0001\u000b\u0001\u0005\u0002%\n\u0011\u0004^3ti2+w-Y2z)\u0006\u0014G.Z*pkJ\u001cWmU2b]R\t!\u0006\u0005\u0002,]5\tAFC\u0001.\u0003\u0015\u00198-\u00197b\u0013\tyCF\u0001\u0003V]&$\bFA\u00142!\t\u0011T'D\u00014\u0015\t!\u0004#A\u0003kk:LG/\u0003\u00027g\t!A+Z:u\u0011\u0015A\u0004\u0001\"\u0001*\u0003I!Xm\u001d;ECR\f7\u000b\u001e:fC6\u001c6-\u00198)\u0005]\n\u0004\"B\u001e\u0001\t\u0003I\u0013\u0001\u0005;fgR$E\t\u0014+bE2,7kY1oQ\tQ\u0014\u0007C\u0003?\u0001\u0011\u0005\u0011&A\ruKN$H\t\u0012'XSRD7i\\7qkR,GmQ8mk6t\u0007FA\u001f2\u0011\u0015\t\u0005\u0001\"\u0001*\u0003e!Xm\u001d;E\t2;\u0016\u000e\u001e5NKR\fG-\u0019;b\u0007>dW/\u001c8)\u0005\u0001\u000b\u0004\"\u0002#\u0001\t\u0003I\u0013a\u000b;fgR$E\tT,ji\"lU\r^1eCR\f7i\u001c7v[:\u0004&o\u001c6fGRLwN\u001c)vg\"$un\u001e8)\u0005\r\u000b\u0004\"B$\u0001\t\u0003I\u0013A\t;fgR$E\tT,ji\"<\u0016\r^3s[\u0006\u00148nQ8naV$X\rZ\"pYVlg\u000e\u000b\u0002Gc!)!\n\u0001C\u0001S\u0005)C/Z:u\t\u0012cu+\u001b;i\u0007>l\u0007/\u001e;fI\u000e{G.^7o%\u00164WM\u001d*poRLW.\u001a\u0015\u0003\u0013FBQ!\u0014\u0001\u0005\u0002%\nq\u0005^3ti.+\u0017p^8sIN<\u0016\u0
00e\u001e5XCR,'/\\1sW\u000e{W\u000e];uK\u0012\u001cu\u000e\\;n]\"\u0012A*\r\u0005\u0006!\u0002!\t!K\u0001\u0018i\u0016\u001cHoU2b]>s'i\\;oI\u0016$7k\\;sG\u0016D#aT\u0019\t\u000bM\u0003A\u0011A\u0015\u00027Q,7\u000f\u001e$jYR,'o\u00148DQ\u0006tw-\u001a7pON{WO]2fQ\t\u0011\u0016\u0007C\u0003W\u0001\u0011\u0005\u0011&A\ruKN$8kY1o\u001f:\u001c\u0005.\u00198hK2|wmU8ve\u000e,\u0007FA+2\u0011\u0015I\u0006\u0001\"\u0001*\u0003\u0019\"Xm\u001d;V]&|gn\u00115b]\u001e,Gn\\4T_V\u00148-Z!oI\u0006;wM]3hCRLwN\u001c\u0015\u00031FBQ\u0001\u0018\u0001\u0005\u0002%\na\u0004^3ti\u0006;wM]3hCR,wJ\\\"iC:<W\r\\8h'>,(oY3)\u0005m\u000b\u0004\"B0\u0001\t\u0003I\u0013!\u0007;fgRTu.\u001b8P]\u000eC\u0017M\\4fY><7k\\;sG\u0016D#AX\u0019\t\u000b\t\u0004A\u0011A\u0015\u0002YQ,7\u000f\u001e&pS:|en\u00115b]\u001e,Gn\\4T_V\u00148-Z,ji\",e/\u001a8ug\u0012+\b\u000f\\5dCR,\u0007FA12\u0011\u0015)\u0007\u0001\"\u0001*\u0003a!Xm\u001d;K_&twJ\u001c(p+B$\u0017\r^3T_V\u00148-\u001a\u0015\u0003IFBQ\u0001\u001b\u0001\u0005\u0002%\na\u0003^3ti*{\u0017N\\(o+B\u001cXM\u001d;T_V\u00148-\u001a\u0015\u0003OFBQa\u001b\u0001\u0005\n1\f!C^3sS\u001aL(j\\5o\u001f:\u001cv.\u001e:dKR\u0011!&\u001c\u0005\u0006]*\u0004\ra\\\u0001\u000eG\"\fgnZ3m_\u001elu\u000eZ3\u0011\u0005A\u001chBA\u0016r\u0013\t\u0011H&\u0001\u0004Qe\u0016$WMZ\u0005\u0003iV\u0014aa\u0015;sS:<'B\u0001:-\u0011\u00159\b\u0001\"\u0001*\u0003}!Xm\u001d;XCR,'/\\1sW\u0006sGm\u00115b]\u001e,Gn\\4T_V\u00148-\u001a\u0015\u0003mFBQA\u001f\u0001\u0005\u0002%\na\u0005^3ti\u000eC\u0017M\\4fY><7k\\;sG\u0016<\u0016\u000e\u001e5Fm\u0016tGo\u001d#va2L7-\u0019;fQ\tI\u0018\u0007C\u0003~\u0001\u0011\u0005\u0011&\u0001\fuKN$8kY1o\u001f:,\u0006o]3siN{WO]2fQ\ta\u0018\u0007\u0003\u0004\u0002\u0002\u0001!\t!K\u0001/i\u0016\u001cH/\u00169tKJ$8k\\;sG\u0016<\u0016\u000e\u001e5D_6\u0004X\u000f^3e\u0007>dW/\u001c8B]\u0012<\u0016\r^3s[\u0006\u00148\u000e\u000b\u0002\u0000c!1\u0011q\u0001\u0001\u0005\u0002%\nQ\u0005^3tiV\u00038/\u001a:u'>,(oY3XSRDw+\u0019;fe6\f'o\u001b)vg\"$un\u001e8)\u0007
\u0005\u0015\u0011\u0007\u0003\u0004\u0002\u000e\u0001!\t!K\u0001$i\u0016\u001cH/\u00168j_:,\u0006o]3siN{WO]2f\u0003:$\u0017iZ4sK\u001e\fG/[8oQ\r\tY!\r\u0005\u0007\u0003'\u0001A\u0011A\u0015\u00027Q,7\u000f^!hOJ,w-\u0019;f\u001f:,\u0006o]3siN{WO]2fQ\r\t\t\"\r\u0005\u0007\u00033\u0001A\u0011A\u0015\u0002KQ,7\u000f^!hOJ,w-\u0019;f\u001f:,\u0006o]3siN{WO]2f!JLW.\u0019:z\u0017\u0016L\bfAA\fc!1\u0011q\u0004\u0001\u0005\u0002%\na\u0005^3tiB\u0013xn\u0019+j[\u0016$V-\u001c9pe\u0006d'j\\5o\u001f:,\u0006o]3siN{WO]2fQ\r\ti\"\r\u0005\u0007\u0003K\u0001A\u0011A\u0015\u0002OQ,7\u000f^#wK:$H+[7f)\u0016l\u0007o\u001c:bY*{\u0017N\\(o+B\u001cXM\u001d;T_V\u00148-\u001a\u0015\u0004\u0003G\t\u0004BBA\u0016\u0001\u0011\u0005\u0011&A\u0018uKN$XK\\:vaB|'\u000f^3e/&tGm\\<BO\u001e\u0014XmZ1uK>s7\t[1oO\u0016dwnZ*pkJ\u001cW\rK\u0002\u0002*EBa!!\r\u0001\t\u0003I\u0013A\b;fgRLeN^1mS\u0012\u001cv.\u001e:dK\u000eC\u0017M\\4fY><Wj\u001c3fQ\r\ty#\r\u0005\u0007\u0003o\u0001A\u0011A\u0015\u0002IQ,7\u000f^'jgNLgn\u001a)sS6\f'/_&fs\u001a{'/\u00169tKJ$8k\\;sG\u0016D3!!\u000e2\u0011\u0019\ti\u0004\u0001C\u0001S\u00059C/Z:u\u001b&\u001c8/\u001b8h!JLW.\u0019:z\u0017\u0016Lhi\u001c:Fm\u0016tGo\u001d#va2L7-\u0019;fQ\r\tY$\r\u0005\u0007\u0003\u0007\u0002A\u0011A\u0015\u0002;Q,7\u000f^%om\u0006d\u0017\u000eZ*dC:|e\u000eT8pWV\u00048k\\;sG\u0016D3!!\u00112\u0011\u0019\tI\u0005\u0001C\u0001S\u0005qB/Z:u\u0013:4\u0018\r\\5e/\u0006$XM]7be.|U\u000f\u001e9viRK\b/\u001a\u0015\u0004\u0003\u000f\n\u0004")
public class TableScanTest
extends TableTestBase {
    /** Stream test utility used by every test in this class to register tables and verify plans. */
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());
    // Cached Scala symbols for the field names "a", "b" and "c"; assigned exactly once, hence final.
    private static final Symbol symbol$1 = Symbol$.MODULE$.apply("a");
    private static final Symbol symbol$2 = Symbol$.MODULE$.apply("b");
    private static final Symbol symbol$3 = Symbol$.MODULE$.apply("c");

    /**
     * Accessor for the {@code util} field.
     *
     * @return the stream test utility held by this test instance
     */
    private StreamTableTestUtil util() {
        return util;
    }

    /**
     * Registers a table source named {@code MyTable} (via {@code addTableSource}) with the fields
     * {@code a}, {@code b} and {@code c}, typed by an inline {@code CaseClassTypeInfo} over
     * {@code Tuple3<Object, Object, String>}, and verifies the execution plan of
     * {@code SELECT * FROM MyTable}.
     */
    @Test
    public void testLegacyTableSourceScan() {
        this.util().addTableSource("MyTable", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$1), package$.MODULE$.symbol2FieldExpression(symbol$2), package$.MODULE$.symbol2FieldExpression(symbol$3)}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(this){

            // Synthetic accessor that exposes the `types` field of the given type-info instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$3 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field (indices 0 until getArity()) and wraps them in a
            // ScalaCaseClassSerializer.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)new Serializable(this, executionConfig, fieldSerializers){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$3 $outer;
                    private final ExecutionConfig executionConfig$1;
                    private final TypeSerializer[] fieldSerializers$1;

                    // Generic entry point; delegates to the int-specialized variant below.
                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    // Stores, at index i, the serializer created from the i-th field type.
                    public void apply$mcVI$sp(int i) {
                        this.fieldSerializers$1[i] = this.$outer.protected$types(this.$outer)[i].createSerializer(this.executionConfig$1);
                    }
                    {
                        // Captures the enclosing type info, the execution config and the target
                        // array; `throw null` raises a NullPointerException when $outer is missing.
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.executionConfig$1 = executionConfig$1;
                        this.fieldSerializers$1 = fieldSerializers$1;
                    }
                });
                // NOTE(review): this anonymous serializer (whose createInstance builds a Tuple3 from
                // the three field values) is assigned to `unused` and never returned - presumably a
                // decompiler artifact; confirm against the original Scala source.
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                // The serializer actually returned is built from getTypeClass() and the field serializers.
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }
        });
        this.util().verifyExecPlan("SELECT * FROM MyTable");
    }

    /**
     * Registers a data stream named {@code DataStreamTable} (via {@code addDataStream}) with the
     * fields {@code a}, {@code b} and {@code c}, typed by an inline {@code CaseClassTypeInfo} over
     * {@code Tuple3<Object, Object, String>}, and verifies the execution plan of
     * {@code SELECT * FROM DataStreamTable}.
     */
    @Test
    public void testDataStreamScan() {
        this.util().addDataStream("DataStreamTable", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$1), package$.MODULE$.symbol2FieldExpression(symbol$2), package$.MODULE$.symbol2FieldExpression(symbol$3)}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(this){

            // Synthetic accessor that exposes the `types` field of the given type-info instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$4 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field (indices 0 until getArity()) and wraps them in a
            // ScalaCaseClassSerializer.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)new Serializable(this, executionConfig, fieldSerializers){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$4 $outer;
                    private final ExecutionConfig executionConfig$2;
                    private final TypeSerializer[] fieldSerializers$2;

                    // Generic entry point; delegates to the int-specialized variant below.
                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    // Stores, at index i, the serializer created from the i-th field type.
                    public void apply$mcVI$sp(int i) {
                        this.fieldSerializers$2[i] = this.$outer.protected$types(this.$outer)[i].createSerializer(this.executionConfig$2);
                    }
                    {
                        // Captures the enclosing type info, the execution config and the target
                        // array; `throw null` raises a NullPointerException when $outer is missing.
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.executionConfig$2 = executionConfig$2;
                        this.fieldSerializers$2 = fieldSerializers$2;
                    }
                });
                // NOTE(review): this anonymous serializer (whose createInstance builds a Tuple3 from
                // the three field values) is assigned to `unused` and never returned - presumably a
                // decompiler artifact; confirm against the original Scala source.
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                // The serializer actually returned is built from getTypeClass() and the field serializers.
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }
        });
        this.util().verifyExecPlan("SELECT * FROM DataStreamTable");
    }

    /**
     * Creates table {@code src} through DDL (with a watermark declared on {@code ts}) and verifies
     * the execution plan of a filtered scan over it.
     */
    @Test
    public void testDDLTableScan() {
        final StreamTableTestUtil testUtil = util();
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE,\n        |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND\n        |) WITH (\n        |  'connector' = 'values'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM src WHERE a > 1");
    }

    /**
     * Registers {@code my_udf}, creates table {@code t1} with computed columns {@code c},
     * {@code d} and {@code e}, and verifies the execution plan of {@code SELECT * FROM t1}.
     */
    @Test
    public void testDDLWithComputedColumn() {
        final StreamTableTestUtil testUtil = util();
        testUtil.tableEnv().registerFunction("my_udf", (ScalarFunction)Func0$.MODULE$);
        final String rawDdl = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |create table t1(\n         |  a int,\n         |  b varchar,\n         |  c as a + 1,\n         |  d as to_timestamp(b),\n         |  e as my_udf(a)\n         |) with (\n         |  'connector' = 'values'\n         |)\n       "})).s((Seq)Nil$.MODULE$);
        final String ddl = new StringOps(Predef$.MODULE$.augmentString(rawDdl)).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM t1");
    }

    /**
     * Creates {@code MetadataTable} with metadata columns plus a computed column derived from
     * {@code metadata_1}, and verifies the execution plan of a full scan.
     */
    @Test
    public void testDDLWithMetadataColumn() {
        final StreamTableTestUtil testUtil = util();
        final String rawDdl = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |CREATE TABLE MetadataTable (\n         |  `a` INT,\n         |  `other_metadata` INT METADATA FROM 'metadata_3' VIRTUAL,\n         |  `b` BIGINT,\n         |  `c` INT,\n         |  `metadata_1` STRING METADATA,\n         |  `computed` AS UPPER(`metadata_1`)\n         |) WITH (\n         |  'connector' = 'values',\n         |  'bounded' = 'false',\n         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN, metadata_3:BIGINT',\n         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN'\n         |)\n       "})).s((Seq)Nil$.MODULE$);
        final String ddl = new StringOps(Predef$.MODULE$.augmentString(rawDdl)).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM MetadataTable");
    }

    /**
     * Creates {@code MetadataTable} with metadata columns and verifies the execution plan of a
     * query that selects only {@code b} and {@code other_metadata}.
     */
    @Test
    public void testDDLWithMetadataColumnProjectionPushDown() {
        final StreamTableTestUtil testUtil = util();
        final String rawDdl = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |CREATE TABLE MetadataTable (\n         |  `a` INT,\n         |  `other_metadata` INT METADATA FROM 'metadata_3' VIRTUAL,\n         |  `b` BIGINT,\n         |  `c` INT,\n         |  `metadata_1` STRING METADATA\n         |) WITH (\n         |  'connector' = 'values',\n         |  'bounded' = 'false',\n         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN, metadata_3:BIGINT',\n         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN'\n         |)\n       "})).s((Seq)Nil$.MODULE$);
        final String ddl = new StringOps(Predef$.MODULE$.augmentString(rawDdl)).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT `b`, `other_metadata` FROM MetadataTable");
    }

    /**
     * Registers {@code my_udf}, creates table {@code t1} whose watermark is declared on the
     * computed column {@code d}, and verifies the execution plan of {@code SELECT * FROM t1}.
     */
    @Test
    public void testDDLWithWatermarkComputedColumn() {
        final StreamTableTestUtil testUtil = util();
        testUtil.tableEnv().registerFunction("my_udf", (ScalarFunction)Func0$.MODULE$);
        final String rawDdl = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |create table t1(\n         |  a int,\n         |  b varchar,\n         |  c as a + 1,\n         |  d as to_timestamp(b),\n         |  e as my_udf(a),\n         |  WATERMARK FOR d AS d - INTERVAL '0.001' SECOND\n         |) with (\n         |  'connector' = 'values'\n         |)\n       "})).s((Seq)Nil$.MODULE$);
        final String ddl = new StringOps(Predef$.MODULE$.augmentString(rawDdl)).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM t1");
    }

    /**
     * Creates table {@code src} with a computed column {@code my_ts} derived from {@code ts}, a
     * {@code PROCTIME()} column and a watermark on {@code ts}, then verifies the execution plan
     * of a filtered scan.
     */
    @Test
    public void testDDLWithComputedColumnReferRowtime() {
        final StreamTableTestUtil testUtil = util();
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE,\n        |  my_ts AS ts - INTERVAL '0.001' SECOND,\n        |  proc AS PROCTIME(),\n        |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND\n        |) WITH (\n        |  'connector' = 'values'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM src WHERE a > 1");
    }

    /**
     * Registers {@code my_udf}, creates table {@code t1} whose column names are back-quoted
     * keywords ({@code time}, {@code current_time}, {@code timestamp}) with a watermark on
     * {@code timestamp}, and verifies the execution plan of {@code SELECT * FROM t1}.
     */
    @Test
    public void testKeywordsWithWatermarkComputedColumn() {
        final StreamTableTestUtil testUtil = util();
        testUtil.tableEnv().registerFunction("my_udf", (ScalarFunction)Func0$.MODULE$);
        final String rawDdl = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |create table t1(\n         |  a int,\n         |  b varchar,\n         |  `time` time,\n         |  mytime as `time`,\n         |  `current_time` as current_time,\n         |  json_row ROW<`timestamp` TIMESTAMP(3)>,\n         |  `timestamp` AS json_row.`timestamp`,\n         |  WATERMARK FOR `timestamp` AS `timestamp`\n         |) with (\n         |  'connector' = 'values'\n         |)\n       "})).s((Seq)Nil$.MODULE$);
        final String ddl = new StringOps(Predef$.MODULE$.augmentString(rawDdl)).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM t1");
    }

    /**
     * Creates table {@code src} with {@code 'bounded' = 'true'} and verifies the rel plan of a
     * filtered scan, requesting the {@code CHANGELOG_MODE} explain detail.
     */
    @Test
    public void testScanOnBoundedSource() {
        final StreamTableTestUtil testUtil = util();
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE\n        |) WITH (\n        |  'connector' = 'values',\n        |  'bounded' = 'true'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT * FROM src WHERE a > 1", details);
    }

    /**
     * Creates table {@code src} with changelog mode {@code I,UA,UB,D} and verifies the rel plan
     * of a filtered scan, requesting the {@code CHANGELOG_MODE} explain detail.
     */
    @Test
    public void testFilterOnChangelogSource() {
        final StreamTableTestUtil testUtil = util();
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'I,UA,UB,D'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT * FROM src WHERE a > 1", details);
    }

    /**
     * Creates table {@code src} with changelog mode {@code I,UA,UB,D} and verifies the rel plan
     * of a reordered projection, requesting the {@code CHANGELOG_MODE} explain detail.
     */
    @Test
    public void testScanOnChangelogSource() {
        final StreamTableTestUtil testUtil = util();
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'I,UA,UB,D'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT b,a,ts FROM src", details);
    }

    /**
     * Creates a changelog table ({@code I,UA,UB,D}) and an insert-only table, then verifies the
     * rel plan of a {@code UNION ALL} between the changelog table and an aggregation over the
     * insert-only table, requesting the {@code CHANGELOG_MODE} explain detail.
     */
    @Test
    public void testUnionChangelogSourceAndAggregation() {
        final StreamTableTestUtil testUtil = util();
        final String changelogDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE changelog_src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'I,UA,UB,D'\n        |)\n      ")).stripMargin();
        testUtil.addTable(changelogDdl);
        final String appendDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE append_src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'I'\n        |)\n      ")).stripMargin();
        testUtil.addTable(appendDdl);
        final String unionQuery = new StringOps(Predef$.MODULE$.augmentString("\n      |SELECT b, ts, a\n      |FROM (\n      |  SELECT * FROM changelog_src\n      |  UNION ALL\n      |  SELECT MAX(ts) as t, a, MAX(b) as b FROM append_src GROUP BY a\n      |)\n      |")).stripMargin();
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan(unionQuery, details);
    }

    /**
     * Creates table {@code src} with changelog mode {@code I,UA,UB} and verifies the rel plan of
     * a filtered {@code COUNT(*)}, requesting the {@code CHANGELOG_MODE} explain detail.
     */
    @Test
    public void testAggregateOnChangelogSource() {
        final StreamTableTestUtil testUtil = util();
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'I,UA,UB'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT COUNT(*) FROM src WHERE a > 1", details);
    }

    /** Runs the shared join verification with changelog mode {@code I,UB,UA}. */
    @Test
    public void testJoinOnChangelogSource() {
        final String changelogMode = "I,UB,UA";
        verifyJoinOnSource(changelogMode);
    }

    /**
     * Sets {@code TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE} to {@code true} and then runs the
     * shared join verification with changelog mode {@code I,UB,UA}.
     */
    @Test
    public void testJoinOnChangelogSourceWithEventsDuplicate() {
        util()
            .tableEnv()
            .getConfig()
            .getConfiguration()
            .setBoolean(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true);
        final String changelogMode = "I,UB,UA";
        verifyJoinOnSource(changelogMode);
    }

    /** Runs the shared join verification with changelog mode {@code I,D}. */
    @Test
    public void testJoinOnNoUpdateSource() {
        final String changelogMode = "I,D";
        verifyJoinOnSource(changelogMode);
    }

    /** Runs the shared join verification with changelog mode {@code UA,D}. */
    @Test
    public void testJoinOnUpsertSource() {
        final String changelogMode = "UA,D";
        verifyJoinOnSource(changelogMode);
    }

    /**
     * Creates an insert-only {@code orders} table and a {@code rates_history} table whose
     * changelog mode is taken from the argument, then verifies the rel plan of an inner join
     * between them, requesting the {@code CHANGELOG_MODE} explain detail.
     *
     * @param changelogMode value substituted into the {@code 'changelog-mode'} option of
     *     {@code rates_history}
     */
    private void verifyJoinOnSource(String changelogMode) {
        final StreamTableTestUtil testUtil = util();

        final String ordersDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE orders (\n        |  amount BIGINT,\n        |  currency_id BIGINT,\n        |  currency_name STRING\n        |) WITH (\n        | 'connector' = 'values',\n        | 'changelog-mode' = 'I'\n        |)\n        |")).stripMargin();
        testUtil.addTable(ordersDdl);

        // The changelog mode is interpolated between the two template parts.
        final StringContext ratesTemplate = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n        |CREATE TABLE rates_history (\n        |  currency_id BIGINT,\n        |  currency_name STRING,\n        |  rate BIGINT,\n        |  PRIMARY KEY (currency_id) NOT ENFORCED\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = '", "'\n        |)\n      "}));
        final String rawRatesDdl = ratesTemplate.s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{changelogMode}));
        final String ratesDdl = new StringOps(Predef$.MODULE$.augmentString(rawRatesDdl)).stripMargin();
        testUtil.addTable(ratesDdl);

        final String joinQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT o.currency_name, o.amount, r.rate, o.amount * r.rate\n        |FROM orders AS o JOIN rates_history AS r\n        |ON o.currency_id = r.currency_id AND o.currency_name = r.currency_name\n        |")).stripMargin();
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan(joinQuery, details);
    }

    /**
     * Creates table {@code src} with a watermark on {@code ts} and changelog mode
     * {@code I,UB,UA,D}, then verifies the rel plan of a filtered scan, requesting the
     * {@code CHANGELOG_MODE} explain detail.
     */
    @Test
    public void testWatermarkAndChangelogSource() {
        final StreamTableTestUtil testUtil = util();
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE,\n        |  WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'I,UB,UA,D'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT * FROM src WHERE a > 1", details);
    }

    /**
     * Sets {@code TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE} to {@code true}, creates a keyed
     * changelog table {@code src} ({@code I,UB,UA,D}) with computed columns and a watermark, and
     * verifies the rel plan of a filtered projection with the {@code CHANGELOG_MODE} detail.
     */
    @Test
    public void testChangelogSourceWithEventsDuplicate() {
        final StreamTableTestUtil testUtil = util();
        testUtil
            .tableEnv()
            .getConfig()
            .getConfiguration()
            .setBoolean(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true);
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  id STRING,\n        |  a INT,\n        |  b AS a + 1,\n        |  c STRING,\n        |  ts as to_timestamp(c),\n        |  PRIMARY KEY (id) NOT ENFORCED,\n        |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'I,UB,UA,D'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT a, b, c FROM src WHERE a > 1", details);
    }

    /**
     * Creates table {@code src} with a composite primary key {@code (id2, id1)} and changelog
     * mode {@code UA}, then verifies the rel plan of a projection with the
     * {@code CHANGELOG_MODE} explain detail.
     */
    @Test
    public void testScanOnUpsertSource() {
        final StreamTableTestUtil testUtil = util();
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  id1 STRING,\n        |  a INT,\n        |  id2 BIGINT,\n        |  b DOUBLE,\n        |  PRIMARY KEY (id2, id1) NOT ENFORCED\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'UA'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT id1, a, b FROM src", details);
    }

    /**
     * Creates a keyed table {@code src} with changelog mode {@code UA,D}, computed columns and a
     * watermark on {@code ts}, then verifies the rel plan of a filtered projection with the
     * {@code CHANGELOG_MODE} explain detail.
     */
    @Test
    public void testUpsertSourceWithComputedColumnAndWatermark() {
        final StreamTableTestUtil testUtil = util();
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  id STRING,\n        |  a INT,\n        |  b AS a + 1,\n        |  c STRING,\n        |  ts as to_timestamp(c),\n        |  PRIMARY KEY (id) NOT ENFORCED,\n        |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'UA,D'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT a, b, c FROM src WHERE a > 1", details);
    }

    /**
     * Creates a keyed table {@code src} with changelog mode {@code UA,D} and the options
     * {@code 'enable-watermark-push-down' = 'true'} and {@code 'disable-lookup' = 'true'}, then
     * verifies the rel plan of {@code SELECT id, ts FROM src} with the {@code CHANGELOG_MODE}
     * explain detail.
     */
    @Test
    public void testUpsertSourceWithWatermarkPushDown() {
        final StreamTableTestUtil testUtil = util();
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  id STRING,\n        |  a INT,\n        |  b AS a + 1,\n        |  c STRING,\n        |  ts as to_timestamp(c),\n        |  PRIMARY KEY (id) NOT ENFORCED,\n        |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'UA,D',\n        |  'enable-watermark-push-down' = 'true',\n        |  'disable-lookup' = 'true'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT id, ts FROM src", details);
    }

    /**
     * Creates a keyed upsert table ({@code UA,D}) and an insert-only table, then verifies the
     * rel plan of a {@code UNION ALL} between the upsert table and an aggregation over the
     * insert-only table, requesting the {@code CHANGELOG_MODE} explain detail.
     */
    @Test
    public void testUnionUpsertSourceAndAggregation() {
        final StreamTableTestUtil testUtil = util();
        final String upsertDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE upsert_src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE,\n        |  PRIMARY KEY (a) NOT ENFORCED\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'UA,D'\n        |)\n      ")).stripMargin();
        testUtil.addTable(upsertDdl);
        final String appendDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE append_src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'I'\n        |)\n      ")).stripMargin();
        testUtil.addTable(appendDdl);
        final String unionQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT b, ts, a\n        |FROM (\n        |  SELECT * FROM upsert_src\n        |  UNION ALL\n        |  SELECT MAX(ts) as t, a, MAX(b) as b FROM append_src GROUP BY a\n        |)\n        |")).stripMargin();
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan(unionQuery, details);
    }

    /**
     * Creates table {@code src} keyed on {@code a} with changelog mode {@code UA,D} and verifies
     * the rel plan of an aggregation grouped by the non-key column {@code b}, requesting the
     * {@code CHANGELOG_MODE} explain detail.
     */
    @Test
    public void testAggregateOnUpsertSource() {
        final StreamTableTestUtil testUtil = util();
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE,\n        |  c STRING,\n        |  PRIMARY KEY (a) NOT ENFORCED\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'UA,D'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT b, COUNT(*), MAX(ts), MIN(ts) FROM src GROUP BY b", details);
    }

    /**
     * Creates table {@code src} keyed on {@code a} with changelog mode {@code UA,D} and verifies
     * the rel plan of an aggregation grouped by the primary-key column {@code a}, requesting the
     * {@code CHANGELOG_MODE} explain detail.
     */
    @Test
    public void testAggregateOnUpsertSourcePrimaryKey() {
        final StreamTableTestUtil testUtil = util();
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE,\n        |  c STRING,\n        |  PRIMARY KEY (a) NOT ENFORCED\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'UA,D'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT a, COUNT(*), MAX(ts), MIN(ts) FROM src GROUP BY a", details);
    }

    /**
     * Creates an insert-only {@code orders} table with a {@code PROCTIME()} column and a keyed
     * {@code rates_history} table ({@code UA,D}, lookup disabled), then verifies the rel plan of
     * a {@code FOR SYSTEM_TIME AS OF o.proctime} left join with the {@code CHANGELOG_MODE}
     * explain detail.
     */
    @Test
    public void testProcTimeTemporalJoinOnUpsertSource() {
        final StreamTableTestUtil testUtil = util();
        final String ordersDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE orders (\n        |  amount BIGINT,\n        |  currency STRING,\n        |  proctime AS PROCTIME()\n        |) WITH (\n        | 'connector' = 'values',\n        | 'changelog-mode' = 'I'\n        |)\n        |")).stripMargin();
        testUtil.addTable(ordersDdl);
        final String ratesDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE rates_history (\n        |  currency STRING PRIMARY KEY NOT ENFORCED,\n        |  rate BIGINT\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'UA,D',\n        |  'disable-lookup' = 'true'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ratesDdl);
        final String joinQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT o.currency, o.amount, r.rate, o.amount * r.rate\n        |FROM orders AS o LEFT JOIN rates_history FOR SYSTEM_TIME AS OF o.proctime AS r\n        |ON o.currency = r.currency\n        |")).stripMargin();
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan(joinQuery, details);
    }

    /**
     * Creates an insert-only {@code orders} table and a keyed {@code rates_history} table
     * ({@code UA,D}, lookup disabled), both with a watermark on {@code rowtime}, then verifies
     * the rel plan of a {@code FOR SYSTEM_TIME AS OF o.rowtime} left join with the
     * {@code CHANGELOG_MODE} explain detail.
     */
    @Test
    public void testEventTimeTemporalJoinOnUpsertSource() {
        final StreamTableTestUtil testUtil = util();
        final String ordersDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE orders (\n        |  amount BIGINT,\n        |  currency STRING,\n        |  rowtime TIMESTAMP(3),\n        |  WATERMARK FOR rowtime AS rowtime\n        |) WITH (\n        | 'connector' = 'values',\n        | 'changelog-mode' = 'I'\n        |)\n        |")).stripMargin();
        testUtil.addTable(ordersDdl);
        final String ratesDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE rates_history (\n        |  currency STRING PRIMARY KEY NOT ENFORCED,\n        |  rate BIGINT,\n        |  rowtime TIMESTAMP(3),\n        |  WATERMARK FOR rowtime AS rowtime\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'UA,D',\n        |  'disable-lookup' = 'true'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ratesDdl);
        final String joinQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT o.currency, o.amount, r.rate, o.amount * r.rate\n        |FROM orders AS o LEFT JOIN rates_history FOR SYSTEM_TIME AS OF o.rowtime AS r\n        |ON o.currency = r.currency\n        |")).stripMargin();
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan(joinQuery, details);
    }

    /**
     * Creates a changelog table {@code src} ({@code I,UA,UB}) with a {@code PROCTIME()} column
     * and expects planning of a tumbling-window aggregation over it to fail with a
     * {@code TableException} carrying the configured message.
     */
    @Test
    public void testUnsupportedWindowAggregateOnChangelogSource() {
        final StreamTableTestUtil testUtil = util();
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts AS PROCTIME(),\n        |  a INT,\n        |  b DOUBLE\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'I,UA,UB'\n        |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        final String windowQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT TUMBLE_START(ts, INTERVAL '10' SECOND), COUNT(*)\n        |FROM src\n        |GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)\n        |")).stripMargin();
        // The expectation is armed only after the table is registered, as in the original order.
        thrown().expect(TableException.class);
        thrown().expectMessage("GroupWindowAggregate doesn't support consuming update changes which is produced by node TableSourceScan");
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan(windowQuery, details);
    }

    /**
     * Expects a {@link ValidationException} when scanning a source whose declared
     * changelog mode ({@code I,UB,D}) contains UPDATE_BEFORE but no UPDATE_AFTER.
     */
    @Test
    public void testInvalidSourceChangelogMode() {
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'I,UB,D'\n        |)\n      ")).stripMargin();
        String expectedMessage = "Invalid source for table 'default_catalog.default_database.src'. A ScanTableSource doesn't support a changelog which contains UPDATE_BEFORE but no UPDATE_AFTER. Please adapt the implementation of class 'org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanLookupTableSource'.";
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});

        util().addTable(sourceDdl);

        thrown().expect(ValidationException.class);
        thrown().expectMessage(expectedMessage);
        util().verifyRelPlan("SELECT * FROM src WHERE a > 1", explainDetails);
    }

    /**
     * Expects a {@link TableException} when a source declared with changelog mode
     * {@code I,UA,D} (UPDATE_AFTER without UPDATE_BEFORE) is scanned while the table
     * defines no primary key.
     */
    @Test
    public void testMissingPrimaryKeyForUpsertSource() {
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'I,UA,D'\n        |)\n      ")).stripMargin();
        String expectedMessage = "Table 'default_catalog.default_database.src' produces a changelog stream contains UPDATE_AFTER, no UPDATE_BEFORE. This requires to define primary key constraint on the table.";
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});

        util().addTable(sourceDdl);

        thrown().expect(TableException.class);
        thrown().expectMessage(expectedMessage);
        util().verifyRelPlan("SELECT * FROM src WHERE a > 1", explainDetails);
    }

    /**
     * Expects a {@link TableException} when
     * {@link ExecutionConfigOptions#TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE} is set to
     * {@code true} and the scanned changelog source ({@code I,UB,UA,D}) has no primary key.
     */
    @Test
    public void testMissingPrimaryKeyForEventsDuplicate() {
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE\n        |) WITH (\n        |  'connector' = 'values',\n        |  'changelog-mode' = 'I,UB,UA,D'\n        |)\n      ")).stripMargin();
        String expectedMessage = "Configuration 'table.exec.source.cdc-events-duplicate' is enabled which requires the changelog sources to define a PRIMARY KEY. However, table 'default_catalog.default_database.src' doesn't have a primary key.";
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});

        util().addTable(sourceDdl);
        // Switch on the option that the expected error message refers to.
        util().tableEnv().getConfig().getConfiguration().setBoolean(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true);

        thrown().expect(TableException.class);
        thrown().expectMessage(expectedMessage);
        util().verifyRelPlan("SELECT * FROM src WHERE a > 1", explainDetails);
    }

    /**
     * Expects a {@link TableException} when a plain scan is planned over a table whose
     * {@code table-source-class} option is set to
     * {@link TestValuesTableFactory.MockedLookupTableSource}.
     */
    @Test
    public void testInvalidScanOnLookupSource() {
        // The source class name is spliced into the DDL between the two template parts.
        String lookupSourceClassName = TestValuesTableFactory.MockedLookupTableSource.class.getName();
        StringContext ddlTemplate = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE\n        |) WITH (\n        |  'connector' = 'values',\n        |  'table-source-class' = '", "'\n        |)\n      "}));
        String interpolatedDdl = ddlTemplate.s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{lookupSourceClassName}));
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString(interpolatedDdl)).stripMargin();
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});

        util().addTable(sourceDdl);

        thrown().expect(TableException.class);
        thrown().expectMessage("Cannot generate a valid execution plan for the given query");
        util().verifyRelPlan("SELECT * FROM src", explainDetails);
    }

    /**
     * Expects a {@link ValidationException} when registering a table whose watermark
     * expression is the empty string literal {@code ''} rather than a timestamp expression.
     */
    @Test
    public void testInvalidWatermarkOutputType() {
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE src (\n        |  ts TIMESTAMP(3),\n        |  a INT,\n        |  b DOUBLE,\n        |  WATERMARK FOR `ts` AS ''\n        |) WITH (\n        |  'connector' = 'values'\n        |)\n      ")).stripMargin();
        String expectedMessage = "Watermark strategy '' must be of type TIMESTAMP or TIMESTAMP_LTZ but is of type 'CHAR(0) NOT NULL'.";

        // The expectation is registered before the table is added, as no query follows.
        thrown().expect(ValidationException.class);
        thrown().expectMessage(expectedMessage);
        util().addTable(sourceDdl);
    }
}

