package org.apache.flink.table.planner.runtime.stream.sql;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.DayOfWeek;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.annotation.HintFlag;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.SpecializedFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.logical.RawType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;

/* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.class */
public class FunctionITCase extends StreamingTestBase {
    /** Fully-qualified class name of {@link TestUDF}; spliced into the CREATE FUNCTION DDL strings built by the tests. */
    private static final String TEST_FUNCTION = TestUDF.class.getName();

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$BoolToInt.class */
    /** Scalar function mapping {@code true} to 1 and {@code false} to 0. */
    public static class BoolToInt extends ScalarFunction {
        /**
         * @param z flag to convert
         * @return 1 when {@code z} is true, otherwise 0
         */
        public int eval(boolean z) {
            if (z) {
                return 1;
            }
            return 0;
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$ClassNameOrUnknownScalarFunction.class */
    /**
     * Extends {@link ClassNameScalarFunction} with an overload whose argument is hinted as the
     * {@code NULL} type; that overload always answers {@code "<<unknown>>"}.
     */
    public static class ClassNameOrUnknownScalarFunction extends ClassNameScalarFunction {
        /**
         * @param obj argument declared with the {@code NULL} data type hint; its value is not inspected
         * @return the fixed marker {@code "<<unknown>>"}
         */
        public String eval(@DataTypeHint("NULL") Object obj) {
            return "<<unknown>>";
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$ClassNameScalarFunction.class */
    /**
     * Scalar function with one overload per argument class; each overload reports the simple name
     * of the class it was declared for and ignores the argument value.
     */
    public static class ClassNameScalarFunction extends ScalarFunction {
        /** @return {@code "Integer"} */
        public String eval(Integer num) {
            return Integer.class.getSimpleName();
        }

        /** @return {@code "Boolean"} */
        public String eval(Boolean bool) {
            return Boolean.class.getSimpleName();
        }

        /** @return {@code "String"} */
        public String eval(String str) {
            return String.class.getSimpleName();
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$ComplexScalarFunction.class */
    /**
     * Scalar function with three unrelated overloads: one accepting any input plus a timestamp,
     * one without arguments returning a decimal, and one wrapping bytes into a RAW-typed buffer.
     */
    public static class ComplexScalarFunction extends ScalarFunction {
        /**
         * Renders the first argument (array-aware) and appends {@code "+"} and the timestamp text.
         *
         * @param obj value of any type, may be null
         * @param timestamp timestamp whose {@code toString()} is appended
         * @return concatenation of both renderings separated by {@code "+"}
         */
        public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object obj, Timestamp timestamp) {
            final String renderedValue = StringUtils.arrayAwareToString(obj);
            return renderedValue + "+" + timestamp.toString();
        }

        /** @return the constant {@code 123.4}, declared as {@code DECIMAL(5, 2)} */
        @DataTypeHint("DECIMAL(5, 2)")
        public BigDecimal eval() {
            return new BigDecimal("123.4");
        }

        /**
         * @param bArr bytes to wrap, may be null
         * @return a buffer wrapping {@code bArr}, or null for null input
         */
        @DataTypeHint("RAW")
        public ByteBuffer eval(byte[] bArr) {
            return bArr == null ? null : ByteBuffer.wrap(bArr);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$CustomScalarFunction.class */
    public static class CustomScalarFunction extends ScalarFunction {
        public Integer eval(Integer... numArr) {
            for (Integer num : numArr) {
                if (num != null) {
                    return num;
                }
            }
            return null;
        }

        public TypeInference getTypeInference(DataTypeFactory dataTypeFactory) {
            return TypeInference.newBuilder().outputTypeStrategy(TypeStrategies.argument(0)).build();
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$DynamicTableFunction.class */
    /** Table function whose output type depends on the overload: STRING for strings, INT for integers. */
    public static class DynamicTableFunction extends TableFunction<Object> {
        /**
         * Emits {@code str + " is a string"}; a null argument fails the running test.
         *
         * @param str input string, expected to be non-null
         */
        @FunctionHint(output = @DataTypeHint("STRING"))
        public void eval(String str) {
            if (str != null) {
                collect(str + " is a string");
                return;
            }
            Assert.fail();
        }

        /**
         * Emits the argument unchanged; a null argument is emitted as a null record.
         *
         * @param num input integer, may be null
         */
        @FunctionHint(output = @DataTypeHint("INT"))
        public void eval(Integer num) {
            // Null and non-null inputs are both forwarded as-is, so a single collect covers both.
            collect(num);
        }
    }

    /**
     * Aggregate function that keeps the longest non-null string seen so far in a single-field
     * {@link Row} accumulator.
     */
    @FunctionHint(accumulator = @DataTypeHint("ROW<longestString STRING>"))
    public static class LongestStringAggregateFunction extends AggregateFunction<String, Row> {
        /**
         * Creates the initial accumulator: a one-field row holding null.
         *
         * <p>The decompiler had mangled this method to {@code m3008createAccumulator}; the real
         * name is restored so the class implements the accumulator factory declared by
         * {@link AggregateFunction} again.
         *
         * @return a fresh accumulator row with a null first field
         */
        public Row createAccumulator() {
            return Row.of((String) null);
        }

        /**
         * Replaces the stored string when {@code str} is strictly longer (or nothing is stored yet).
         *
         * @param row accumulator to update
         * @param str next input value; null values are ignored
         */
        public void accumulate(Row row, String str) {
            if (str == null) {
                return;
            }
            final String longestSoFar = (String) row.getField(0);
            if (longestSoFar == null || longestSoFar.length() < str.length()) {
                row.setField(0, str);
            }
        }

        /**
         * @param row accumulator to read
         * @return the string stored in field 0, or null when no non-null input was accumulated
         */
        public String getValue(Row row) {
            return (String) row.getField(0);
        }
    }

    /**
     * Table function emitting two rows per input string: the string with an empty byte array, and
     * the string with its own bytes.
     */
    @DataTypeHint("ROW<s STRING, b BYTES>")
    public static class LookupTableFunction extends TableFunction<Object> {
        /**
         * @param stringData input value in internal string representation
         */
        public void eval(@DataTypeHint("STRING") StringData stringData) {
            final String text = stringData.toString();
            collect(Row.of(text, new byte[0]));
            collect(Row.of(text, stringData.toBytes()));
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$MyYear.class */
    /** Scalar function extracting the year of a non-null {@code TIMESTAMP(3)} value. */
    public static class MyYear extends ScalarFunction {
        /**
         * @param localDateTime timestamp, hinted as {@code TIMESTAMP(3) NOT NULL}
         * @return the year component of {@code localDateTime}
         */
        public int eval(@DataTypeHint("TIMESTAMP(3) NOT NULL") LocalDateTime localDateTime) {
            return localDateTime.getYear();
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$PrimitiveScalarFunction.class */
    /** Scalar function over primitive arguments: sums both numbers and the length of the string. */
    public static class PrimitiveScalarFunction extends ScalarFunction {
        /**
         * @param i first addend
         * @param j second addend
         * @param str string whose length is added; must not be null
         * @return {@code i + j + str.length()} as a long
         */
        public long eval(int i, long j, String str) {
            final int textLength = str.length();
            return i + j + textLength;
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$RawLiteralScalarFunction.class */
    /**
     * Scalar function over a RAW {@link DayOfWeek} whose output type depends on a boolean argument:
     * STRING when the flag is true, the RAW type otherwise.
     */
    public static class RawLiteralScalarFunction extends ScalarFunction {
        /**
         * @param dayOfWeek value to return or render, may be null
         * @param bool whether to return the day as a string
         * @return null for a null day; otherwise the day's name or the day itself, depending on {@code bool}
         */
        public Object eval(DayOfWeek dayOfWeek, Boolean bool) {
            if (dayOfWeek == null) {
                return null;
            }
            if (bool) {
                return dayOfWeek.toString();
            }
            return dayOfWeek;
        }

        /**
         * Declares the arguments as (RAW DayOfWeek, BOOLEAN NOT NULL) and picks the output type
         * from the value of argument 1, treating an unavailable value as {@code false}.
         */
        public TypeInference getTypeInference(DataTypeFactory dataTypeFactory) {
            final DataType dataType = DataTypes.RAW(DayOfWeek.class).toDataType(dataTypeFactory);
            return TypeInference.newBuilder()
                    .typedArguments(dataType, DataTypes.BOOLEAN().notNull())
                    .outputTypeStrategy(callContext -> {
                        final boolean asString =
                                callContext.getArgumentValue(1, Boolean.class).orElse(false);
                        if (asString) {
                            return Optional.of(DataTypes.STRING());
                        }
                        return Optional.of(dataType);
                    })
                    .build();
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$RawMapViewAggregateFunction.class */
    /**
     * Aggregate function whose accumulator holds a {@link MapView} with RAW-typed values; the
     * result is the sorted, comma-separated rendering of the map entries.
     */
    public static class RawMapViewAggregateFunction extends AggregateFunction<String, AccWithRawView> {

        /** Accumulator wrapping a map view from input string to {@link RawPojo}. */
        public static class AccWithRawView {

            @DataTypeHint(allowRawGlobally = HintFlag.TRUE)
            public MapView<String, RawPojo> view = new MapView<>();
        }

        /** Value type stored in the map view: the input string and its length. */
        public static class RawPojo {
            public String a;
            public int b;

            public RawPojo(String str) {
                this.a = str;
                this.b = str.length();
            }

            @Override
            public String toString() {
                return "(" + this.a + ", " + this.b + ')';
            }
        }

        /**
         * Creates an empty accumulator.
         *
         * <p>The decompiler had mangled this method to {@code m3009createAccumulator}; the real
         * name is restored so the class implements the accumulator factory declared by
         * {@link AggregateFunction} again.
         *
         * @return a new accumulator with an empty map view
         */
        public AccWithRawView createAccumulator() {
            return new AccWithRawView();
        }

        /**
         * Stores a {@link RawPojo} for {@code str} under the key {@code str}.
         *
         * @param accWithRawView accumulator to update
         * @param str next input value; null values are ignored
         * @throws Exception if the map view rejects the update
         */
        public void accumulate(AccWithRawView accWithRawView, String str) throws Exception {
            if (str != null) {
                accWithRawView.view.put(str, new RawPojo(str));
            }
        }

        /**
         * @param accWithRawView accumulator to read
         * @return the string forms of all map entries, sorted and joined with {@code ", "}
         */
        public String getValue(AccWithRawView accWithRawView) {
            return accWithRawView.view.getMap().entrySet().stream()
                    .map(Objects::toString)
                    .sorted()
                    .collect(Collectors.joining(", "));
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$RowScalarFunction.class */
    /** Scalar function that returns its {@code ROW<f0 INT, f1 STRING>} argument unchanged. */
    public static class RowScalarFunction extends ScalarFunction {
        /**
         * @param row input row, hinted as {@code ROW<f0 INT, f1 STRING>}
         * @return the same row instance
         */
        @DataTypeHint("ROW<f0 INT, f1 STRING>")
        public Row eval(@DataTypeHint("ROW<f0 INT, f1 STRING>") Row row) {
            return row;
        }
    }

    /**
     * Table function emitting one row per input: the input string together with its
     * comma-separated parts, or a null record for null input.
     */
    @FunctionHint(output = @DataTypeHint("ROW<s STRING, sa ARRAY<STRING> NOT NULL>"))
    public static class RowTableFunction extends TableFunction<Row> {
        /**
         * @param str comma-separated input, may be null
         */
        public void eval(String str) {
            if (str != null) {
                final String[] parts = str.split(",");
                collect(Row.of(str, parts));
                return;
            }
            collect(null);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$StructuredScalarFunction.class */
    /** Scalar function that builds a {@link StructuredUser} or renders one as a string. */
    public static class StructuredScalarFunction extends ScalarFunction {
        /**
         * @param str user name, may be null
         * @param i user age
         * @return a new user, or null when {@code str} is null
         */
        public StructuredUser eval(String str, int i) {
            return str == null ? null : new StructuredUser(str, i);
        }

        /**
         * @param structuredUser user to render, may be null
         * @return the user's string form, or {@code "<<null>>"} for null input
         */
        public String eval(StructuredUser structuredUser) {
            if (structuredUser == null) {
                return "<<null>>";
            }
            return structuredUser.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$StructuredTableFunction.class */
    /** Table function emitting a {@link StructuredUser} built from its arguments. */
    public static class StructuredTableFunction extends TableFunction<StructuredUser> {
        /**
         * Emits a user for the given name and age; a null name additionally emits a null record
         * first.
         *
         * @param str user name, may be null
         * @param i user age
         */
        public void eval(String str, int i) {
            final boolean nameMissing = str == null;
            if (nameMissing) {
                collect(null);
            }
            // NOTE(review): there is no early return above, so a null name yields two records
            // (null, then a user with a null name) — confirm this is intended.
            collect(new StructuredUser(str, i));
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$StructuredUser.class */
    /** Immutable name/age pair used as a structured type by the functions in this test. */
    public static class StructuredUser {
        public final String name;
        public final int age;

        /**
         * @param str user name, may be null
         * @param i user age
         */
        public StructuredUser(String str, int i) {
            name = str;
            age = i;
        }

        /** @return the name and the age separated by a single space */
        @Override
        public String toString() {
            return new StringBuilder().append(name).append(" ").append(age).toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$TestUDF.class */
    /** Scalar function adding two integers; registered under various names by the DDL tests. */
    public static class TestUDF extends ScalarFunction {
        /**
         * @param num first addend, must not be null
         * @param num2 second addend, must not be null
         * @return the sum of both arguments
         */
        public Integer eval(Integer num, Integer num2) {
            final int sum = num + num2;
            return sum;
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$TypeOfScalarFunction.class */
    /**
     * Scalar function that returns a type description fixed at construction time; specialization
     * replaces it with an instance carrying the string form of the first argument's data type.
     */
    public static class TypeOfScalarFunction extends ScalarFunction implements SpecializedFunction {
        private final String typeString;

        /** Creates an unspecialized instance that reports {@code "UNKNOWN"}. */
        public TypeOfScalarFunction() {
            this("UNKNOWN");
        }

        /**
         * @param str type description returned by {@link #eval(Object)}
         */
        public TypeOfScalarFunction(String str) {
            this.typeString = str;
        }

        /**
         * @param obj value of any type; not inspected
         * @return the type description this instance was created with
         */
        public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object obj) {
            return this.typeString;
        }

        /**
         * Creates an instance bound to the data type of argument 0 of the call.
         *
         * <p>The decompiler had mangled this method to {@code m3010specialize}; the real name is
         * restored so the class implements {@link SpecializedFunction} again.
         *
         * @param specializedContext context giving access to the call's argument data types
         * @return a new function reporting the string form of the first argument's data type
         */
        public TypeOfScalarFunction specialize(SpecializedFunction.SpecializedContext specializedContext) {
            final List<DataType> argumentTypes =
                    specializedContext.getCallContext().getArgumentDataTypes();
            return new TypeOfScalarFunction(argumentTypes.get(0).toString());
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$VarArgScalarFunction.class */
    /**
     * Scalar function with two variadic overloads; each returns a fixed label naming the overload
     * that was invoked, regardless of the argument values.
     */
    public static class VarArgScalarFunction extends ScalarFunction {
        /** @return the label {@code "(INT...)"} */
        public String eval(Integer... numArr) {
            return "(INT...)";
        }

        /** @return the label {@code "(STRING, INT...)"} */
        public String eval(String str, Integer... numArr) {
            return "(STRING, INT...)";
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase$WildcardClassNameScalarFunction.class */
    public static class WildcardClassNameScalarFunction extends ClassNameScalarFunction {
        public String eval(Object obj) {
            return "Object";
        }

        public TypeInference getTypeInference(DataTypeFactory dataTypeFactory) {
            return TypeInference.newBuilder().outputTypeStrategy(TypeStrategies.explicit(DataTypes.STRING())).build();
        }
    }

    /** Creates function {@code f1} by simple name, then drops it by fully qualified name. */
    @Test
    public void testCreateCatalogFunctionInDefaultCatalog() {
        final String createDdl = "create function f1 as 'org.apache.flink.function.TestFunction'";
        final String dropDdl = "DROP FUNCTION IF EXISTS default_catalog.default_database.f1";

        tEnv().executeSql(createDdl);
        Assert.assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f1"));

        tEnv().executeSql(dropDdl);
        Assert.assertFalse(Arrays.asList(tEnv().listFunctions()).contains("f1"));
    }

    /** Creates and drops function {@code f2} using its fully qualified name in both statements. */
    @Test
    public void testCreateFunctionWithFullPath() {
        final String createDdl = "create function default_catalog.default_database.f2 as 'org.apache.flink.function.TestFunction'";
        final String dropDdl = "DROP FUNCTION IF EXISTS default_catalog.default_database.f2";

        tEnv().executeSql(createDdl);
        Assert.assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f2"));

        tEnv().executeSql(dropDdl);
        Assert.assertFalse(Arrays.asList(tEnv().listFunctions()).contains("f2"));
    }

    /** Creates function {@code f3} with a database-qualified name and drops it fully qualified. */
    @Test
    public void testCreateFunctionWithoutCatalogIdentifier() {
        final String createDdl = "create function default_database.f3 as 'org.apache.flink.function.TestFunction'";
        final String dropDdl = "DROP FUNCTION IF EXISTS default_catalog.default_database.f3";

        tEnv().executeSql(createDdl);
        Assert.assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f3"));

        tEnv().executeSql(dropDdl);
        Assert.assertFalse(Arrays.asList(tEnv().listFunctions()).contains("f3"));
    }

    /** Creating a function in a catalog that does not exist must be rejected. */
    @Test
    public void testCreateFunctionCatalogNotExists() {
        final String createDdl = "create function catalog1.database1.f3 as 'org.apache.flink.function.TestFunction'";
        try {
            tEnv().executeSql(createDdl);
            // Without this the test would pass silently if the statement were accepted.
            // Assert.fail throws AssertionError, which the catch (Exception) below does not swallow.
            Assert.fail("Expected CREATE FUNCTION in a missing catalog to fail");
        } catch (Exception e) {
            Assert.assertEquals("Catalog catalog1 does not exist", e.getMessage());
        }
    }

    /** Creating a function in a database that does not exist must be rejected. */
    @Test
    public void testCreateFunctionDBNotExists() {
        final String createDdl = "create function default_catalog.database1.f3 as 'org.apache.flink.function.TestFunction'";
        try {
            tEnv().executeSql(createDdl);
            // Without this the test would pass silently if the statement were accepted.
            // Assert.fail throws AssertionError, which the catch (Exception) below does not swallow.
            Assert.fail("Expected CREATE FUNCTION in a missing database to fail");
        } catch (Exception e) {
            // Expected value first, so a mismatch is reported the right way round.
            Assert.assertEquals("Could not execute CREATE CATALOG FUNCTION: (catalogFunction: [Optional[This is a user-defined function]], identifier: [`default_catalog`.`database1`.`f3`], ignoreIfExists: [false], isTemporary: [false])", e.getMessage());
        }
    }

    /**
     * Exercises create / create-if-not-exists / drop of a temporary catalog function, and checks
     * the errors raised for a duplicate create and for dropping a function that no longer exists.
     */
    @Test
    public void testCreateTemporaryCatalogFunction() {
        final String createDdl = "create temporary function default_catalog.default_database.f4 as '" + TEST_FUNCTION + "'";
        final String createIfNotExistsDdl = "create temporary function if not exists default_catalog.default_database.f4 as '" + TEST_FUNCTION + "'";
        final String dropDdl = "drop temporary function default_catalog.default_database.f4";
        final String dropIfExistsDdl = "drop temporary function if exists default_catalog.default_database.f4";

        tEnv().executeSql(createDdl);
        Assert.assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f4"));
        tEnv().executeSql(createIfNotExistsDdl);
        Assert.assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f4"));
        tEnv().executeSql(dropDdl);
        Assert.assertFalse(Arrays.asList(tEnv().listFunctions()).contains("f4"));

        // A second plain CREATE of the same temporary function must be rejected.
        tEnv().executeSql(createDdl);
        try {
            tEnv().executeSql(createDdl);
            // Assert.fail throws AssertionError, which the catch (Exception) below does not swallow.
            Assert.fail("Expected duplicate CREATE TEMPORARY FUNCTION to fail");
        } catch (Exception e) {
            Assert.assertTrue(e instanceof ValidationException);
            Assert.assertEquals("Could not register temporary catalog function. A function 'default_catalog.default_database.f4' does already exist.", e.getMessage());
        }

        // Once dropped, IF EXISTS is a no-op while a plain DROP must be rejected.
        tEnv().executeSql(dropDdl);
        tEnv().executeSql(dropIfExistsDdl);
        try {
            tEnv().executeSql(dropDdl);
            Assert.fail("Expected DROP TEMPORARY FUNCTION of a missing function to fail");
        } catch (Exception e) {
            Assert.assertTrue(e instanceof ValidationException);
            Assert.assertEquals("Temporary catalog function `default_catalog`.`default_database`.`f4` doesn't exist", e.getMessage());
        }
    }

    /** Creates temporary system function {@code f5}, repeats with IF NOT EXISTS, then drops it. */
    @Test
    public void testCreateTemporarySystemFunction() {
        final String createDdl = "create temporary system function f5 as '" + TEST_FUNCTION + "'";
        final String createIfNotExistsDdl = "create temporary system function if not exists f5 as '" + TEST_FUNCTION + "'";
        final String dropDdl = "drop temporary system function f5";

        tEnv().executeSql(createDdl);
        tEnv().executeSql(createIfNotExistsDdl);
        tEnv().executeSql(dropDdl);
    }

    /** Creates function {@code f3}, alters its class name, and checks the catalog entry each time. */
    @Test
    public void testAlterFunction() throws Exception {
        final String originalClass = "org.apache.flink.function.TestFunction";
        final String alteredClass = "org.apache.flink.function.TestFunction2";
        final ObjectPath objectPath = new ObjectPath("default_database", "f3");

        final Optional<Catalog> defaultCatalog = tEnv().getCatalog("default_catalog");
        Assert.assertTrue(defaultCatalog.isPresent());
        final Catalog catalog = defaultCatalog.get();

        tEnv().executeSql("create function f3 as 'org.apache.flink.function.TestFunction'");
        Assert.assertEquals(originalClass, catalog.getFunction(objectPath).getClassName());

        tEnv().executeSql("alter function f3 as 'org.apache.flink.function.TestFunction2'");
        Assert.assertEquals(alteredClass, catalog.getFunction(objectPath).getClassName());
    }

    /**
     * ALTER FUNCTION must fail when the function, its catalog, or its database does not exist.
     * Each {@code Assert.fail()} throws AssertionError, which the catch (Exception) does not swallow.
     */
    @Test
    public void testAlterFunctionNonExists() {
        // Missing function in an existing catalog and database.
        try {
            tEnv().executeSql("ALTER FUNCTION default_catalog.default_database.f4 as 'org.apache.flink.function.TestFunction'");
            Assert.fail();
        } catch (Exception e) {
            // Expected value first, so a mismatch is reported the right way round.
            Assert.assertEquals("Function default_database.f4 does not exist in Catalog default_catalog.", e.getMessage());
        }

        // Missing catalog.
        try {
            tEnv().executeSql("ALTER FUNCTION catalog1.default_database.f4 as 'org.apache.flink.function.TestFunction'");
            Assert.fail();
        } catch (Exception e) {
            Assert.assertEquals("Catalog catalog1 does not exist", e.getMessage());
        }

        // Missing database in an existing catalog.
        try {
            tEnv().executeSql("ALTER FUNCTION default_catalog.db1.f4 as 'org.apache.flink.function.TestFunction'");
            Assert.fail();
        } catch (Exception e) {
            Assert.assertEquals("Function db1.f4 does not exist in Catalog default_catalog.", e.getMessage());
        }
    }

    /** ALTER TEMPORARY FUNCTION is expected to be rejected as unsupported. */
    @Test
    public void testAlterTemporaryCatalogFunction() {
        final String alterDdl = "ALTER TEMPORARY FUNCTION default_catalog.default_database.f4 as 'org.apache.flink.function.TestFunction'";
        try {
            tEnv().executeSql(alterDdl);
            Assert.fail();
        } catch (Exception unsupported) {
            Assert.assertEquals("Alter temporary catalog function is not supported", unsupported.getMessage());
        }
    }

    /** ALTER TEMPORARY SYSTEM FUNCTION is expected to be rejected as unsupported. */
    @Test
    public void testAlterTemporarySystemFunction() {
        final String alterDdl = "ALTER TEMPORARY SYSTEM FUNCTION default_catalog.default_database.f4 as 'org.apache.flink.function.TestFunction'";
        try {
            tEnv().executeSql(alterDdl);
            Assert.fail();
        } catch (Exception unsupported) {
            Assert.assertEquals("Alter temporary system function is not supported", unsupported.getMessage());
        }
    }

    /**
     * DROP FUNCTION must fail when the function, its catalog, or its database does not exist.
     * Each {@code Assert.fail()} throws AssertionError, which the catch (Exception) does not swallow.
     */
    @Test
    public void testDropFunctionNonExists() {
        // Missing function in an existing catalog and database.
        try {
            tEnv().executeSql("DROP FUNCTION default_catalog.default_database.f4");
            Assert.fail();
        } catch (Exception e) {
            // Expected value first, so a mismatch is reported the right way round.
            Assert.assertEquals("Function default_database.f4 does not exist in Catalog default_catalog.", e.getMessage());
        }

        // Missing catalog.
        try {
            tEnv().executeSql("DROP FUNCTION catalog1.default_database.f4");
            Assert.fail();
        } catch (Exception e) {
            Assert.assertEquals("Catalog catalog1 does not exist", e.getMessage());
        }

        // Missing database in an existing catalog.
        try {
            tEnv().executeSql("DROP FUNCTION default_catalog.db1.f4");
            Assert.fail();
        } catch (Exception e) {
            Assert.assertEquals("Function db1.f4 does not exist in Catalog default_catalog.", e.getMessage());
        }
    }

    /**
     * DROP TEMPORARY FUNCTION must fail for a function that was never registered, whichever
     * catalog or database the identifier names. Each {@code Assert.fail()} throws AssertionError,
     * which the catch (Exception) does not swallow.
     */
    @Test
    public void testDropTemporaryFunctionNonExits() {
        try {
            tEnv().executeSql("DROP TEMPORARY FUNCTION default_catalog.default_database.f4");
            Assert.fail();
        } catch (Exception e) {
            // Expected value first, so a mismatch is reported the right way round.
            Assert.assertEquals("Temporary catalog function `default_catalog`.`default_database`.`f4` doesn't exist", e.getMessage());
        }

        try {
            tEnv().executeSql("DROP TEMPORARY FUNCTION catalog1.default_database.f4");
            Assert.fail();
        } catch (Exception e) {
            Assert.assertEquals("Temporary catalog function `catalog1`.`default_database`.`f4` doesn't exist", e.getMessage());
        }

        try {
            tEnv().executeSql("DROP TEMPORARY FUNCTION default_catalog.db1.f4");
            Assert.fail();
        } catch (Exception e) {
            Assert.assertEquals("Temporary catalog function `default_catalog`.`db1`.`f4` doesn't exist", e.getMessage());
        }
    }

    /**
     * Creates and drops temporary function {@code f4} under three identifiers: the simple name,
     * a name in catalog {@code catalog1}, and a name in database {@code db1}.
     */
    @Test
    public void testCreateDropTemporaryCatalogFunctionsWithDifferentIdentifier() {
        final String asTestFunction = " as '" + TEST_FUNCTION + "'";

        tEnv().executeSql("create temporary function f4" + asTestFunction);
        tEnv().executeSql("drop temporary function f4");

        tEnv().executeSql("create temporary function catalog1.default_database.f4" + asTestFunction);
        tEnv().executeSql("drop temporary function catalog1.default_database.f4");

        tEnv().executeSql("create temporary function default_catalog.db1.f4" + asTestFunction);
        tEnv().executeSql("drop temporary function default_catalog.db1.f4");
    }

    /**
     * Drops a temporary system function, checks that IF EXISTS tolerates its absence, and that a
     * plain DROP of the missing function is rejected.
     */
    @Test
    public void testDropTemporarySystemFunction() {
        final String dropDdl = "drop temporary system function f5";

        tEnv().executeSql("create temporary system function f5 as '" + TEST_FUNCTION + "'");
        tEnv().executeSql(dropDdl);
        tEnv().executeSql("drop temporary system function if exists f5");
        try {
            tEnv().executeSql(dropDdl);
            // Without this the test would pass silently if the statement were accepted.
            // Assert.fail throws AssertionError, which the catch (Exception) below does not swallow.
            Assert.fail("Expected DROP TEMPORARY SYSTEM FUNCTION of a missing function to fail");
        } catch (Exception e) {
            // Expected value first, so a mismatch is reported the right way round.
            Assert.assertEquals("Could not drop temporary system function. A function named 'f5' doesn't exist.", e.getMessage());
        }
    }

    /** Runs the shared {@code addOne} scenario with a regular catalog function, then drops it. */
    @Test
    public void testUserDefinedRegularCatalogFunction() throws Exception {
        final String createDdl = "create function addOne as '" + TEST_FUNCTION + "'";
        testUserDefinedCatalogFunction(createDdl);

        tEnv().executeSql("drop function addOne");
    }

    /** Runs the shared {@code addOne} scenario with a temporary catalog function, then drops it. */
    @Test
    public void testUserDefinedTemporaryCatalogFunction() throws Exception {
        final String createDdl = "create temporary function addOne as '" + TEST_FUNCTION + "'";
        testUserDefinedCatalogFunction(createDdl);

        tEnv().executeSql("drop temporary function addOne");
    }

    /** Runs the shared {@code addOne} scenario with a temporary system function, then drops it. */
    @Test
    public void testUserDefinedTemporarySystemFunction() throws Exception {
        final String createDdl = "create temporary system function addOne as '" + TEST_FUNCTION + "'";
        testUserDefinedCatalogFunction(createDdl);

        tEnv().executeSql("drop temporary system function addOne");
    }

    /**
     * Shared scenario: registers {@code addOne} with the given DDL, copies {@code t1} into
     * {@code t2} while recomputing column {@code c} as {@code addOne(a, 1)}, and asserts that the
     * sink equals the source rows. Both tables are dropped at the end; the function is left for
     * the caller to drop.
     *
     * @param str DDL statement that creates the function {@code addOne}
     * @throws Exception if the insert job fails
     */
    private void testUserDefinedCatalogFunction(String str) throws Exception {
        // Every source row has c = a + 1, so recomputing c reproduces the source data.
        final List<Row> sourceData = Arrays.asList(
                Row.of(1, "1000", 2),
                Row.of(2, "1", 3),
                Row.of(3, "2000", 4),
                Row.of(1, "2", 2),
                Row.of(2, "3000", 3));
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(sourceData);

        tEnv().executeSql("create table t1(a int, b varchar, c int) with ('connector' = 'COLLECTION')");
        tEnv().executeSql("create table t2(a int, b varchar, c int) with ('connector' = 'COLLECTION')");
        tEnv().executeSql(str);

        tEnv().sqlQuery("select t1.a, t1.b, addOne(t1.a, 1) as c from t1").executeInsert("t2").await();

        final Row[] expectedRows = sourceData.toArray(new Row[0]);
        final Row[] actualRows = (Row[]) TestCollectionTableFactory.RESULT().toArray(new Row[0]);
        Assert.assertArrayEquals(expectedRows, actualRows);

        tEnv().executeSql("drop table t1");
        tEnv().executeSql("drop table t2");
    }

    /** Checks {@link PrimitiveScalarFunction}: column {@code b} becomes {@code i + b + length(s)}. */
    @Test
    public void testPrimitiveScalarFunction() throws Exception {
        final List<Row> sourceData = Arrays.asList(
                Row.of(1, 1L, "-"),
                Row.of(2, 2L, "--"),
                Row.of(3, 3L, "---"));
        final List<Row> sinkData = Arrays.asList(
                Row.of(1, 3L, "-"),
                Row.of(2, 6L, "--"),
                Row.of(3, 9L, "---"));
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(sourceData);

        tEnv().executeSql("CREATE TABLE TestTable(i INT NOT NULL, b BIGINT NOT NULL, s STRING) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("PrimitiveScalarFunction", PrimitiveScalarFunction.class);

        tEnv().executeSql("INSERT INTO TestTable SELECT i, PrimitiveScalarFunction(i, b, s), s FROM TestTable").await();

        Assert.assertThat(TestCollectionTableFactory.getResult(), CoreMatchers.equalTo(sinkData));
    }

    /**
     * Checks which overload is chosen for untyped and typed NULL arguments across the three
     * class-name functions.
     */
    @Test
    public void testNullScalarFunction() throws Exception {
        final List<Row> sinkData = Collections.singletonList(
                Row.of("Boolean", "String", "<<unknown>>", "String", "Object", "Boolean"));
        TestCollectionTableFactory.reset();

        tEnv().executeSql("CREATE TABLE TestTable(s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("ClassNameScalarFunction", ClassNameScalarFunction.class);
        tEnv().createTemporarySystemFunction("ClassNameOrUnknownScalarFunction", ClassNameOrUnknownScalarFunction.class);
        tEnv().createTemporarySystemFunction("WildcardClassNameScalarFunction", WildcardClassNameScalarFunction.class);

        tEnv().executeSql("INSERT INTO TestTable SELECT ClassNameScalarFunction(NULL), ClassNameScalarFunction(CAST(NULL AS STRING)), ClassNameOrUnknownScalarFunction(NULL), ClassNameOrUnknownScalarFunction(CAST(NULL AS STRING)), WildcardClassNameScalarFunction(NULL), WildcardClassNameScalarFunction(CAST(NULL AS BOOLEAN))").await();

        Assert.assertThat(TestCollectionTableFactory.getResult(), CoreMatchers.equalTo(sinkData));
    }

    /** Checks that {@link RowScalarFunction} passes nested rows through unchanged. */
    @Test
    public void testRowScalarFunction() throws Exception {
        // The function is an identity, so the same rows serve as input and expected output.
        final List<Row> sourceData = Arrays.asList(
                Row.of(1, Row.of(1, "1")),
                Row.of(2, Row.of(2, "2")),
                Row.of(3, Row.of(3, "3")));
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(sourceData);

        tEnv().executeSql("CREATE TABLE TestTable(i INT, r ROW<i INT, s STRING>) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("RowScalarFunction", RowScalarFunction.class);

        tEnv().executeSql("INSERT INTO TestTable SELECT i, RowScalarFunction(r) FROM TestTable").await();

        Assert.assertThat(TestCollectionTableFactory.getResult(), CoreMatchers.equalTo(sourceData));
    }

    /**
     * Checks all overloads of {@link ComplexScalarFunction} in one query, including null inputs
     * and a RAW-typed sink column.
     */
    @Test
    public void testComplexScalarFunction() throws Exception {
        final List<Row> sourceData = Arrays.asList(
                Row.of(1, new byte[]{1, 2, 3}),
                Row.of(2, new byte[]{2, 3, 4}),
                Row.of(3, new byte[]{3, 4, 5}),
                Row.of(null, null));
        final List<Row> sinkData = Arrays.asList(
                Row.of(1, "1+2012-12-12 12:12:12.123456789", "[1, 2, 3]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), ByteBuffer.wrap(new byte[]{1, 2, 3})),
                Row.of(2, "2+2012-12-12 12:12:12.123456789", "[2, 3, 4]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), ByteBuffer.wrap(new byte[]{2, 3, 4})),
                Row.of(3, "3+2012-12-12 12:12:12.123456789", "[3, 4, 5]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), ByteBuffer.wrap(new byte[]{3, 4, 5})),
                Row.of(null, "null+2012-12-12 12:12:12.123456789", "null+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), null));
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(sourceData);

        // The RAW column type is spelled out in the DDL via its serializable string form.
        final RawType<Object> rawType =
                new RawType<>(Object.class, new KryoSerializer<>(Object.class, new ExecutionConfig()));

        tEnv().executeSql("CREATE TABLE SourceTable(i INT, b BYTES) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE TABLE SinkTable(  i INT,   s1 STRING,   s2 STRING,   d DECIMAL(5, 2),  r " + rawType.asSerializableString() + ") WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("ComplexScalarFunction", ComplexScalarFunction.class);

        tEnv().executeSql("INSERT INTO SinkTable SELECT   i,   ComplexScalarFunction(i, TIMESTAMP '2012-12-12 12:12:12.123456789'),   ComplexScalarFunction(b, TIMESTAMP '2012-12-12 12:12:12.123456789'),  ComplexScalarFunction(),   ComplexScalarFunction(b) FROM SourceTable").await();

        Assert.assertThat(TestCollectionTableFactory.getResult(), CoreMatchers.equalTo(sinkData));
    }

    /**
     * Checks {@link CustomScalarFunction} with a single argument and with a leading typed NULL
     * followed by a literal.
     */
    @Test
    public void testCustomScalarFunction() throws Exception {
        final List<Row> sourceData = Arrays.asList(
                Row.of(1),
                Row.of(2),
                Row.of(3),
                Row.of((Integer) null));
        final List<Row> sinkData = Arrays.asList(
                Row.of(1, 1, 5),
                Row.of(2, 2, 5),
                Row.of(3, 3, 5),
                Row.of(null, null, 5));
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(sourceData);

        tEnv().executeSql("CREATE TABLE SourceTable(i INT) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE TABLE SinkTable(i1 INT, i2 INT, i3 INT) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("CustomScalarFunction", CustomScalarFunction.class);

        tEnv().executeSql("INSERT INTO SinkTable SELECT   i,   CustomScalarFunction(i),   CustomScalarFunction(CAST(NULL AS INT), 5, i, i) FROM SourceTable").await();

        Assert.assertThat(TestCollectionTableFactory.getResult(), CoreMatchers.equalTo(sinkData));
    }

    /** Checks which variadic overload of {@link VarArgScalarFunction} each call shape resolves to. */
    @Test
    public void testVarArgScalarFunction() throws Exception {
        final List<Row> sourceData = Arrays.asList(Row.of("Bob", 1), Row.of("Alice", 2));
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(sourceData);

        tEnv().executeSql("CREATE TABLE SourceTable(  s STRING,   i INT)WITH (  'connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("VarArgScalarFunction", VarArgScalarFunction.class);

        final List<Row> actual = CollectionUtil.iteratorToList(
                tEnv().executeSql("SELECT   VarArgScalarFunction(),   VarArgScalarFunction(i),   VarArgScalarFunction(i, i),   VarArgScalarFunction(s),   VarArgScalarFunction(s, i) FROM SourceTable").collect());

        // The labels do not depend on the row values, so both source rows yield the same output.
        final List<Row> expected = Arrays.asList(
                Row.of("(INT...)", "(INT...)", "(INT...)", "(STRING, INT...)", "(STRING, INT...)"),
                Row.of("(INT...)", "(INT...)", "(INT...)", "(STRING, INT...)", "(STRING, INT...)"));
        Assert.assertThat(actual, CoreMatchers.equalTo(expected));
    }

    /**
     * Passes a RAW-typed {@link DayOfWeek} column together with a boolean literal to
     * {@code RawLiteralScalarFunction} and writes the results of a UNION ALL of two identical
     * sub-queries into the sink.
     *
     * <p>According to the expected data, the call with {@code TRUE} yields the day name as a
     * string and the call with {@code FALSE} yields the RAW value itself. Every source row is
     * expected twice because of the UNION ALL; the order of the sink rows is not checked.
     *
     * @throws Exception if waiting for the INSERT job fails
     */
    @Test
    public void testRawLiteralScalarFunction() throws Exception {
        final List<Row> input =
                Arrays.asList(
                        Row.of(1, DayOfWeek.MONDAY),
                        Row.of(2, DayOfWeek.FRIDAY),
                        Row.of(null, null));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        // The RAW type is embedded into the DDL through its serializable string form.
        final RawType<DayOfWeek> rawType =
                new RawType<>(
                        DayOfWeek.class,
                        new KryoSerializer<>(DayOfWeek.class, new ExecutionConfig()));
        final String rawTypeString = rawType.asSerializableString();

        tEnv().executeSql(
                "CREATE TABLE SourceTable("
                        + "  i INT, "
                        + "  r "
                        + rawTypeString
                        + ") "
                        + "WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql(
                "CREATE TABLE SinkTable("
                        + "  i INT, "
                        + "  s STRING, "
                        + "  r "
                        + rawTypeString
                        + ") "
                        + "WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction(
                "RawLiteralScalarFunction", RawLiteralScalarFunction.class);

        tEnv().executeSql(
                        "INSERT INTO SinkTable "
                                + "  (SELECT "
                                + "    i, "
                                + "    RawLiteralScalarFunction(r, TRUE), "
                                + "    RawLiteralScalarFunction(r, FALSE) "
                                + "   FROM SourceTable)"
                                + "UNION ALL "
                                + "  (SELECT "
                                + "    i, "
                                + "    RawLiteralScalarFunction(r, TRUE), "
                                + "    RawLiteralScalarFunction(r, FALSE) "
                                + "  FROM SourceTable)")
                .await();

        final Row[] expected = {
            Row.of(1, "MONDAY", DayOfWeek.MONDAY),
            Row.of(1, "MONDAY", DayOfWeek.MONDAY),
            Row.of(2, "FRIDAY", DayOfWeek.FRIDAY),
            Row.of(2, "FRIDAY", DayOfWeek.FRIDAY),
            Row.of(null, null, null),
            Row.of(null, null, null)
        };
        Assert.assertThat(
                TestCollectionTableFactory.getResult(), Matchers.containsInAnyOrder(expected));
    }

    /**
     * Exercises {@code StructuredScalarFunction} in two ways: nested (the result of one call is
     * the argument of another) and with field access ({@code .name}) on the result of a call with
     * literals.
     *
     * <p>The expected sink rows pair a string built from each source row (with {@code "<<null>>"}
     * for the row whose name is NULL) with the constant {@code "Tyler"}.
     *
     * @throws Exception if waiting for the INSERT job fails
     */
    @Test
    public void testStructuredScalarFunction() throws Exception {
        final List<Row> input =
                Arrays.asList(Row.of("Bob", 42), Row.of("Alice", 12), Row.of(null, 0));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql(
                "CREATE TABLE SourceTable(s STRING, i INT NOT NULL) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql(
                "CREATE TABLE SinkTable(s1 STRING, s2 STRING) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction(
                "StructuredScalarFunction", StructuredScalarFunction.class);

        final String insertSql =
                "INSERT INTO SinkTable "
                        + "SELECT "
                        + "  StructuredScalarFunction(StructuredScalarFunction(s, i)), "
                        + "  StructuredScalarFunction('Tyler', 27).name "
                        + "FROM SourceTable";
        tEnv().executeSql(insertSql).await();

        final List<Row> expected =
                Arrays.asList(
                        Row.of("Bob 42", "Tyler"),
                        Row.of("Alice 12", "Tyler"),
                        Row.of("<<null>>", "Tyler"));
        Assert.assertThat(TestCollectionTableFactory.getResult(), CoreMatchers.equalTo(expected));
    }

    /**
     * Expects a {@link ValidationException} when {@code CustomScalarFunction} is called with a
     * string literal, and checks the complete exception message, which reports that no matching
     * {@code eval} implementation could be found.
     *
     * @throws Exception if waiting for the INSERT job fails in an unexpected way
     */
    @Test
    public void testInvalidCustomScalarFunction() throws Exception {
        tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("CustomScalarFunction", CustomScalarFunction.class);

        final String expectedMessage =
                "Could not find an implementation method 'eval' in class '"
                        + CustomScalarFunction.class.getName()
                        + "' for function 'CustomScalarFunction' that matches the "
                        + "following signature:\n"
                        + "java.lang.String eval(java.lang.String)";
        try {
            tEnv().executeSql("INSERT INTO SinkTable SELECT CustomScalarFunction('test')").await();
            // Reaching this line means the statement was accepted; say so in the failure.
            Assert.fail(
                    "Expected a ValidationException for CustomScalarFunction('test'), "
                            + "but the statement was executed.");
        } catch (ValidationException e) {
            Assert.assertThat(
                    e, ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(expectedMessage)));
        }
    }

    /**
     * Joins the source table with {@code RowTableFunction} through {@code LATERAL TABLE} and
     * writes the two fields of the function's rows ({@code s} and {@code sa}) into the sink.
     *
     * <p>The expected data holds, for each comma-separated source string, that string together
     * with an array of its parts. No sink row is expected for the NULL source value.
     *
     * @throws Exception if waiting for the INSERT job fails
     */
    @Test
    public void testRowTableFunction() throws Exception {
        final List<Row> input =
                Arrays.asList(
                        Row.of("1,2,3"), Row.of("2,3,4"), Row.of("3,4,5"), Row.of((String) null));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql("CREATE TABLE SourceTable(s STRING) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql(
                "CREATE TABLE SinkTable(s STRING, sa ARRAY<STRING> NOT NULL) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("RowTableFunction", RowTableFunction.class);

        final String insertSql =
                "INSERT INTO SinkTable "
                        + "SELECT t.s, t.sa "
                        + "FROM SourceTable source, LATERAL TABLE(RowTableFunction(source.s)) t";
        tEnv().executeSql(insertSql).await();

        final List<Row> expected =
                Arrays.asList(
                        Row.of("1,2,3", new String[] {"1", "2", "3"}),
                        Row.of("2,3,4", new String[] {"2", "3", "4"}),
                        Row.of("3,4,5", new String[] {"3", "4", "5"}));
        Assert.assertThat(TestCollectionTableFactory.getResult(), CoreMatchers.equalTo(expected));
    }

    /**
     * Joins the source table with {@code StructuredTableFunction} through {@code LATERAL TABLE}
     * and selects the {@code name} and {@code age} fields of the function's output.
     *
     * <p>The sink is expected to contain exactly the same rows as the source, including the row
     * with a NULL name.
     *
     * @throws Exception if waiting for the INSERT job fails
     */
    @Test
    public void testStructuredTableFunction() throws Exception {
        final List<Row> input =
                Arrays.asList(Row.of("Bob", 42), Row.of("Alice", 12), Row.of(null, 0));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql(
                "CREATE TABLE SourceTable(s STRING, i INT NOT NULL) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql(
                "CREATE TABLE SinkTable(s STRING, i INT NOT NULL) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction(
                "StructuredTableFunction", StructuredTableFunction.class);

        final String insertSql =
                "INSERT INTO SinkTable "
                        + "SELECT t.name, t.age "
                        + "FROM SourceTable, LATERAL TABLE(StructuredTableFunction(s, i)) t";
        tEnv().executeSql(insertSql).await();

        // The expected output repeats the input values row by row.
        final List<Row> expected =
                Arrays.asList(Row.of("Bob", 42), Row.of("Alice", 12), Row.of(null, 0));
        Assert.assertThat(TestCollectionTableFactory.getResult(), CoreMatchers.equalTo(expected));
    }

    /**
     * Registers {@code DynamicTableFunction} via {@code createFunction} and calls it with a string
     * literal, an integer literal and a typed NULL, combining the three results with UNION ALL.
     *
     * <p>The expected sink rows are {@code "Test is a string"}, {@code "42"} and NULL; their order
     * is not checked.
     *
     * @throws Exception if waiting for the INSERT job fails
     */
    @Test
    public void testDynamicCatalogTableFunction() throws Exception {
        TestCollectionTableFactory.reset();

        tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
        tEnv().createFunction("DynamicTableFunction", DynamicTableFunction.class);

        final String insertSql =
                "INSERT INTO SinkTable "
                        + "SELECT T1.s FROM TABLE(DynamicTableFunction('Test')) AS T1(s) "
                        + "UNION ALL "
                        + "SELECT CAST(T2.i AS STRING) FROM TABLE(DynamicTableFunction(42)) AS T2(i)"
                        + "UNION ALL "
                        + "SELECT CAST(T3.i AS STRING) FROM TABLE(DynamicTableFunction(CAST(NULL AS INT))) AS T3(i)";
        tEnv().executeSql(insertSql).await();

        final Row[] expected = {Row.of("Test is a string"), Row.of("42"), Row.of((String) null)};
        Assert.assertThat(
                TestCollectionTableFactory.getResult(), Matchers.containsInAnyOrder(expected));
    }

    /**
     * Expects a {@link ValidationException} when the scalar function
     * {@code PrimitiveScalarFunction} is used inside {@code TABLE(...)}, and checks that the
     * message states the function cannot be used as a table function.
     */
    @Test
    public void testInvalidUseOfScalarFunction() {
        tEnv().executeSql(
                "CREATE TABLE SinkTable(s BIGINT NOT NULL) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction(
                "PrimitiveScalarFunction", PrimitiveScalarFunction.class);

        try {
            tEnv().executeSql(
                    "INSERT INTO SinkTable "
                            + "SELECT * FROM TABLE(PrimitiveScalarFunction(1, 2, '3'))");
            // Reaching this line means the statement was accepted; say so in the failure.
            Assert.fail(
                    "Expected a ValidationException when a scalar function is used in TABLE(...).");
        } catch (ValidationException e) {
            Assert.assertThat(
                    e,
                    ThrowableMessageMatcher.hasMessage(
                            CoreMatchers.containsString(
                                    "SQL validation failed. Function 'PrimitiveScalarFunction' cannot be used as a table function.")));
        }
    }

    /**
     * Expects a {@link ValidationException} when the built-in scalar function {@code MD5} is used
     * inside {@code TABLE(...)}, and checks that the message states only table functions are
     * allowed in a correlate operation.
     */
    @Test
    public void testInvalidUseOfSystemScalarFunction() {
        tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");

        try {
            // Explaining the statement is enough to trigger validation; no job is submitted.
            tEnv().explainSql("INSERT INTO SinkTable SELECT * FROM TABLE(MD5('3'))");
            Assert.fail(
                    "Expected a ValidationException when a system scalar function is used in TABLE(...).");
        } catch (ValidationException e) {
            Assert.assertThat(
                    e,
                    ThrowableMessageMatcher.hasMessage(
                            CoreMatchers.containsString(
                                    "Currently, only table functions can be used in a correlate operation.")));
        }
    }

    /**
     * Expects a {@link ValidationException} when the table function {@code RowTableFunction} is
     * used directly in a SELECT list, and checks that the message states only scalar functions are
     * allowed in a projection or filter operation.
     */
    @Test
    public void testInvalidUseOfTableFunction() {
        TestCollectionTableFactory.reset();

        tEnv().executeSql(
                "CREATE TABLE SinkTable(s ROW<s STRING, sa ARRAY<STRING> NOT NULL>) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("RowTableFunction", RowTableFunction.class);

        try {
            // Explaining the statement is enough to trigger validation; no job is submitted.
            tEnv().explainSql("INSERT INTO SinkTable SELECT RowTableFunction('test')");
            Assert.fail(
                    "Expected a ValidationException when a table function is used in a projection.");
        } catch (ValidationException e) {
            Assert.assertThat(
                    e,
                    ThrowableMessageMatcher.hasMessage(
                            CoreMatchers.containsString(
                                    "Currently, only scalar functions can be used in a projection or filter operation.")));
        }
    }

    /**
     * Applies {@code LongestStringAggregateFunction} and {@code RawMapViewAggregateFunction} to a
     * string column grouped by one-second tumbling windows on an event-time attribute.
     *
     * <p>The source holds four rows at 10:15:30 (one of them with a NULL string) and two rows at
     * 10:15:32, so one sink row per window is expected.
     *
     * @throws Exception if waiting for the INSERT job fails
     */
    @Test
    public void testAggregateFunction() throws Exception {
        final LocalDateTime firstWindowTime = LocalDateTime.parse("2007-12-03T10:15:30");
        final LocalDateTime secondWindowTime = LocalDateTime.parse("2007-12-03T10:15:32");
        final List<Row> input =
                Arrays.asList(
                        Row.of(firstWindowTime, "Bob"),
                        Row.of(firstWindowTime, "Alice"),
                        Row.of(firstWindowTime, null),
                        Row.of(firstWindowTime, "Jonathan"),
                        Row.of(secondWindowTime, "Bob"),
                        Row.of(secondWindowTime, "Alice"));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql(
                "CREATE TABLE SourceTable(ts TIMESTAMP(3), s STRING, WATERMARK FOR ts AS ts - INTERVAL '1' SECOND) "
                        + "WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql(
                "CREATE TABLE SinkTable(s1 STRING, s2 STRING) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction(
                "LongestStringAggregateFunction", LongestStringAggregateFunction.class);
        tEnv().createTemporarySystemFunction(
                "RawMapViewAggregateFunction", RawMapViewAggregateFunction.class);

        final String insertSql =
                "INSERT INTO SinkTable "
                        + "SELECT LongestStringAggregateFunction(s), RawMapViewAggregateFunction(s) "
                        + "FROM SourceTable "
                        + "GROUP BY TUMBLE(ts, INTERVAL '1' SECOND)";
        tEnv().executeSql(insertSql).await();

        final List<Row> expected =
                Arrays.asList(
                        Row.of(
                                "Jonathan",
                                "Alice=(Alice, 5), Bob=(Bob, 3), Jonathan=(Jonathan, 8)"),
                        Row.of("Alice", "Alice=(Alice, 5), Bob=(Bob, 3)"));
        Assert.assertThat(TestCollectionTableFactory.getResult(), CoreMatchers.equalTo(expected));
    }

    /**
     * Performs a processing-time temporal join between a collection-backed source and a
     * {@code values} table whose lookup function class is {@code LookupTableFunction}.
     *
     * <p>For every source name two sink rows are expected: one with an empty byte array and one
     * with a byte array holding the ASCII codes of the name.
     *
     * @throws ExecutionException if the INSERT job fails
     * @throws InterruptedException if waiting for the INSERT job is interrupted
     */
    @Test
    public void testLookupTableFunction() throws ExecutionException, InterruptedException {
        final List<Row> input = Arrays.asList(Row.of("Bob"), Row.of("Alice"));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql(
                "CREATE TABLE SourceTable1("
                        + "  s STRING, "
                        + "  proctime AS PROCTIME()"
                        + ")"
                        + "WITH ("
                        + "  'connector' = 'COLLECTION'"
                        + ")");
        tEnv().executeSql(
                "CREATE TABLE SourceTable2("
                        + "  s STRING,"
                        + "  b BYTES"
                        + ")"
                        + "WITH ("
                        + "  'connector' = 'values',"
                        + "  'lookup-function-class' = '"
                        + LookupTableFunction.class.getName()
                        + "'"
                        + ")");
        tEnv().executeSql(
                "CREATE TABLE SinkTable(s STRING, b BYTES) WITH ('connector' = 'COLLECTION')");

        final String insertSql =
                "INSERT INTO SinkTable "
                        + "SELECT SourceTable1.s, SourceTable2.b "
                        + "FROM SourceTable1 "
                        + "JOIN SourceTable2 FOR SYSTEM_TIME AS OF SourceTable1.proctime "
                        + " ON SourceTable1.s = SourceTable2.s";
        tEnv().executeSql(insertSql).await();

        final List<Row> expected =
                Arrays.asList(
                        Row.of("Bob", new byte[0]),
                        // 66, 111, 98 are the ASCII codes of "Bob".
                        Row.of("Bob", new byte[] {66, 111, 98}),
                        Row.of("Alice", new byte[0]),
                        // 65, 108, 105, 99, 101 are the ASCII codes of "Alice".
                        Row.of("Alice", new byte[] {65, 108, 105, 99, 101}));
        Assert.assertThat(TestCollectionTableFactory.getResult(), CoreMatchers.equalTo(expected));
    }

    /**
     * Calls {@code TypeOfScalarFunction} with a string literal and with STRING, INT and
     * DECIMAL(6, 3) columns and checks the type description returned for each argument.
     *
     * <p>Both source rows are expected to produce the same output row:
     * {@code "CHAR(7) NOT NULL"}, {@code "STRING"}, {@code "INT"} and {@code "DECIMAL(6, 3)"}.
     */
    @Test
    public void testSpecializedFunction() {
        final List<Row> input =
                Arrays.asList(
                        Row.of("Bob", 1, new BigDecimal("123.45")),
                        Row.of("Alice", 2, new BigDecimal("123.456")));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql(
                "CREATE TABLE SourceTable("
                        + "  s STRING, "
                        + "  i INT,"
                        + "  d DECIMAL(6, 3)"
                        + ")"
                        + "WITH ("
                        + "  'connector' = 'COLLECTION'"
                        + ")");
        tEnv().createTemporarySystemFunction("TypeOfScalarFunction", TypeOfScalarFunction.class);

        final String query =
                "SELECT "
                        + "  TypeOfScalarFunction('LITERAL'), "
                        + "  TypeOfScalarFunction(s), "
                        + "  TypeOfScalarFunction(i), "
                        + "  TypeOfScalarFunction(d) "
                        + "FROM SourceTable";
        final List<Row> actual = CollectionUtil.iteratorToList(tEnv().executeSql(query).collect());

        // One identical output row is expected per source row.
        final List<Row> expected =
                Arrays.asList(
                        Row.of("CHAR(7) NOT NULL", "STRING", "INT", "DECIMAL(6, 3)"),
                        Row.of("CHAR(7) NOT NULL", "STRING", "INT", "DECIMAL(6, 3)"));
        Assert.assertThat(actual, CoreMatchers.equalTo(expected));
    }

    /**
     * Runs {@code MyYear} on a computed TIMESTAMP(3) column that is also the watermark attribute
     * and checks that the query completes with one result row per source row.
     *
     * <p>The returned values are not checked because the column is derived from
     * {@code LOCALTIMESTAMP}; the test only verifies that the query runs and that no rows are
     * lost or duplicated.
     */
    @Test
    public void testTimestampNotNull() {
        final List<Row> input = Arrays.asList(Row.of(1), Row.of(2));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql(
                "CREATE TABLE SourceTable(i INT, ts AS CAST(LOCALTIMESTAMP AS TIMESTAMP(3)), WATERMARK FOR ts AS ts) "
                        + "WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE FUNCTION MyYear AS '" + MyYear.class.getName() + "'");

        final List<Row> rows =
                CollectionUtil.iteratorToList(
                        tEnv().executeSql("SELECT MyYear(ts) FROM SourceTable").collect());

        // A plain projection must emit exactly one row per input row.
        Assert.assertEquals(input.size(), rows.size());
    }

    /**
     * Passes the results of {@code IS NULL} and {@code IS NOT NULL} predicates to
     * {@code BoolToInt} and checks that the query completes with one result row per source row.
     *
     * <p>The source contains one non-null and one NULL value so that both predicates evaluate to
     * both outcomes. The values returned by the function are not checked here.
     */
    @Test
    public void testIsNullType() {
        final List<Row> input = Arrays.asList(Row.of(1), Row.of((Integer) null));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql("CREATE TABLE SourceTable(i INT) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE FUNCTION BoolToInt AS '" + BoolToInt.class.getName() + "'");

        final List<Row> rows =
                CollectionUtil.iteratorToList(
                        tEnv().executeSql(
                                        "SELECT BoolToInt(i is null), BoolToInt(i is not null) FROM SourceTable")
                                .collect());

        // A plain projection must emit exactly one row per input row.
        Assert.assertEquals(input.size(), rows.size());
    }
}
