/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.functional;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.testutils.FunctionalTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.utilities.HDFSParquetImporter;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Disabled(value="Disable due to flakiness and feature deprecation.")
@Tag(value="functional")
public class TestHDFSParquetImporter
extends FunctionalTestHarness
implements Serializable {
    private String basePath;
    private transient Path hoodieFolder;
    private transient Path srcFolder;
    private transient List<GenericRecord> insertData;

    @BeforeEach
    public void init() throws IOException, ParseException {
        this.basePath = new Path(this.dfsBasePath(), Thread.currentThread().getStackTrace()[1].getMethodName()).toString();
        this.hoodieFolder = new Path(this.basePath, "testTarget");
        this.srcFolder = new Path(this.basePath, "testSrc");
        this.insertData = this.createInsertRecords(this.srcFolder);
    }

    @AfterEach
    public void clean() throws IOException {
        this.dfs().delete(new Path(this.basePath), true);
    }

    @Test
    public void testImportWithRetries() throws Exception {
        final String schemaFile = new Path(this.basePath, "file.schema").toString();
        HDFSParquetImporter.Config cfg = this.getHDFSParquetImporterConfig(this.srcFolder.toString(), this.hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
        final AtomicInteger retry = new AtomicInteger(3);
        final AtomicInteger fileCreated = new AtomicInteger(0);
        HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg){

            protected int dataImport(JavaSparkContext jsc) throws IOException {
                int ret = super.dataImport(jsc);
                if (retry.decrementAndGet() == 0) {
                    fileCreated.incrementAndGet();
                    TestHDFSParquetImporter.this.createSchemaFile(schemaFile);
                }
                return ret;
            }
        };
        Assertions.assertEquals((int)0, (int)dataImporter.dataImport(this.jsc(), retry.get()));
        Assertions.assertEquals((int)-1, (int)retry.get());
        Assertions.assertEquals((int)1, (int)fileCreated.get());
        boolean isCommitFilePresent = false;
        HashMap<String, Long> recordCounts = new HashMap<String, Long>();
        RemoteIterator hoodieFiles = this.dfs().listFiles(this.hoodieFolder, true);
        while (hoodieFiles.hasNext()) {
            LocatedFileStatus f = (LocatedFileStatus)hoodieFiles.next();
            boolean bl = isCommitFilePresent = isCommitFilePresent || f.getPath().toString().endsWith(".commit");
            if (!f.getPath().toString().endsWith("parquet")) continue;
            String partitionPath = f.getPath().getParent().toString();
            long count = this.sqlContext().read().parquet(f.getPath().toString()).count();
            if (!recordCounts.containsKey(partitionPath)) {
                recordCounts.put(partitionPath, 0L);
            }
            recordCounts.put(partitionPath, (Long)recordCounts.get(partitionPath) + count);
        }
        Assertions.assertTrue((boolean)isCommitFilePresent, (String)"commit file is missing");
        Assertions.assertEquals((int)4, (int)recordCounts.size(), (String)"partition is missing");
        for (Map.Entry e : recordCounts.entrySet()) {
            Assertions.assertEquals((long)24L, (long)((Long)e.getValue()), (String)"missing records");
        }
    }

    private void insert(JavaSparkContext jsc) throws IOException {
        String schemaFile = new Path(this.basePath, "file.schema").toString();
        this.createSchemaFile(schemaFile);
        HDFSParquetImporter.Config cfg = this.getHDFSParquetImporterConfig(this.srcFolder.toString(), this.hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
        HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
        dataImporter.dataImport(jsc, 0);
    }

    @Test
    public void testImportWithInsert() throws IOException, ParseException {
        this.insert(this.jsc());
        Dataset ds = HoodieClientTestUtils.read((JavaSparkContext)this.jsc(), (String)(this.basePath + "/testTarget"), (SQLContext)this.sqlContext(), (FileSystem)this.dfs(), (String[])new String[]{this.basePath + "/testTarget/*/*/*/*"});
        List readData = ds.select("timestamp", new String[]{"_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon"}).collectAsList();
        List result = readData.stream().map(row -> new HoodieTripModel(row.getLong(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4), row.getDouble(5), row.getDouble(6), row.getDouble(7))).collect(Collectors.toList());
        List expected = this.insertData.stream().map(g -> new HoodieTripModel(Long.parseLong(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(), Double.parseDouble(g.get("begin_lat").toString()), Double.parseDouble(g.get("begin_lon").toString()), Double.parseDouble(g.get("end_lat").toString()), Double.parseDouble(g.get("end_lon").toString()))).collect(Collectors.toList());
        Assertions.assertTrue((result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size() ? 1 : 0) != 0);
    }

    @Test
    public void testImportWithUpsert() throws IOException, ParseException {
        this.insert(this.jsc());
        String schemaFile = new Path(this.basePath, "file.schema").toString();
        Path upsertFolder = new Path(this.basePath, "testUpsertSrc");
        List<GenericRecord> upsertData = this.createUpsertRecords(upsertFolder);
        HDFSParquetImporter.Config cfg = this.getHDFSParquetImporterConfig(upsertFolder.toString(), this.hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
        cfg.command = "upsert";
        HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
        dataImporter.dataImport(this.jsc(), 0);
        List<GenericRecord> expectData = this.insertData.subList(11, 96);
        expectData.addAll(upsertData);
        Dataset ds = HoodieClientTestUtils.read((JavaSparkContext)this.jsc(), (String)(this.basePath + "/testTarget"), (SQLContext)this.sqlContext(), (FileSystem)this.dfs(), (String[])new String[]{this.basePath + "/testTarget/*/*/*/*"});
        List readData = ds.select("timestamp", new String[]{"_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon"}).collectAsList();
        List result = readData.stream().map(row -> new HoodieTripModel(row.getLong(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4), row.getDouble(5), row.getDouble(6), row.getDouble(7))).collect(Collectors.toList());
        List expected = expectData.stream().map(g -> new HoodieTripModel(Long.parseLong(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(), Double.parseDouble(g.get("begin_lat").toString()), Double.parseDouble(g.get("begin_lon").toString()), Double.parseDouble(g.get("end_lat").toString()), Double.parseDouble(g.get("end_lon").toString()))).collect(Collectors.toList());
        Assertions.assertTrue((result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size() ? 1 : 0) != 0);
    }

    public List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException {
        Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
        long startTime = HoodieActiveTimeline.parseDateFromInstantTime((String)"20170203000000").getTime() / 1000L;
        ArrayList<GenericRecord> records = new ArrayList<GenericRecord>();
        for (long recordNum = 0L; recordNum < 96L; ++recordNum) {
            records.add(new HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0", "rider-" + recordNum, "driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
        }
        try (ParquetWriter writer = ((AvroParquetWriter.Builder)AvroParquetWriter.builder((Path)srcFile).withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf())).build();){
            for (GenericRecord record : records) {
                writer.write((Object)record);
            }
        }
        return records;
    }

    public List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
        long recordNum;
        Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
        long startTime = HoodieActiveTimeline.parseDateFromInstantTime((String)"20170203000000").getTime() / 1000L;
        ArrayList<GenericRecord> records = new ArrayList<GenericRecord>();
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
        for (recordNum = 0L; recordNum < 11L; ++recordNum) {
            records.add(dataGen.generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum, "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
        }
        for (recordNum = 96L; recordNum < 100L; ++recordNum) {
            records.add(dataGen.generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum, "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
        }
        try (ParquetWriter writer = ((AvroParquetWriter.Builder)AvroParquetWriter.builder((Path)srcFile).withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf())).build();){
            for (GenericRecord record : records) {
                writer.write((Object)record);
            }
        }
        return records;
    }

    private void createSchemaFile(String schemaFile) throws IOException {
        FSDataOutputStream schemaFileOS = this.dfs().create(new Path(schemaFile));
        schemaFileOS.write("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}".getBytes());
        schemaFileOS.close();
    }

    @Test
    public void testSchemaFile() throws Exception {
        Path hoodieFolder = new Path(this.basePath, "testTarget");
        Path srcFolder = new Path(this.basePath.toString(), "srcTest");
        Path schemaFile = new Path(this.basePath.toString(), "missingFile.schema");
        HDFSParquetImporter.Config cfg = this.getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile.toString());
        HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
        Assertions.assertEquals((int)-1, (int)dataImporter.dataImport(this.jsc(), 0));
        this.dfs().create(schemaFile).write("Random invalid schema data".getBytes());
        Assertions.assertEquals((int)-1, (int)dataImporter.dataImport(this.jsc(), 0));
    }

    @Test
    public void testRowAndPartitionKey() throws Exception {
        Path schemaFile = new Path(this.basePath.toString(), "missingFile.schema");
        this.createSchemaFile(schemaFile.toString());
        HDFSParquetImporter.Config cfg = this.getHDFSParquetImporterConfig(this.srcFolder.toString(), this.hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "invalidRowKey", "timestamp", 1, schemaFile.toString());
        HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
        Assertions.assertEquals((int)-1, (int)dataImporter.dataImport(this.jsc(), 0));
        cfg = this.getHDFSParquetImporterConfig(this.srcFolder.toString(), this.hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "invalidTimeStamp", 1, schemaFile.toString());
        dataImporter = new HDFSParquetImporter(cfg);
        Assertions.assertEquals((int)-1, (int)dataImporter.dataImport(this.jsc(), 0));
    }

    public HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath, String targetPath, String tableName, String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile) {
        HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
        cfg.srcPath = srcPath;
        cfg.targetPath = targetPath;
        cfg.tableName = tableName;
        cfg.tableType = tableType;
        cfg.rowKey = rowKey;
        cfg.partitionKey = partitionKey;
        cfg.parallelism = parallelism;
        cfg.schemaFile = schemaFile;
        return cfg;
    }

    public static class HoodieTripModel {
        long timestamp;
        String rowKey;
        String rider;
        String driver;
        double beginLat;
        double beginLon;
        double endLat;
        double endLon;

        public HoodieTripModel(long timestamp, String rowKey, String rider, String driver, double beginLat, double beginLon, double endLat, double endLon) {
            this.timestamp = timestamp;
            this.rowKey = rowKey;
            this.rider = rider;
            this.driver = driver;
            this.beginLat = beginLat;
            this.beginLon = beginLon;
            this.endLat = endLat;
            this.endLon = endLon;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            HoodieTripModel other = (HoodieTripModel)o;
            return this.timestamp == other.timestamp && this.rowKey.equals(other.rowKey) && this.rider.equals(other.rider) && this.driver.equals(other.driver) && this.beginLat == other.beginLat && this.beginLon == other.beginLon && this.endLat == other.endLat && this.endLon == other.endLon;
        }

        public int hashCode() {
            return Objects.hash(this.timestamp, this.rowKey, this.rider, this.driver, this.beginLat, this.beginLon, this.endLat, this.endLon);
        }
    }
}

