package org.apache.flink.table.planner.plan.stream.sql;

import java.util.EnumSet;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.table.types.inference.StaticArgument;
import org.apache.flink.table.types.inference.StaticArgumentTrait;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.class */
public class ProcessTableFunctionTest extends TableTestBase {
    private TableTestUtil util;

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$EmptyArgFunction.class */
    /** Testing function whose {@code eval} method declares no arguments at all. */
    public static class EmptyArgFunction extends ProcessTableFunction<String> {
        public void eval() {}
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$ErrorSpec.class */
    /**
     * Immutable description of one negative test case: the function class to register, the SQL
     * statement to plan and the message expected somewhere in the resulting exception chain.
     */
    private static class ErrorSpec {
        private final String description;
        private final Class<? extends UserDefinedFunction> functionClass;
        private final String sql;
        private final String errorMessage;

        private ErrorSpec(
                String description,
                Class<? extends UserDefinedFunction> functionClass,
                String sql,
                String errorMessage) {
            this.description = description;
            this.functionClass = functionClass;
            this.sql = sql;
            this.errorMessage = errorMessage;
        }

        /** Factory method; simply delegates to the private constructor. */
        static ErrorSpec of(
                String description,
                Class<? extends UserDefinedFunction> functionClass,
                String sql,
                String errorMessage) {
            return new ErrorSpec(description, functionClass, sql, errorMessage);
        }

