/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

public class TestHoodieIncrSource
extends SparkClientFunctionalTestHarness {
    private HoodieTestDataGenerator dataGen;
    private HoodieTableMetaClient metaClient;
    private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;

    @BeforeEach
    public void setUp() throws IOException {
        this.dataGen = new HoodieTestDataGenerator();
    }

    public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException {
        props = HoodieTableMetaClient.withPropertyBuilder().setTableName("raw_trips").setTableType(this.tableType).setPayloadClass(HoodieAvroPayload.class).fromProperties(props).build();
        return HoodieTableMetaClient.initTableAndGetMetaClient((Configuration)hadoopConf, (String)basePath, (Properties)props);
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    public void testHoodieIncrSource(HoodieTableType tableType) throws IOException {
        this.tableType = tableType;
        this.metaClient = this.getHoodieMetaClient(this.hadoopConf(), this.basePath());
        HoodieWriteConfig writeConfig = this.getConfigBuilder(this.basePath(), this.metaClient).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(Boolean.valueOf(true)).withMaxNumDeltaCommitsBeforeCompaction(3).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
        SparkRDDWriteClient writeClient = this.getHoodieWriteClient(writeConfig);
        Pair<String, List<HoodieRecord>> inserts = this.writeRecords(writeClient, WriteOperationType.INSERT, null, "100");
        Pair<String, List<HoodieRecord>> inserts2 = this.writeRecords(writeClient, WriteOperationType.INSERT, null, "200");
        Pair<String, List<HoodieRecord>> inserts3 = this.writeRecords(writeClient, WriteOperationType.INSERT, null, "300");
        Pair<String, List<HoodieRecord>> inserts4 = this.writeRecords(writeClient, WriteOperationType.INSERT, null, "400");
        Pair<String, List<HoodieRecord>> inserts5 = this.writeRecords(writeClient, WriteOperationType.INSERT, null, "500");
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<String>)Option.empty(), 500, (String)inserts5.getKey());
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<String>)Option.of((Object)"100"), 400, (String)inserts5.getKey());
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<String>)Option.of((Object)"400"), 100, (String)inserts5.getKey());
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<String>)Option.empty(), 100, (String)inserts5.getKey());
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<String>)Option.of((Object)inserts5.getKey()), 0, (String)inserts5.getKey());
        Pair<String, List<HoodieRecord>> inserts6 = this.writeRecords(writeClient, WriteOperationType.INSERT, null, "600");
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<String>)Option.of((Object)inserts5.getKey()), 100, (String)inserts6.getKey());
        writeClient.close();
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    public void testHoodieIncrSourceInflightCommitBeforeCompletedCommit(HoodieTableType tableType) throws IOException {
        this.tableType = tableType;
        this.metaClient = this.getHoodieMetaClient(this.hadoopConf(), this.basePath());
        HoodieWriteConfig writeConfig = this.getConfigBuilder(this.basePath(), this.metaClient).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(3, 4).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(2).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(Boolean.valueOf(true)).withMaxNumDeltaCommitsBeforeCompaction(3).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
        SparkRDDWriteClient writeClient = this.getHoodieWriteClient(writeConfig);
        ArrayList<Pair<String, List<HoodieRecord>>> inserts = new ArrayList<Pair<String, List<HoodieRecord>>>();
        for (int i = 0; i < 6; ++i) {
            inserts.add(this.writeRecords(writeClient, WriteOperationType.INSERT, null, HoodieActiveTimeline.createNewInstantTime()));
        }
        HoodieActiveTimeline activeTimeline = this.metaClient.getActiveTimeline();
        HoodieInstant instant4 = (HoodieInstant)activeTimeline.filter(instant -> instant.getTimestamp().equals(((Pair)inserts.get(4)).getKey())).firstInstant().get();
        Option instant4CommitData = activeTimeline.getInstantDetails(instant4);
        activeTimeline.revertToInflight(instant4);
        this.metaClient.reloadActiveTimeline();
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<String>)Option.empty(), 400, (String)((Pair)inserts.get(3)).getKey());
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<String>)Option.of((Object)((Pair)inserts.get(0)).getKey()), 300, (String)((Pair)inserts.get(3)).getKey());
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<String>)Option.of((Object)((Pair)inserts.get(2)).getKey()), 100, (String)((Pair)inserts.get(3)).getKey());
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<String>)Option.empty(), 100, (String)((Pair)inserts.get(3)).getKey());
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<String>)Option.of((Object)((Pair)inserts.get(3)).getKey()), 0, (String)((Pair)inserts.get(3)).getKey());
        activeTimeline.reload().saveAsComplete(new HoodieInstant(HoodieInstant.State.INFLIGHT, instant4.getAction(), (String)((Pair)inserts.get(4)).getKey()), instant4CommitData);
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<String>)Option.of((Object)((Pair)inserts.get(3)).getKey()), 200, (String)((Pair)inserts.get(5)).getKey());
        writeClient.close();
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    public void testHoodieIncrSourceWithPendingTableServices(HoodieTableType tableType) throws IOException {
        this.tableType = tableType;
        this.metaClient = this.getHoodieMetaClient(this.hadoopConf(), this.basePath());
        HoodieWriteConfig writeConfig = this.getConfigBuilder(this.basePath(), this.metaClient).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withScheduleInlineCompaction(Boolean.valueOf(true)).withMaxNumDeltaCommitsBeforeCompaction(1).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
        SparkRDDWriteClient writeClient = this.getHoodieWriteClient(writeConfig);
        ArrayList<Pair<String, List<HoodieRecord>>> dataBatches = new ArrayList<Pair<String, List<HoodieRecord>>>();
        for (int i = 0; i < 6; ++i) {
            WriteOperationType opType = i < 4 ? WriteOperationType.BULK_INSERT : WriteOperationType.UPSERT;
            List recordsForUpdate = i < 4 ? null : (List)((Pair)dataBatches.get(3)).getRight();
            dataBatches.add(this.writeRecords(writeClient, opType, recordsForUpdate, HoodieActiveTimeline.createNewInstantTime()));
            if (tableType == HoodieTableType.COPY_ON_WRITE) {
                if (i != 2) continue;
                writeClient.scheduleClustering(Option.empty());
                continue;
            }
            if (tableType != HoodieTableType.MERGE_ON_READ) continue;
            if (i == 4) {
                writeClient.scheduleCompaction(Option.empty());
            }
            if (i != 5) continue;
            writeClient.scheduleClustering(Option.empty());
        }
        dataBatches.add(this.writeRecords(writeClient, WriteOperationType.BULK_INSERT, null, HoodieActiveTimeline.createNewInstantTime()));
        String latestCommitTimestamp = (String)((Pair)dataBatches.get(dataBatches.size() - 1)).getKey();
        Option clusteringInstant = this.metaClient.getActiveTimeline().filterPendingReplaceTimeline().filter(instant -> ClusteringUtils.getClusteringPlan((HoodieTableMetaClient)this.metaClient, (HoodieInstant)instant).isPresent()).firstInstant();
        org.junit.jupiter.api.Assertions.assertTrue((boolean)clusteringInstant.isPresent());
        org.junit.jupiter.api.Assertions.assertTrue((((HoodieInstant)clusteringInstant.get()).getTimestamp().compareTo(latestCommitTimestamp) < 0 ? 1 : 0) != 0);
        if (tableType == HoodieTableType.MERGE_ON_READ) {
            Option compactionInstant = this.metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)compactionInstant.isPresent());
            org.junit.jupiter.api.Assertions.assertTrue((((HoodieInstant)compactionInstant.get()).getTimestamp().compareTo(latestCommitTimestamp) < 0 ? 1 : 0) != 0);
        }
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<String>)Option.empty(), 500, (String)((Pair)dataBatches.get(6)).getKey());
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<String>)Option.of((Object)((Pair)dataBatches.get(2)).getKey()), 200, (String)((Pair)dataBatches.get(6)).getKey());
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<String>)Option.empty(), 100, (String)((Pair)dataBatches.get(6)).getKey());
        this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<String>)Option.of((Object)((Pair)dataBatches.get(6)).getKey()), 0, (String)((Pair)dataBatches.get(6)).getKey());
        writeClient.close();
    }

    private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount, String expectedCheckpoint) {
        Properties properties = new Properties();
        properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", this.basePath());
        properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
        TypedProperties typedProperties = new TypedProperties(properties);
        HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, this.jsc(), this.spark(), (SchemaProvider)new DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));
        Pair batchCheckPoint = incrSource.fetchNextBatch(checkpointToPull, 500L);
        org.junit.jupiter.api.Assertions.assertNotNull((Object)batchCheckPoint.getValue());
        if (expectedCount == 0) {
            org.junit.jupiter.api.Assertions.assertFalse((boolean)((Option)batchCheckPoint.getKey()).isPresent());
        } else {
            org.junit.jupiter.api.Assertions.assertEquals((long)expectedCount, (long)((Dataset)((Option)batchCheckPoint.getKey()).get()).count());
        }
        org.junit.jupiter.api.Assertions.assertEquals((Object)expectedCheckpoint, (Object)batchCheckPoint.getRight());
    }

    private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, WriteOperationType writeOperationType, List<HoodieRecord> insertRecords, String commit) throws IOException {
        writeClient.startCommitWithTime(commit);
        List records = writeOperationType == WriteOperationType.UPSERT ? this.dataGen.generateUpdates(commit, insertRecords) : this.dataGen.generateInserts(commit, Integer.valueOf(100));
        JavaRDD result = writeOperationType == WriteOperationType.BULK_INSERT ? writeClient.bulkInsert(this.jsc().parallelize(records, 1), commit) : writeClient.upsert(this.jsc().parallelize(records, 1), commit);
        List statuses = result.collect();
        Assertions.assertNoWriteErrors((List)statuses);
        return Pair.of((Object)commit, (Object)records);
    }

    private HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) {
        return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2).withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue()).forTable(metaClient.getTableConfig().getTableName());
    }

    private static class DummySchemaProvider
    extends SchemaProvider {
        private final Schema schema;

        public DummySchemaProvider(Schema schema) {
            super(new TypedProperties());
            this.schema = schema;
        }

        public Schema getSourceSchema() {
            return this.schema;
        }
    }
}

