/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.orc;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.functions.sink.filesystem.legacy.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.planner.runtime.batch.sql.BatchFileSystemITCaseBase;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={ParameterizedTestExtension.class})
public class OrcFileSystemITCase
extends BatchFileSystemITCaseBase {
    @TempDir
    public static java.nio.file.Path temporaryFolder;
    @Parameter
    public boolean configure;

    @Parameters(name="configure={0}")
    public static Collection<Boolean> parameters() {
        return Arrays.asList(false, true);
    }

    public String[] formatProperties() {
        ArrayList<String> ret = new ArrayList<String>();
        ret.add("'format'='orc'");
        if (this.configure) {
            ret.add("'orc.compress'='snappy'");
        }
        return ret.toArray(new String[0]);
    }

    @TestTemplate
    public void testNonPartition() {
        super.testNonPartition();
        File directory = new File(URI.create(this.resultPath()).getPath());
        Object[] files = directory.listFiles((dir, name) -> !name.startsWith(".") && !name.startsWith("_"));
        Assertions.assertThat((Object[])files).isNotNull();
        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(URI.create(((File)files[0]).getAbsolutePath()));
        try {
            Reader reader = OrcFile.createReader((org.apache.hadoop.fs.Path)path, (OrcFile.ReaderOptions)OrcFile.readerOptions((Configuration)new Configuration()));
            if (this.configure) {
                Assertions.assertThat((String)reader.getCompressionKind().toString()).isEqualTo("SNAPPY");
            } else {
                Assertions.assertThat((String)reader.getCompressionKind().toString()).isEqualTo("ZLIB");
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @BeforeEach
    public void before() {
        super.before();
        super.tableEnv().executeSql(String.format("create table orcFilterTable (x string,y int,a int,b bigint,c boolean,d string,e decimal(8,4),f date,g timestamp) with ('connector' = 'filesystem','path' = '%s',%s)", super.resultPath(), String.join((CharSequence)",\n", this.formatProperties())));
        super.tableEnv().executeSql(String.format("create table orcLimitTable (x string,y int,a int) with ('connector' = 'filesystem','path' = '%s',%s)", super.resultPath(), String.join((CharSequence)",\n", this.formatProperties())));
    }

    @TestTemplate
    void testOrcFilterPushDown() throws ExecutionException, InterruptedException {
        super.tableEnv().executeSql("insert into orcFilterTable select x, y, a, b, case when y >= 10 then false else true end as c, case when a = 1 then null else x end as d, y * 3.14 as e, date '2020-01-01' as f, timestamp '2020-01-01 05:20:00' as g from originalT").await();
        this.check("select x, y from orcFilterTable where x = 'x11' and 11 = y", Collections.singletonList(Row.of((Object[])new Object[]{"x11", "11"})));
        this.check("select x, y from orcFilterTable where 4 <= y and y < 8 and x <> 'x6'", Arrays.asList(Row.of((Object[])new Object[]{"x4", "4"}), Row.of((Object[])new Object[]{"x5", "5"}), Row.of((Object[])new Object[]{"x7", "7"})));
        this.check("select x, y from orcFilterTable where x = 'x1' and not y >= 3", Collections.singletonList(Row.of((Object[])new Object[]{"x1", "1"})));
        this.check("select x, y from orcFilterTable where c and y > 2 and y < 4", Collections.singletonList(Row.of((Object[])new Object[]{"x3", "3"})));
        this.check("select x, y from orcFilterTable where d is null and x = 'x5'", Collections.singletonList(Row.of((Object[])new Object[]{"x5", "5"})));
        this.check("select x, y from orcFilterTable where d is not null and y > 25", Arrays.asList(Row.of((Object[])new Object[]{"x26", "26"}), Row.of((Object[])new Object[]{"x27", "27"})));
        this.check("select x, y from orcFilterTable where (d is not null and y > 26) or (d is null and x = 'x3')", Arrays.asList(Row.of((Object[])new Object[]{"x3", "3"}), Row.of((Object[])new Object[]{"x27", "27"})));
        this.check("select x, y from orcFilterTable where e = 3.1400 or x = 'x10'", Arrays.asList(Row.of((Object[])new Object[]{"x1", "1"}), Row.of((Object[])new Object[]{"x10", "10"})));
        this.check("select x, y from orcFilterTable where f = date '2020-01-01' and x = 'x1'", Collections.singletonList(Row.of((Object[])new Object[]{"x1", "1"})));
        this.check("select x, y from orcFilterTable where g = timestamp '2020-01-01 05:20:00' and x = 'x10'", Collections.singletonList(Row.of((Object[])new Object[]{"x10", "10"})));
    }

    @TestTemplate
    void testOrcFilterPushDownLiteralFirst() throws ExecutionException, InterruptedException {
        super.tableEnv().executeSql("insert into orcLimitTable values('a', 10, 10), ('b', 11, 11)").await();
        this.check("select y from orcLimitTable where 10 >= y", Collections.singletonList(Row.of((Object[])new Object[]{10})));
        this.check("select y from orcLimitTable where 11 <= y", Collections.singletonList(Row.of((Object[])new Object[]{11})));
        this.check("select y from orcLimitTable where 11 > y", Collections.singletonList(Row.of((Object[])new Object[]{10})));
        this.check("select y from orcLimitTable where 10 < y", Collections.singletonList(Row.of((Object[])new Object[]{11})));
    }

    @TestTemplate
    void testNestedTypes() throws Exception {
        String path = this.initNestedTypesFile(this.initNestedTypesData());
        super.tableEnv().executeSql(String.format("create table orcNestedTypesTable (_col0 string,_col1 int,_col2 ARRAY<ROW<_col2_col0 string>>,_col3 MAP<string,ROW<_col3_col0 string,_col3_col1 timestamp>>) with ('connector' = 'filesystem','format' = 'orc','path' = '%s')", path));
        TableResult tableResult = super.tableEnv().executeSql("SELECT * FROM orcNestedTypesTable");
        List rows = CollectionUtil.iteratorToList((Iterator)tableResult.collect());
        Assertions.assertThat((List)rows).hasSize(4);
        Assertions.assertThat((String)((Row)rows.get(0)).toString()).isEqualTo("+I[_col_0_string_1, 1, [+I[_col_2_row_0_string_1], +I[_col_2_row_1_string_1]], {_col_3_map_key_1=+I[_col_3_map_value_string_1, " + new Timestamp(3600000L).toLocalDateTime() + "]}]");
        Assertions.assertThat((String)((Row)rows.get(1)).toString()).isEqualTo("+I[_col_0_string_2, 2, null, null]");
        Assertions.assertThat((String)((Row)rows.get(2)).toString()).isEqualTo("+I[_col_0_string_3, 3, [], {}]");
        Assertions.assertThat((String)((Row)rows.get(3)).toString()).isEqualTo("+I[_col_0_string_4, 4, [], {null=null}]");
    }

    private List<RowData> initNestedTypesData() {
        ArrayList<RowData> data = new ArrayList<RowData>(3);
        GenericRowData rowData = new GenericRowData(4);
        rowData.setField(0, (Object)new BinaryStringData("_col_0_string_1"));
        rowData.setField(1, (Object)1);
        GenericRowData arrayValue1 = new GenericRowData(1);
        arrayValue1.setField(0, (Object)new BinaryStringData("_col_2_row_0_string_1"));
        GenericRowData arrayValue2 = new GenericRowData(1);
        arrayValue2.setField(0, (Object)new BinaryStringData("_col_2_row_1_string_1"));
        GenericArrayData arrayData = new GenericArrayData(new Object[]{arrayValue1, arrayValue2});
        rowData.setField(2, (Object)arrayData);
        GenericRowData mapValue1 = new GenericRowData(2);
        mapValue1.setField(0, (Object)new BinaryStringData("_col_3_map_value_string_1"));
        mapValue1.setField(1, (Object)TimestampData.fromTimestamp((Timestamp)new Timestamp(3600000L)));
        HashMap<BinaryStringData, GenericRowData> mapDataMap = new HashMap<BinaryStringData, GenericRowData>();
        mapDataMap.put(new BinaryStringData("_col_3_map_key_1"), mapValue1);
        GenericMapData mapData = new GenericMapData(mapDataMap);
        rowData.setField(3, (Object)mapData);
        data.add((RowData)rowData);
        rowData = new GenericRowData(4);
        rowData.setField(0, (Object)new BinaryStringData("_col_0_string_2"));
        rowData.setField(1, (Object)2);
        rowData.setField(2, null);
        rowData.setField(3, null);
        data.add((RowData)rowData);
        rowData = new GenericRowData(4);
        rowData.setField(0, (Object)new BinaryStringData("_col_0_string_3"));
        rowData.setField(1, (Object)3);
        rowData.setField(2, (Object)new GenericArrayData(new Object[0]));
        rowData.setField(3, (Object)new GenericMapData(new HashMap()));
        data.add((RowData)rowData);
        rowData = new GenericRowData(4);
        rowData.setField(0, (Object)new BinaryStringData("_col_0_string_4"));
        rowData.setField(1, (Object)4);
        rowData.setField(2, (Object)new GenericArrayData(new Object[0]));
        HashMap<Object, Object> mapDataMap2 = new HashMap<Object, Object>();
        mapDataMap2.put(null, null);
        rowData.setField(3, (Object)new GenericMapData(mapDataMap2));
        data.add((RowData)rowData);
        return data;
    }

    private String initNestedTypesFile(List<RowData> data) throws Exception {
        LogicalType[] fieldTypes = new LogicalType[4];
        fieldTypes[0] = new VarCharType();
        fieldTypes[1] = new IntType();
        List<RowType.RowField> arrayRowFieldList = Collections.singletonList(new RowType.RowField("_col2_col0", (LogicalType)new VarCharType()));
        fieldTypes[2] = new ArrayType((LogicalType)new RowType(arrayRowFieldList));
        List<RowType.RowField> mapRowFieldList = Arrays.asList(new RowType.RowField("_col3_col0", (LogicalType)new VarCharType()), new RowType.RowField("_col3_col1", (LogicalType)new TimestampType()));
        fieldTypes[3] = new MapType((LogicalType)new VarCharType(), (LogicalType)new RowType(mapRowFieldList));
        String schema = "struct<_col0:string,_col1:int,_col2:array<struct<_col2_col0:string>>,_col3:map<string,struct<_col3_col0:string,_col3_col1:timestamp>>>";
        File outDir = TempDirUtils.newFolder((java.nio.file.Path)temporaryFolder);
        Properties writerProps = new Properties();
        writerProps.setProperty("orc.compress", "LZ4");
        OrcBulkWriterFactory writer = new OrcBulkWriterFactory((Vectorizer)new RowDataVectorizer(schema, fieldTypes), writerProps, new Configuration());
        StreamingFileSink sink = ((StreamingFileSink.DefaultBulkFormatBuilder)StreamingFileSink.forBulkFormat((Path)new Path(outDir.toURI()), (BulkWriter.Factory)writer).withBucketCheckInterval(10000L)).build();
        try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new StreamSink((SinkFunction)sink), 1, 1, 0);){
            testHarness.setup();
            testHarness.open();
            int time = 0;
            for (RowData record : data) {
                testHarness.processElement((Object)record, (long)(++time));
            }
            testHarness.snapshot(1L, (long)(++time));
            testHarness.notifyOfCompletedCheckpoint(1L);
        }
        return outDir.getAbsolutePath();
    }

    @TestTemplate
    void testLimitableBulkFormat() throws ExecutionException, InterruptedException {
        super.tableEnv().executeSql("insert into orcLimitTable select x, y, 1 as a from originalT").await();
        TableResult tableResult1 = super.tableEnv().executeSql("SELECT * FROM orcLimitTable limit 5");
        List rows1 = CollectionUtil.iteratorToList((Iterator)tableResult1.collect());
        Assertions.assertThat((List)rows1).hasSize(5);
        this.check("select a from orcLimitTable limit 5", Arrays.asList(Row.of((Object[])new Object[]{1}), Row.of((Object[])new Object[]{1}), Row.of((Object[])new Object[]{1}), Row.of((Object[])new Object[]{1}), Row.of((Object[])new Object[]{1})));
    }
}

