package org.apache.flink.table.planner.runtime.stream.sql;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.planner.factories.TestProcedureCatalogFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.class */
public class ProcedureITCase extends StreamingTestBase {
    @Override // org.apache.flink.table.planner.runtime.utils.StreamingTestBase
    @BeforeEach
    public void before() throws Exception {
        super.before();
        TestProcedureCatalogFactory.CatalogWithBuiltInProcedure catalogWithBuiltInProcedure = new TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog");
        catalogWithBuiltInProcedure.createDatabase("system", new CatalogDatabaseImpl(Collections.emptyMap(), (String) null), true);
        tEnv().registerCatalog("test_p", catalogWithBuiltInProcedure);
        tEnv().useCatalog("test_p");
    }

    @Test
    void testShowProcedures() {
        Assertions.assertThat(CollectionUtil.iteratorToList(tEnv().executeSql("show procedures").collect())).isEmpty();
        Assertions.assertThatThrownBy(() -> {
            tEnv().executeSql("show procedures in `db1`");
        }).isInstanceOf(TableException.class).hasMessage("Fail to show procedures because the Database `db1` to show from/in does not exist in Catalog `test_p`.");
        Assertions.assertThatThrownBy(() -> {
            tEnv().executeSql("show procedures in default_catalog.default_catalog");
        }).isInstanceOf(UnsupportedOperationException.class).hasMessage("listProcedures is not implemented for class org.apache.flink.table.catalog.GenericInMemoryCatalog.");
        Assertions.assertThat(CollectionUtil.iteratorToList(tEnv().executeSql("show procedures in `system`").collect()).toString()).isEqualTo("[+I[generate_n], +I[generate_user], +I[get_env_conf], +I[get_year], +I[sum_n]]");
        Assertions.assertThat(CollectionUtil.iteratorToList(tEnv().executeSql("show procedures in `system` like 'generate%'").collect()).toString()).isEqualTo("[+I[generate_n], +I[generate_user]]");
        Assertions.assertThat(CollectionUtil.iteratorToList(tEnv().executeSql("show procedures in `system` like 'gEnerate%'").collect())).isEmpty();
        Assertions.assertThat(CollectionUtil.iteratorToList(tEnv().executeSql("show procedures in `system` ilike 'gEnerate%'").collect()).toString()).isEqualTo("[+I[generate_n], +I[generate_user]]");
        Assertions.assertThat(CollectionUtil.iteratorToList(tEnv().executeSql("show procedures in `system` not like 'generate%'").collect()).toString()).isEqualTo("[+I[get_env_conf], +I[get_year], +I[sum_n]]");
        Assertions.assertThat(CollectionUtil.iteratorToList(tEnv().executeSql("show procedures in `system` not ilike 'generaTe%'").collect()).toString()).isEqualTo("[+I[get_env_conf], +I[get_year], +I[sum_n]]");
    }

    @Test
    void testCallProcedure() {
        verifyTableResult(tEnv().executeSql("call `system`.generate_n(4)"), Arrays.asList(Row.of(new Object[]{0}), Row.of(new Object[]{1}), Row.of(new Object[]{2}), Row.of(new Object[]{3})), ResolvedSchema.of(new Column[]{Column.physical("result", DataTypes.BIGINT().notNull().bridgedTo(Long.TYPE))}));
        verifyTableResult(tEnv().executeSql("call `system`.generate_n(4, 'BATCH')"), Arrays.asList(Row.of(new Object[]{0}), Row.of(new Object[]{1}), Row.of(new Object[]{2}), Row.of(new Object[]{3})), ResolvedSchema.of(new Column[]{Column.physical("result", DataTypes.BIGINT().notNull().bridgedTo(Long.TYPE))}));
        Assertions.assertThat((Comparable) tEnv().getConfig().get(ExecutionOptions.RUNTIME_MODE)).isEqualTo(RuntimeExecutionMode.STREAMING);
        verifyTableResult(tEnv().executeSql("call `system`.sum_n(5.5, 1.2, 3.3)"), Collections.singletonList(Row.of(new Object[]{"10.00", 3})), ResolvedSchema.of(new Column[]{Column.physical("sum_value", DataTypes.DECIMAL(10, 2)), Column.physical("count", DataTypes.INT())}));
        verifyTableResult(tEnv().executeSql("call `system`.get_year(timestamp '2023-04-22 00:00:00', timestamp '2024-04-22 00:00:00.300')"), Arrays.asList(Row.of(new Object[]{2023}), Row.of(new Object[]{2024})), ResolvedSchema.of(new Column[]{Column.physical("result", DataTypes.STRING())}));
        verifyTableResult(tEnv().executeSql("call `system`.generate_user('yuxia', 18)"), Collections.singletonList(Row.of(new Object[]{"yuxia", 18})), ResolvedSchema.of(new Column[]{Column.physical("name", DataTypes.STRING()), Column.physical("age", DataTypes.INT().notNull().bridgedTo(Integer.TYPE))}));
    }

    @Test
    void testEnvironmentConf() throws DatabaseAlreadyExistException {
        Configuration configuration = new Configuration();
        configuration.setString("key1", "value1");
        StreamTableEnvironment create = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment(configuration));
        create.getConfig().set("key2", "value2");
        TestProcedureCatalogFactory.CatalogWithBuiltInProcedure catalogWithBuiltInProcedure = new TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog");
        catalogWithBuiltInProcedure.createDatabase("system", new CatalogDatabaseImpl(Collections.emptyMap(), (String) null), true);
        create.registerCatalog("test_p", catalogWithBuiltInProcedure);
        create.useCatalog("test_p");
        List iteratorToList = CollectionUtil.iteratorToList(create.executeSql("call `system`.get_env_conf()").collect());
        Assertions.assertThat(iteratorToList.contains(Row.of(new Object[]{"key1", "value1"}))).isTrue();
        Assertions.assertThat(iteratorToList.contains(Row.of(new Object[]{"key2", "value2"}))).isTrue();
        create.getConfig().set("key1", "value11");
        Assertions.assertThat(CollectionUtil.iteratorToList(create.executeSql("call `system`.get_env_conf()").collect()).contains(Row.of(new Object[]{"key1", "value11"}))).isTrue();
    }

    private void verifyTableResult(TableResult tableResult, List<Row> list, ResolvedSchema resolvedSchema) {
        Assertions.assertThat(CollectionUtil.iteratorToList(tableResult.collect()).toString()).isEqualTo(list.toString());
        Assertions.assertThat(tableResult.getResolvedSchema()).isEqualTo(resolvedSchema);
    }
}
