/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.table.planner.utils.TestLegacyFilterableTableSource$;
import org.apache.flink.table.runtime.functions.scalar.SourceWatermarkFunction;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.types.Row;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u00154A!\u0001\u0002\u0001'\t\tB+\u00192mKN{WO]2f\u0013R\u001b\u0015m]3\u000b\u0005\r!\u0011aA:rY*\u0011QAB\u0001\u0007gR\u0014X-Y7\u000b\u0005\u001dA\u0011a\u0002:v]RLW.\u001a\u0006\u0003\u0013)\tq\u0001\u001d7b]:,'O\u0003\u0002\f\u0019\u0005)A/\u00192mK*\u0011QBD\u0001\u0006M2Lgn\u001b\u0006\u0003\u001fA\ta!\u00199bG\",'\"A\t\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u0001!\u0002CA\u000b\u0019\u001b\u00051\"BA\f\u0007\u0003\u0015)H/\u001b7t\u0013\tIbCA\tTiJ,\u0017-\\5oOR+7\u000f\u001e\"bg\u0016DQa\u0007\u0001\u0005\u0002q\ta\u0001P5oSRtD#A\u000f\u0011\u0005y\u0001Q\"\u0001\u0002\t\u000b\u0001\u0002A\u0011A\u0011\u0002\u001dU\u001cXm\u001d'fO\u0006\u001c\u0017PU8xgV\t!\u0005\u0005\u0002$K5\tAE\u0003\u0002\u0018\u0015%\u0011a\u0005\n\u0002\u0012\u0019\u0016<\u0017mY=S_^\u0014Vm]8ve\u000e,\u0007FA\u0010)!\tIC&D\u0001+\u0015\tY\u0003#A\u0003kk:LG/\u0003\u0002.U\t!!+\u001e7f\u0011\u0015y\u0003\u0001\"\u00111\u0003\u0019\u0011WMZ8sKR\t\u0011\u0007\u0005\u00023k5\t1GC\u00015\u0003\u0015\u00198-\u00197b\u0013\t14G\u0001\u0003V]&$\bF\u0001\u00189!\tI\u0013(\u0003\u0002;U\t1!)\u001a4pe\u0016DQ\u0001\u0010\u0001\u0005\u0002A\n\u0011\u0003^3tiNKW\u000e\u001d7f!J|'.Z2uQ\tYd\b\u0005\u0002*\u007f%\u0011\u0001I\u000b\u0002\u0005)\u0016\u001cH\u000fC\u0003C\u0001\u0011\u0005\u0001'\u0001\u000euKN$\bK]8kK\u000e$x+\u001b;i_V$\u0018J\u001c9viJ+g\r\u000b\u0002B}!)Q\t\u0001C\u0001a\u0005\tB/Z:u\u001d\u0016\u001cH/\u001a3Qe>TWm\u0019;)\u0005\u0011s\u0004\"\u0002%\u0001\t\u0003\u0001\u0014!\u0007;fgRtUm\u001d;fIB\u0013xN[3di^KG\u000f[%uK6D#a\u0012 \t\u000b-\u0003A\u0011\u0001\u0019\u0002;Q,7\u000f\u001e+bE2,7k\\;sG\u0016<\u0016\u000e\u001e5GS2$XM]1cY\u0016D#A\u0013 \t\u000b9\u0003A\u0011\u0001\u0019\u0002KQ,7\u000f\u001e+bE2,7k\\;sG\u0016<\u0016\u000e\u001e5Gk:\u001cG/[8o\r&dG/\u001a:bE2,\u0007FA'?\u0011\u0015\t\u0006\u0001\"\u00011\u0003U!Xm\u001d;J]B,HOR8s[\u0006$8k\\;sG\u0016D#\u0001\u0015 \t\u000bQ\u0003A\u0011\u0001\u0019\u0002!Q,7\u000f^!mY\u0012\u000bG/\u0019+za\u0016\u001c\bFA*?\u0011\u00159\u0006\u0001\"\u00011\u0003a!Xm\u001d;TS6\u0004H.Z'fi\u0006$\u0017\r^1BG\u000e,7o\u001d\u0015\u0003-zBQA\u0017\u0001\u0005\u0002A\n\u0011\u0004^3ti\u000e{W\u000e\u001d7fq6+G/\u00193bi\u0006\f5mY3tg\"\u0012\u0011L\u0010\u0005\u0006;\u0002!\t\u0001M\u0001'i\u0016\u001cHOT3ti\u0016$\u0007K]8kK\u000e$\u0018n\u001c8XSRDW*\u001a;bI\u0006$\u0018-Q2dKN\u001c\bF\u0001/?\u0011\u0015\u0001\u0007\u0001\"\u00011\u0003a!Xm\u001d;T_V\u00148-Z,bi\u0016\u0014X.\u0019:l\u0013:$E\t\u0014\u0015\u0003?zBQa\u0019\u0001\u0005\u0002A\n!\u0004^3tiN{WO]2f/\u0006$XM]7be.Le.U;fefD#A\u0019 ")
public class TableSourceITCase
extends StreamingTestBase {
    @Rule
    public LegacyRowResource usesLegacyRows() {
        return LegacyRowResource.INSTANCE;
    }

    @Override
    @Before
    public void before() {
        super.before();
        String myTableDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.smallData3());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |CREATE TABLE MyTable (\n         |  `a` INT,\n         |  `b` BIGINT,\n         |  `c` STRING\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '", "',\n         |  'bounded' = 'false'\n         |)\n         |"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{myTableDataId})))).stripMargin());
        String filterableTableDataId = TestValuesTableFactory.registerData(TestLegacyFilterableTableSource$.MODULE$.defaultRows());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |CREATE TABLE FilterableTable (\n         |  name STRING,\n         |  id BIGINT,\n         |  amount INT,\n         |  price DOUBLE\n         |) WITH (\n         |  'connector' = 'values',\n         |  'filterable-fields' = 'amount',\n         |  'data-id' = '", "',\n         |  'bounded' = 'false'\n         |)\n         |"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{filterableTableDataId})))).stripMargin());
        String metadataTableDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.smallData5());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |CREATE TABLE MetadataTable (\n         |  `a` INT,\n         |  `other_metadata` INT METADATA FROM 'metadata_3',\n         |  `b` BIGINT,\n         |  `metadata_1` INT METADATA,\n         |  `computed` AS `metadata_1` * 2,\n         |  `metadata_2` STRING METADATA\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '", "',\n         |  'bounded' = 'false',\n         |  'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'\n         |)\n         |"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{metadataTableDataId})))).stripMargin());
        String nestedTableDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.deepNestedRow());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |CREATE TABLE NestedTable (\n         |  id BIGINT,\n         |  deepNested ROW<\n         |     nested1 ROW<name STRING, `value.` INT>,\n         |     `nested2.` ROW<num INT, flag BOOLEAN>>,\n         |  nested ROW<name STRING, `value` INT>,\n         |  name STRING,\n         |  nestedItem ROW<deepArray ROW<`value` INT> ARRAY, deepMap MAP<STRING, INT>>,\n         |  lower_name AS LOWER(name)\n         |) WITH (\n         |  'connector' = 'values',\n         |  'nested-projection-supported' = 'true',\n         |  'data-id' = '", "',\n         |  'bounded' = 'true'\n         |)\n         |"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{nestedTableDataId})))).stripMargin());
    }

    @Test
    public void testSimpleProject() {
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT a, c FROM MyTable")).toAppendStream(TypeExtractor.createTypeInfo(Row.class));
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,Hi", "2,Hello", "3,Hello world"}));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testProjectWithoutInputRef() {
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT COUNT(*) FROM MyTable")).toRetractStream(TypeExtractor.createTypeInfo(Row.class));
        TestingRetractSink sink = new TestingRetractSink();
        result.addSink((SinkFunction)sink).setParallelism(result.parallelism());
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"3"}));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)sink.getRetractResults().sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testNestedProject() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id,\n        |    deepNested.nested1.name AS nestedName,\n        |    nested.`value` AS nestedValue,\n        |    deepNested.`nested2.`.flag AS nestedFlag,\n        |    deepNested.`nested2.`.num + deepNested.nested1.`value.` AS nestedNum,\n        |    lower_name\n        |FROM NestedTable\n      ")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class));
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,Sarah,10000,true,1100,mary", "2,Rob,20000,false,2200,bob", "3,Mike,30000,true,3300,liz"}));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testNestedProjectWithItem() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT nestedItem.deepArray[nestedItem.deepMap['Monday']] FROM  NestedTable\n        |")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class));
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1", "1", "1"}));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testTableSourceWithFilterable() {
        String query = "SELECT id, amount, name FROM FilterableTable WHERE amount > 4 AND price < 9";
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class));
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"5,5,Record_5", "6,6,Record_6", "7,7,Record_7", "8,8,Record_8"}));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testTableSourceWithFunctionFilterable() {
        String query = "SELECT id, amount, name FROM FilterableTable WHERE amount > 4 AND price < 9 AND upper(name) = 'RECORD_5'";
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class));
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"5,5,Record_5"}));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testInputFormatSource() {
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.smallData3());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |CREATE TABLE MyInputFormatTable (\n         |  `a` INT,\n         |  `b` BIGINT,\n         |  `c` STRING\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '", "',\n         |  'runtime-source' = 'InputFormat'\n         |)\n         |"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{dataId})))).stripMargin());
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT a, c FROM MyInputFormatTable")).toAppendStream(TypeExtractor.createTypeInfo(Row.class));
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,Hi", "2,Hello", "3,Hello world"}));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testAllDataTypes() {
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.fullDataTypesData());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |CREATE TABLE T (\n         |  `a` BOOLEAN,\n         |  `b` TINYINT,\n         |  `c` SMALLINT,\n         |  `d` INT,\n         |  `e` BIGINT,\n         |  `f` FLOAT,\n         |  `g` DOUBLE,\n         |  `h` DECIMAL(5, 2),\n         |  `i` VARCHAR(5),\n         |  `j` CHAR(5),\n         |  `k` DATE,\n         |  `l` TIME(0),\n         |  `m` TIMESTAMP(9),\n         |  `n` TIMESTAMP(9) WITH LOCAL TIME ZONE,\n         |  `o` ARRAY<BIGINT>,\n         |  `p` ROW<f1 BIGINT, f2 STRING, f3 DOUBLE>,\n         |  `q` MAP<STRING, INT>\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '", "'\n         |)\n         |"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{dataId})))).stripMargin());
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT * FROM T")).toAppendStream(TypeExtractor.createTypeInfo(Row.class));
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"true,127,32767,2147483647,9223372036854775807,-1.123,-1.123,5.10,1,1,1969-01-01,00:00:00.123,1969-01-01T00:00:00.123456789,1969-01-01T00:00:00.123456789Z,[1, 2, 3],1,a,2.3,{k1=1}", "false,-128,-32768,-2147483648,-9223372036854775808,3.4,3.4,6.10,12,12,1970-09-30,01:01:01.123,1970-09-30T01:01:01.123456,1970-09-30T01:01:01.123456Z,[4, 5],null,b,4.56,{k4=4, k2=2}", "true,0,0,0,0,0.12,0.12,7.10,123,123,1990-12-24,08:10:24.123,1990-12-24T08:10:24.123,1990-12-24T08:10:24.123Z,[6, null, 7],3,null,7.86,{k3=null}", "false,5,4,123,1234,1.2345,1.2345,8.12,1234,1234,2020-05-01,23:23:23,2020-05-01T23:23:23,2020-05-01T23:23:23Z,[8],4,c,null,{null=3}", "null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testSimpleMetadataAccess() {
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT `a`, `b`, `metadata_2` FROM MetadataTable")).toAppendStream(TypeExtractor.createTypeInfo(Row.class));
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,1,Hallo", "2,2,Hallo Welt", "2,3,Hallo Welt wie"}));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testComplexMetadataAccess() {
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT `a`, `other_metadata`, `b`, `metadata_2`, `computed` FROM MetadataTable")).toAppendStream(TypeExtractor.createTypeInfo(Row.class));
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,1,1,Hallo,0", "2,2,2,Hallo Welt,2", "2,1,3,Hallo Welt wie,4"}));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testNestedProjectionWithMetadataAccess() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id,\n        |    deepNested.nested1.name AS nestedName,\n        |    nested.`value` AS nestedValue,\n        |    deepNested.`nested2.`.flag AS nestedFlag,\n        |    deepNested.`nested2.`.num + deepNested.nested1.`value.` AS nestedNum,\n        |    LOWER(name) as lowerName\n        |FROM NestedTable\n      ")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class));
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,Sarah,10000,true,1100,mary", "2,Rob,20000,false,2200,bob", "3,Mike,30000,true,3300,liz"}));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testSourceWatermarkInDDL() {
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.data3WithTimestamp());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |CREATE TABLE tableWithWatermark (\n         |  `a` INT,\n         |  `b` BIGINT,\n         |  `c` STRING,\n         |  `ts` TIMESTAMP(3),\n         |  WATERMARK FOR ts AS SOURCE_WATERMARK()\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '", "',\n         |  'bounded' = 'false'\n         |)\n         |"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{dataId})))).stripMargin());
        try {
            this.tEnv().executeSql("SELECT * FROM tableWithWatermark").await();
            Assert.fail((String)"should fail");
        }
        catch (Throwable throwable) {
            Assert.assertThat((Object)throwable, (Matcher)FlinkMatchers.containsCause((Throwable)new TableException(SourceWatermarkFunction.ERROR_MESSAGE)));
        }
    }

    @Test
    public void testSourceWatermarkInQuery() {
        try {
            this.tEnv().executeSql("SELECT *, SOURCE_WATERMARK() FROM MyTable").print();
            Assert.fail((String)"should fail");
        }
        catch (Throwable throwable) {
            Assert.assertThat((Object)throwable, (Matcher)FlinkMatchers.containsCause((Throwable)new TableException(SourceWatermarkFunction.ERROR_MESSAGE)));
        }
    }
}

