package org.apache.flink.table.planner.runtime.stream.table;

import java.util.ArrayList;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
import org.apache.flink.table.planner.utils.TestingTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.class */
class DataGeneratorConnectorITCase extends BatchTestBase {
    private static final String TABLE = "CREATE TABLE datagen_t (\n\tf0 CHAR(1),\n\tf1 VARCHAR(10),\n\tf2 STRING,\n\tf3 BOOLEAN,\n\tf4 DECIMAL(32,2),\n\tf5 TINYINT,\n\tf6 SMALLINT,\n\tf7 INT,\n\tf8 BIGINT,\n\tf9 FLOAT,\n\tf10 DOUBLE,\n\tf11 DATE,\n\tf12 TIME,\n\tf13 TIMESTAMP(3),\n\tf14 TIMESTAMP WITH LOCAL TIME ZONE,\n\tf15 INT ARRAY,\n\tf16 MAP<STRING, DATE>,\n\tf17 DECIMAL(32,2) MULTISET,\n\tf18 ROW<a BIGINT, b TIME, c ROW<d TIMESTAMP>>\n) WITH (\t'connector' = 'datagen',\n\t'number-of-rows' = '10'\n)";

    DataGeneratorConnectorITCase() {
    }

    @Test
    void testTypes() throws Exception {
        tEnv().executeSql(TABLE);
        ArrayList arrayList = new ArrayList();
        CloseableIterator collect = tEnv().executeSql("select * from datagen_t").collect();
        while (collect.hasNext()) {
            try {
                arrayList.add((Row) collect.next());
            } catch (Throwable th) {
                if (collect != null) {
                    try {
                        collect.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (collect != null) {
            collect.close();
        }
        Assertions.assertThat(arrayList).as("Unexpected number of results", new Object[0]).hasSize(10);
    }

    @Test
    void testLimitPushDown() {
        TestingTableEnvironment create = TestingTableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build(), null, TableConfig.getDefault());
        create.executeSql("CREATE TABLE datagen_t (\n\tf0 CHAR(1)\n) WITH (\t'connector' = 'datagen')");
        Table sqlQuery = create.sqlQuery("select * from datagen_t limit 5");
        Assertions.assertThat(sqlQuery.explain(new ExplainDetail[0])).contains(new CharSequence[]{"table=[[default_catalog, default_database, datagen_t, limit=[5]]], fields=[f0]"});
        Assertions.assertThat(CollectionUtil.iteratorToList(sqlQuery.execute().collect())).as("Unexpected number of results", new Object[0]).hasSize(5);
    }

    @Test
    void testWithParallelism() {
        TestingTableEnvironment create = TestingTableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build(), null, TableConfig.getDefault());
        create.executeSql("CREATE TABLE datagen_t (\n\tf0 CHAR(1)\n) WITH (\t'connector' = 'datagen',\t'scan.parallelism' = '2')");
        Assertions.assertThat(create.sqlQuery("select * from datagen_t").explain(new ExplainDetail[]{ExplainDetail.JSON_EXECUTION_PLAN})).contains(new CharSequence[]{"table=[[default_catalog, default_database, datagen_t]], fields=[f0])\",\n    \"parallelism\" : 2"});
    }
}