        /** Returns the description so that parameterized test runs get a readable name. */
        @Override
        public String toString() {
            return description;
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$InvalidTypedUpdatingArgFunction.class */
    /**
     * Testing function that combines {@link ArgumentTrait#SUPPORT_UPDATES} with a table argument
     * typed as the {@link User} POJO.
     *
     * <p>The parameters are named {@code r} and {@code i} because the SQL in {@code errorSpecs()}
     * addresses them by name ({@code r => TABLE t_updating, i => 1}); the argument names are
     * taken from the {@code eval} parameter names.
     */
    public static class InvalidTypedUpdatingArgFunction extends ProcessTableFunction<String> {
        public void eval(
                @ArgumentHint({ArgumentTrait.TABLE_AS_ROW, ArgumentTrait.SUPPORT_UPDATES}) User r,
                Integer i) {}
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$MultiTableFunction.class */
    /**
     * Testing function that declares two table arguments with set semantics and optional
     * partitioning.
     *
     * <p>The parameters are named {@code r1} and {@code r2} because the SQL in {@code
     * errorSpecs()} passes them as named arguments ({@code r1 => TABLE t, r2 => TABLE t}); the
     * argument names are taken from the {@code eval} parameter names.
     */
    public static class MultiTableFunction extends ProcessTableFunction<String> {
        public void eval(
                @ArgumentHint({ArgumentTrait.TABLE_AS_SET, ArgumentTrait.OPTIONAL_PARTITION_BY})
                        Row r1,
                @ArgumentHint({ArgumentTrait.TABLE_AS_SET, ArgumentTrait.OPTIONAL_PARTITION_BY})
                        Row r2) {}
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$NoProcessTableFunction.class */
    public static class NoProcessTableFunction extends TableFunction<String> {
        public TypeInference getTypeInference(DataTypeFactory dataTypeFactory) {
            return TypeInference.newBuilder().staticArguments(new StaticArgument[]{StaticArgument.table("r", Row.class, false, EnumSet.of(StaticArgumentTrait.TABLE_AS_ROW))}).outputTypeStrategy(callContext -> {
                return Optional.of(DataTypes.STRING());
            }).build();
        }

        public void eval(Row row) {
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$PojoArgsFunction.class */
    /**
     * Testing function that takes the {@link User} POJO both as a table argument with row
     * semantics and as a scalar argument.
     *
     * <p>The parameters are named {@code input} and {@code scalar} because {@code testPojoArgs()}
     * passes them as named arguments ({@code input => TABLE t, scalar => pojoCreator(...)}); the
     * argument names are taken from the {@code eval} parameter names.
     */
    public static class PojoArgsFunction extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint(ArgumentTrait.TABLE_AS_ROW) User input, User scalar) {}
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$PojoCreatingFunction.class */
    /** Scalar testing function that wraps its two arguments into a new {@link User}. */
    public static class PojoCreatingFunction extends ScalarFunction {
        public User eval(String str, Integer num) {
            final User created = new User(str, num);
            return created;
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$ReservedArgFunction.class */
    /**
     * Testing function whose only parameter is deliberately called {@code uid}.
     *
     * <p>The "reserved args" case in {@code errorSpecs()} expects the planner to reject the
     * signature with "Reserved argument names are: [uid]", so the parameter must carry exactly
     * that name; the argument names are taken from the {@code eval} parameter names.
     */
    public static class ReservedArgFunction extends ProcessTableFunction<String> {
        public void eval(String uid) {}
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$ScalarArgsFunction.class */
    /**
     * Testing function that takes two scalar arguments.
     *
     * <p>The parameters are named {@code i} and {@code b} because the SQL in this class passes
     * them as named arguments ({@code i => 1, b => true}); the argument names are taken from the
     * {@code eval} parameter names.
     */
    public static class ScalarArgsFunction extends ProcessTableFunction<String> {
        public void eval(Integer i, Boolean b) {}
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$TableAsRowFunction.class */
    /**
     * Testing function with one table argument using row semantics plus a scalar argument.
     *
     * <p>The parameters are named {@code r} and {@code i} because the SQL in this class passes
     * them as named arguments ({@code r => TABLE t, i => 1}); the argument names are taken from
     * the {@code eval} parameter names.
     */
    public static class TableAsRowFunction extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint(ArgumentTrait.TABLE_AS_ROW) Row r, Integer i) {}
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$TableAsRowPassThroughFunction.class */
    /**
     * Testing function with one table argument using row semantics and {@link
     * ArgumentTrait#PASS_COLUMNS_THROUGH}, plus a scalar argument.
     *
     * <p>The parameters are named {@code r} and {@code i} because {@code
     * testTableAsRowPassThroughColumns()} passes them as named arguments ({@code r => TABLE t, i
     * => 1}); the argument names are taken from the {@code eval} parameter names.
     */
    public static class TableAsRowPassThroughFunction extends ProcessTableFunction<String> {
        public void eval(
                @ArgumentHint({ArgumentTrait.TABLE_AS_ROW, ArgumentTrait.PASS_COLUMNS_THROUGH})
                        Row r,
                Integer i) {}
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$TableAsSetFunction.class */
    /**
     * Testing function with one table argument using set semantics plus a scalar argument.
     *
     * <p>The parameters are named {@code r} and {@code i} because the SQL in this class passes
     * them as named arguments ({@code r => TABLE t PARTITION BY name, i => 1}); the argument
     * names are taken from the {@code eval} parameter names.
     */
    public static class TableAsSetFunction extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row r, Integer i) {}
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$TableAsSetOptionalPartitionFunction.class */
    /**
     * Testing function with one table argument using set semantics and {@link
     * ArgumentTrait#OPTIONAL_PARTITION_BY}, plus a scalar argument.
     *
     * <p>The parameters are named {@code r} and {@code i} because {@code
     * testTableAsSetOptionalPartitionBy()} passes them as named arguments ({@code r => TABLE t, i
     * => 1}); the argument names are taken from the {@code eval} parameter names.
     */
    public static class TableAsSetOptionalPartitionFunction extends ProcessTableFunction<String> {
        public void eval(
                @ArgumentHint({ArgumentTrait.TABLE_AS_SET, ArgumentTrait.OPTIONAL_PARTITION_BY})
                        Row r,
                Integer i) {}
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$TableAsSetPassThroughFunction.class */
    /**
     * Testing function with one table argument using set semantics and {@link
     * ArgumentTrait#PASS_COLUMNS_THROUGH}, plus a scalar argument.
     *
     * <p>The parameters are named {@code r} and {@code i} because {@code
     * testTableAsSetPassThroughColumns()} passes them as named arguments ({@code r => TABLE t
     * PARTITION BY name, i => 1}); the argument names are taken from the {@code eval} parameter
     * names.
     */
    public static class TableAsSetPassThroughFunction extends ProcessTableFunction<String> {
        public void eval(
                @ArgumentHint({ArgumentTrait.TABLE_AS_SET, ArgumentTrait.PASS_COLUMNS_THROUGH})
                        Row r,
                Integer i) {}
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$TypedTableAsRowFunction.class */
    /**
     * Testing function whose table argument (row semantics) is typed as the {@link User} POJO.
     *
     * <p>The parameters are named {@code u} and {@code i} because the SQL in this class passes
     * them as named arguments ({@code u => TABLE t, i => 1}); the argument names are taken from
     * the {@code eval} parameter names.
     */
    public static class TypedTableAsRowFunction extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint(ArgumentTrait.TABLE_AS_ROW) User u, Integer i) {}
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$TypedTableAsSetFunction.class */
    /**
     * Testing function whose table argument (set semantics) is typed as the {@link User} POJO.
     *
     * <p>The parameters are named {@code u} and {@code i} because the SQL in this class passes
     * them as named arguments ({@code u => TABLE t PARTITION BY name, i => 1}); the argument
     * names are taken from the {@code eval} parameter names.
     */
    public static class TypedTableAsSetFunction extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint(ArgumentTrait.TABLE_AS_SET) User u, Integer i) {}
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$UpdatingArgFunction.class */
    /**
     * Testing function with one table argument using set semantics and {@link
     * ArgumentTrait#SUPPORT_UPDATES}, plus a scalar argument.
     *
     * <p>The parameters are named {@code r} and {@code i} because {@code testUpdatingInput()}
     * passes them as named arguments ({@code r => TABLE t_updating PARTITION BY name, i => 1});
     * the argument names are taken from the {@code eval} parameter names.
     */
    public static class UpdatingArgFunction extends ProcessTableFunction<String> {
        public void eval(
                @ArgumentHint({ArgumentTrait.TABLE_AS_SET, ArgumentTrait.SUPPORT_UPDATES}) Row r,
                Integer i) {}
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest$User.class */
    /**
     * Simple POJO used as a structured type for table and scalar arguments.
     *
     * <p>The constructor parameters intentionally carry the same names as the fields. Flink's
     * structured-type extraction expects either a default constructor or a constructor that
     * assigns all fields; keeping the names aligned lets this constructor be recognised as the
     * fully-assigning one.
     */
    public static class User {
        public String s;
        public Integer i;

        /**
         * Creates a user.
         *
         * @param s value stored in {@link #s}
         * @param i value stored in {@link #i}
         */
        public User(String s, Integer i) {
            this.s = s;
            this.i = i;
        }
    }

    /**
     * Creates a fresh stream test utility and registers the views and the sink table that the
     * tests of this class query.
     */
    @BeforeEach
    void setup() {
        util = streamTestUtil(TableConfig.getDefault());

        // Executed in declaration order; t_updating is defined on top of view t.
        final String[] ddlStatements = {
            "CREATE VIEW t AS SELECT * FROM (VALUES ('Bob', 12), ('Alice', 42)) AS T(name, score)",
            "CREATE VIEW t_name_diff AS SELECT 'Bob' AS name, 12 AS different",
            "CREATE VIEW t_type_diff AS SELECT 'Bob' AS name, TRUE AS isValid",
            "CREATE VIEW t_updating AS SELECT name, COUNT(*) FROM t GROUP BY name",
            "CREATE TABLE t_sink (name STRING, data STRING) WITH ('connector' = 'blackhole')"
        };
        for (String ddl : ddlStatements) {
            util.tableEnv().executeSql(ddl);
        }
    }

    /** Plans a call with two named scalar arguments and no explicit {@code uid}. */
    @Test
    void testScalarArgsNoUid() {
        final String sql = "SELECT * FROM f(i => 1, b => true)";
        util.addTemporarySystemFunction("f", ScalarArgsFunction.class);
        util.verifyRelPlan(sql);
    }

    /** Plans a call with two named scalar arguments and an explicit {@code uid}. */
    @Test
    void testScalarArgsWithUid() {
        final String sql = "SELECT * FROM f(uid => 'my-uid', i => 1, b => true)";
        util.addTemporarySystemFunction("f", ScalarArgsFunction.class);
        util.verifyRelPlan(sql);
    }

    /**
     * Plans a call that passes an additional named argument {@code invalid} which {@link
     * ScalarArgsFunction} does not declare.
     */
    @Test
    void testUnknownScalarArg() {
        final String sql = "SELECT * FROM f(i => 1, b => true, invalid => 'invalid')";
        util.addTemporarySystemFunction("f", ScalarArgsFunction.class);
        util.verifyRelPlan(sql);
    }

    /** Plans a call that passes view {@code t} as a table argument with row semantics. */
    @Test
    void testTableAsRow() {
        final String sql = "SELECT * FROM f(r => TABLE t, i => 1)";
        util.addTemporarySystemFunction("f", TableAsRowFunction.class);
        util.verifyRelPlan(sql);
    }

    /** Plans a call that passes view {@code t} to a {@link User}-typed row-semantics argument. */
    @Test
    void testTypedTableAsRow() {
        final String sql = "SELECT * FROM f(u => TABLE t, i => 1)";
        util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class);
        util.verifyRelPlan(sql);
    }

    /**
     * Plans a call that passes view {@code t_name_diff}, whose column names ({@code name}, {@code
     * different}) differ from the {@link User} field names, to a {@link User}-typed argument.
     */
    @Test
    void testTypedTableAsRowIgnoringColumnNames() {
        final String sql = "SELECT * FROM f(u => TABLE t_name_diff, i => 1)";
        util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class);
        util.verifyRelPlan(sql);
    }

    /** Plans a call that passes view {@code t}, partitioned by {@code name}, with set semantics. */
    @Test
    void testTableAsSet() {
        final String sql = "SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1)";
        util.addTemporarySystemFunction("f", TableAsSetFunction.class);
        util.verifyRelPlan(sql);
    }

    /**
     * Plans a call that passes view {@code t} without a PARTITION BY clause to a set-semantics
     * argument declared with {@code OPTIONAL_PARTITION_BY}.
     */
    @Test
    void testTableAsSetOptionalPartitionBy() {
        final String sql = "SELECT * FROM f(r => TABLE t, i => 1)";
        util.addTemporarySystemFunction("f", TableAsSetOptionalPartitionFunction.class);
        util.verifyRelPlan(sql);
    }

    /**
     * Plans a call that passes view {@code t}, partitioned by {@code name}, to a {@link
     * User}-typed set-semantics argument.
     */
    @Test
    void testTypedTableAsSet() {
        final String sql = "SELECT * FROM f(u => TABLE t PARTITION BY name, i => 1)";
        util.addTemporarySystemFunction("f", TypedTableAsSetFunction.class);
        util.verifyRelPlan(sql);
    }

    /** Plans a call to a function without declared arguments, passing only {@code uid}. */
    @Test
    void testEmptyArgs() {
        final String sql = "SELECT * FROM f(uid => 'my-ptf')";
        util.addTemporarySystemFunction("f", EmptyArgFunction.class);
        util.verifyRelPlan(sql);
    }

    /**
     * Plans a call that passes view {@code t} as the POJO-typed table argument and the result of
     * {@code pojoCreator} as the POJO-typed scalar argument.
     */
    @Test
    void testPojoArgs() {
        final String sql =
                "SELECT * FROM f(input => TABLE t, scalar => pojoCreator('Bob', 12), uid => 'my-ptf')";
        util.addTemporarySystemFunction("f", PojoArgsFunction.class);
        util.addTemporarySystemFunction("pojoCreator", PojoCreatingFunction.class);
        util.verifyRelPlan(sql);
    }

    /**
     * Plans a call against a set-semantics table argument declared with {@code
     * PASS_COLUMNS_THROUGH}.
     */
    @Test
    void testTableAsSetPassThroughColumns() {
        final String sql = "SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1)";
        util.addTemporarySystemFunction("f", TableAsSetPassThroughFunction.class);
        util.verifyRelPlan(sql);
    }

    /**
     * Plans a call against a row-semantics table argument declared with {@code
     * PASS_COLUMNS_THROUGH}.
     */
    @Test
    void testTableAsRowPassThroughColumns() {
        final String sql = "SELECT * FROM f(r => TABLE t, i => 1)";
        util.addTemporarySystemFunction("f", TableAsRowPassThroughFunction.class);
        util.verifyRelPlan(sql);
    }

    /**
     * Plans a call that feeds the aggregating view {@code t_updating} into a table argument
     * declared with {@code SUPPORT_UPDATES}.
     */
    @Test
    void testUpdatingInput() {
        final String sql = "SELECT * FROM f(r => TABLE t_updating PARTITION BY name, i => 1)";
        util.addTemporarySystemFunction("f", UpdatingArgFunction.class);
        util.verifyRelPlan(sql);
    }

    /**
     * Registers the function under the name {@code f*}, calls it without a {@code uid} argument
     * and expects the planner to fail with a hint to provide a custom identifier.
     */
    @Test
    void testMissingUid() {
        final String expectedMessage =
                "Could not derive a unique identifier for process table function 'f*'. The function's name does not qualify for a UID. Please provide a custom identifier using the implicit `uid` argument. For example: myFunction(..., uid => 'my-id')";
        util.addTemporarySystemFunction("f*", ScalarArgsFunction.class);
        Assertions.assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM `f*`(42, true)"))
                .satisfies(FlinkAssertions.anyCauseMatches(expectedMessage));
    }

    /**
     * Verifies the exec plan of a statement set whose two inserts call the same function with
     * different {@code uid} values ({@code 'a'} and {@code 'b'}).
     */
    @Test
    void testUidPipelineSplitIntoTwoFunctions() {
        final String insertA =
                "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'a')";
        final String insertB =
                "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'b')";
        util.addTemporarySystemFunction("f", TableAsSetFunction.class);
        util.verifyExecPlan(
                util.tableEnv().createStatementSet().addInsertSql(insertA).addInsertSql(insertB));
    }

    /**
     * Verifies the exec plan of a statement set whose two inserts are identical and use the same
     * {@code uid} value {@code 'same'}.
     */
    @Test
    void testUidPipelineMergeIntoOneFunction() {
        final String firstInsert =
                "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'same')";
        final String secondInsert =
                "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'same')";
        util.addTemporarySystemFunction("f", TableAsSetFunction.class);
        util.verifyExecPlan(
                util.tableEnv()
                        .createStatementSet()
                        .addInsertSql(firstInsert)
                        .addInsertSql(secondInsert));
    }

    /**
     * Verifies the exec plan of a statement set whose two inserts share the same function call
     * and {@code uid} but apply different filters ({@code name = 'Bob'} vs. {@code name =
     * 'Alice'}) on its output.
     */
    @Test
    void testUidPipelineMergeWithFanOut() {
        final String bobInsert =
                "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'same') WHERE name = 'Bob'";
        final String aliceInsert =
                "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'same') WHERE name = 'Alice'";
        util.addTemporarySystemFunction("f", TableAsSetFunction.class);
        util.verifyExecPlan(
                util.tableEnv()
                        .createStatementSet()
                        .addInsertSql(bobInsert)
                        .addInsertSql(aliceInsert));
    }

    /**
     * Registers the spec's function class as {@code f}, plans the spec's SQL and expects an
     * exception whose cause chain matches the spec's error message.
     *
     * @param errorSpec negative test case supplied by {@code errorSpecs()}
     */
    @ParameterizedTest
    @MethodSource("errorSpecs")
    void testErrorBehavior(ErrorSpec errorSpec) {
        util.addTemporarySystemFunction("f", errorSpec.functionClass);
        Assertions.assertThatThrownBy(() -> util.verifyExecPlan(errorSpec.sql))
                .satisfies(FlinkAssertions.anyCauseMatches(errorSpec.errorMessage));
    }

    private static Stream<ErrorSpec> errorSpecs() {
        return Stream.of((Object[]) new ErrorSpec[]{ErrorSpec.of("invalid uid", ScalarArgsFunction.class, "SELECT * FROM f(uid => '%', i => 1, b => true)", "Invalid unique identifier for process table function. The `uid` argument must be a string literal that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. But found: %"), ErrorSpec.of("typed table as row with invalid input", TypedTableAsRowFunction.class, "SELECT * FROM f(u => TABLE t_type_diff, i => 1)", "No match found for function signature f(<RecordType(CHAR(3) name, BOOLEAN isValid)>, <NUMERIC>, <CHARACTER>)"), ErrorSpec.of("table as set with missing partition by", TableAsSetFunction.class, "SELECT * FROM f(r => TABLE t, i => 1)", "Table argument 'r' requires a PARTITION BY clause for parallel processing."), ErrorSpec.of("typed table as set with invalid input", TypedTableAsSetFunction.class, "SELECT * FROM f(u => TABLE t_type_diff PARTITION BY name, i => 1)", "No match found for function signature f(<RecordType(CHAR(3) name, BOOLEAN isValid)>, <NUMERIC>, <CHARACTER>)"), ErrorSpec.of("table function instead of process table function", NoProcessTableFunction.class, "SELECT * FROM f(r => TABLE t)", "Only scalar arguments are supported at this location. But argument 'r' declared the following traits: [TABLE, TABLE_AS_ROW]"), ErrorSpec.of("reserved args", ReservedArgFunction.class, "SELECT * FROM f(uid => 'my-ptf')", "Function signature must not declare system arguments. Reserved argument names are: [uid]"), ErrorSpec.of("multiple table args", MultiTableFunction.class, "SELECT * FROM f(r1 => TABLE t, r2 => TABLE t)", "Currently, only signatures with at most one table argument are supported."), ErrorSpec.of("row instead of table", TableAsRowFunction.class, "SELECT * FROM f(r => ROW(42), i => 1)", "Invalid argument value. 
Argument 'r' expects a table to be passed."), ErrorSpec.of("table as row partition by", TableAsRowFunction.class, "SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1)", "Only tables with set semantics may be partitioned. Invalid PARTITION BY clause in the 0-th operand of table function 'f'"), ErrorSpec.of("invalid partition by clause", TableAsSetFunction.class, "SELECT * FROM f(r => TABLE t PARTITION BY invalid, i => 1)", "Invalid column 'invalid' for PARTITION BY clause. Available columns are: [name, score]"), ErrorSpec.of("unsupported order by", TableAsSetFunction.class, "SELECT * FROM f(r => TABLE t PARTITION BY name ORDER BY score, i => 1)", "ORDER BY clause is currently not supported."), ErrorSpec.of("updates into insert-only table arg", TableAsSetFunction.class, "SELECT * FROM f(r => TABLE t_updating PARTITION BY name, i => 1)", "StreamPhysicalProcessTableFunction doesn't support consuming update changes"), ErrorSpec.of("updates into POJO table arg", InvalidTypedUpdatingArgFunction.class, "SELECT * FROM f(r => TABLE t_updating, i => 1)", "Table arguments that support updates must use a row type."), ErrorSpec.of("uid conflict", TableAsSetFunction.class, "SELECT * FROM f(r => TABLE t PARTITION BY name, i => 42, uid => 'same') UNION ALL SELECT * FROM f(r => TABLE t PARTITION BY name, i => 999, uid => 'same')", "Duplicate unique identifier 'same' detected among process table functions. Make sure that all PTF calls have an identifier defined that is globally unique.")});
    }
}
