/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;

public class TestGcsEventsHoodieIncrSource
extends SparkClientFunctionalTestHarness {
    /** Per-test temporary directory injected via {@code @TempDir}; {@link #basePath()} derives the table path from it. */
    @TempDir
    protected Path tempDir;
    /** Mockito mock passed to the source under test; tests stub and verify {@code getGcsFilePaths} on it. */
    @Mock
    FilePathsFetcher filePathsFetcher;
    /** Mockito mock passed to the source under test; tests stub and verify {@code fetchFileData} on it. */
    @Mock
    FileDataFetcher fileDataFetcher;
    // NOTE(review): never assigned anywhere in this class, yet it is passed to the
    // GcsEventsHoodieIncrSource constructor in readAndAssert — presumably null at that point
    // unless a superclass sets it; confirm this is intended.
    protected FilebasedSchemaProvider schemaProvider;
    // Assigned in setUp() from basePath(); getConfigBuilder reads the table name from it.
    private HoodieTableMetaClient metaClient;
    // Used by getGcsMetadataRecord to log the metadata schema.
    private static final Logger LOG = LogManager.getLogger(TestGcsEventsHoodieIncrSource.class);

    /**
     * Per-test initialisation: obtains a meta client for the table located at
     * {@link #basePath()} and then initialises the {@code @Mock}-annotated fields.
     *
     * @throws IOException declared by the original signature; propagated from the harness calls
     */
    @BeforeEach
    public void setUp() throws IOException {
        metaClient = getHoodieMetaClient(hadoopConf(), basePath());
        MockitoAnnotations.initMocks(this);
    }

    /**
     * Returns the table base path used by this test.
     *
     * @return the absolute form of {@code tempDir}, rendered as a URI string
     */
    public String basePath() {
        Path absoluteTempDir = tempDir.toAbsolutePath();
        return absoluteTempDir.toUri().toString();
    }

    /**
     * Writes metadata records at commit "1" and then reads with checkpoint "1".
     * Expects no dataset to be returned and neither fetcher mock to be invoked.
     */
    @Test
    public void shouldNotFindNewDataIfCommitTimeOfWriteAndReadAreEqual() throws IOException {
        // The same instant is used both for the write and as the read checkpoint.
        final String commitTime = "1";

        Pair<String, List<HoodieRecord>> written = writeGcsMetadataRecords(commitTime);
        String expectedCheckpoint = written.getKey();

        readAndAssert(
            IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
            Option.of(commitTime),
            0,
            expectedCheckpoint);

        // Neither collaborator may have been touched.
        Mockito.verify(filePathsFetcher, Mockito.times(0))
            .getGcsFilePaths((JavaSparkContext) Mockito.any(), (Dataset) Mockito.any(), ArgumentMatchers.anyBoolean());
        Mockito.verify(fileDataFetcher, Mockito.times(0))
            .fetchFileData((SparkSession) Mockito.any(), (List) Mockito.any(), (TypedProperties) Mockito.any());
    }

    /**
     * Writes metadata records at commit "2" and reads with checkpoint "1".
     * The mocked fetchers are stubbed to yield two file paths and a four-row dataset;
     * the test expects four rows back and exactly one invocation of each fetcher.
     */
    @Test
    public void shouldFetchDataIfCommitTimeForReadsLessThanForWrites() throws IOException {
        final String writeInstant = "2";
        final String readCheckpoint = "1";
        Pair<String, List<HoodieRecord>> written = writeGcsMetadataRecords(writeInstant);

        // Stub the path lookup.
        List<String> dataFiles = Arrays.asList("data-file-1.json", "data-file-2.json");
        Mockito.when((Object) filePathsFetcher.getGcsFilePaths(
                (JavaSparkContext) Mockito.any(), (Dataset) Mockito.any(), ArgumentMatchers.anyBoolean()))
            .thenReturn(dataFiles);

        // Stub the data fetch with a four-row frame built from GcsDataRecord beans.
        List<GcsDataRecord> recs = Arrays.asList(
            new GcsDataRecord("1", "Hello 1"),
            new GcsDataRecord("2", "Hello 2"),
            new GcsDataRecord("3", "Hello 3"),
            new GcsDataRecord("4", "Hello 4"));
        Dataset<?> rows = spark().createDataFrame(recs, GcsDataRecord.class);
        Mockito.when((Object) fileDataFetcher.fetchFileData(
                (SparkSession) Mockito.any(), (List) ArgumentMatchers.eq(dataFiles), (TypedProperties) Mockito.any()))
            .thenReturn(Option.of(rows));

        readAndAssert(
            IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
            Option.of(readCheckpoint),
            4,
            written.getKey());

        // Each collaborator must have been used exactly once.
        Mockito.verify(filePathsFetcher, Mockito.times(1))
            .getGcsFilePaths((JavaSparkContext) Mockito.any(), (Dataset) Mockito.any(), ArgumentMatchers.anyBoolean());
        Mockito.verify(fileDataFetcher, Mockito.times(1))
            .fetchFileData((SparkSession) Mockito.any(), (List) ArgumentMatchers.eq(dataFiles), (TypedProperties) Mockito.any());
    }

    /**
     * Builds a {@link GcsEventsHoodieIncrSource} over the table at {@link #basePath()} using the
     * mocked fetchers, pulls one batch and checks the outcome.
     *
     * @param missingCheckpointStrategy strategy written into the source properties
     * @param checkpointToPull checkpoint handed to {@code fetchNextBatch}
     * @param expectedCount expected row count; {@code 0} means no dataset should be returned at all
     * @param expectedCheckpoint checkpoint the source is expected to report after the batch
     */
    private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount, String expectedCheckpoint) {
        TypedProperties typedProperties = this.setProps(missingCheckpointStrategy);
        GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, this.jsc(), this.spark(), (SchemaProvider)this.schemaProvider, this.filePathsFetcher, this.fileDataFetcher);

        // 100L is the second argument of fetchNextBatch, unchanged from the original call.
        Pair dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, 100L);
        Option datasetOpt = (Option)dataAndCheckpoint.getLeft();
        String nextCheckPoint = (String)dataAndCheckpoint.getRight();

        org.junit.jupiter.api.Assertions.assertNotNull(nextCheckPoint);
        if (expectedCount == 0) {
            org.junit.jupiter.api.Assertions.assertFalse(datasetOpt.isPresent());
        } else {
            // Fail with a readable message instead of an exception from get() on an empty Option.
            org.junit.jupiter.api.Assertions.assertTrue(datasetOpt.isPresent(), "expected a dataset with " + expectedCount + " rows but none was returned");
            // JUnit's contract is (expected, actual); the original passed them reversed,
            // which inverts the "expected/but was" text in failure output.
            org.junit.jupiter.api.Assertions.assertEquals((long)expectedCount, ((Dataset)datasetOpt.get()).count());
        }
        org.junit.jupiter.api.Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
    }

    /**
     * Builds one GCS object-metadata record conforming to the schema supplied by
     * {@code MetadataSchemaProvider}.
     *
     * @param commitTime parsed as a long and stored in the {@code timestamp} field
     * @param filename object name; also embedded in the id and link fields
     * @param bucketName bucket name; doubles as the record's partition path
     * @param generation object generation; embedded in the id and media link
     * @return a record keyed by the synthetic id, partitioned by {@code bucketName}
     */
    private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, String bucketName, String generation) {
        Schema metadataSchema = new MetadataSchemaProvider().getSourceSchema();
        LOG.info("schema: " + metadataSchema);

        // Derived identifiers and links.
        String recordId = "id:" + bucketName + "/" + filename + "/" + generation;
        String downloadLink = String.format(
            "https://storage.googleapis.com/download/storage/v1/b/%s/o/%s?generation=%s&alt=media",
            bucketName, filename, generation);
        String resourceLink = String.format(
            "https://www.googleapis.com/storage/v1/b/%s/o/%s", bucketName, filename);

        // A single fixed timestamp is used for all three time fields.
        final String fixedTimestamp = "2022-08-29T05:52:55.869Z";

        GenericData.Record fields = new GenericData.Record(metadataSchema);
        fields.put("_row_key", recordId);
        fields.put("partition_path", bucketName);
        fields.put("timestamp", Long.parseLong(commitTime));
        fields.put("bucket", bucketName);
        fields.put("contentLanguage", "en");
        fields.put("contentType", "application/octet-stream");
        fields.put("crc32c", "oRB3Aw==");
        fields.put("etag", "CP7EwYCu6/kCEAE=");
        fields.put("generation", generation);
        fields.put("id", recordId);
        fields.put("kind", "storage#object");
        fields.put("md5Hash", "McsS8FkcDSrB3cGfb18ysA==");
        fields.put("mediaLink", downloadLink);
        fields.put("metageneration", "1");
        fields.put("name", filename);
        fields.put("selfLink", resourceLink);
        fields.put("size", "370");
        fields.put("storageClass", "STANDARD");
        fields.put("timeCreated", fixedTimestamp);
        fields.put("timeStorageClassUpdated", fixedTimestamp);
        fields.put("updated", fixedTimestamp);

        HoodieKey key = new HoodieKey(recordId, bucketName);
        HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(fields));
        return new HoodieAvroRecord<>(key, payload);
    }

    /**
     * Assembles the write config used by the tests: the base builder from
     * {@code getConfigBuilder} plus archival, clean and metadata sub-configs.
     *
     * @return the built write config for the table at {@link #basePath()}
     */
    private HoodieWriteConfig getWriteConfig() {
        HoodieWriteConfig.Builder builder = getConfigBuilder(basePath(), metaClient);

        HoodieArchivalConfig archival = HoodieArchivalConfig.newBuilder()
            .archiveCommitsWith(2, 3)
            .build();
        HoodieCleanConfig clean = HoodieCleanConfig.newBuilder()
            .retainCommits(1)
            .build();
        HoodieMetadataConfig metadata = HoodieMetadataConfig.newBuilder()
            .withMaxNumDeltaCommitsBeforeCompaction(1)
            .build();

        return builder
            .withArchivalConfig(archival)
            .withCleanConfig(clean)
            .withMetadataConfig(metadata)
            .build();
    }

    /**
     * Upserts four GCS metadata records (data-file-1..4.json in bucket-1, generation 1)
     * at the given commit time and asserts that the write reported no errors.
     *
     * @param commitTime instant used both to start the commit and for the upsert
     * @return the commit time paired with the records that were written
     * @throws IOException declared by the original signature; propagated from the harness calls
     */
    private Pair<String, List<HoodieRecord>> writeGcsMetadataRecords(String commitTime) throws IOException {
        // NOTE(review): the client is not closed in this method — confirm the test harness owns its lifecycle.
        SparkRDDWriteClient writeClient = getHoodieWriteClient(getWriteConfig());
        writeClient.startCommitWithTime(commitTime);

        final String bucket = "bucket-1";
        final String generation = "1";
        List<HoodieRecord> gcsMetadataRecords = Arrays.asList(
            getGcsMetadataRecord(commitTime, "data-file-1.json", bucket, generation),
            getGcsMetadataRecord(commitTime, "data-file-2.json", bucket, generation),
            getGcsMetadataRecord(commitTime, "data-file-3.json", bucket, generation),
            getGcsMetadataRecord(commitTime, "data-file-4.json", bucket, generation));

        // Single-partition RDD; collecting the statuses runs the write.
        JavaRDD writeResult = writeClient.upsert(jsc().parallelize(gcsMetadataRecords, 1), commitTime);
        Assertions.assertNoWriteErrors(writeResult.collect());

        return Pair.of(commitTime, gcsMetadataRecords);
    }

    /**
     * Creates the properties handed to the source under test.
     *
     * @param missingCheckpointStrategy strategy whose {@code name()} is stored under the
     *        missing-checkpoint-strategy key
     * @return properties carrying the table path, the strategy name and the {@code json} data file format
     */
    private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) {
        String strategyName = missingCheckpointStrategy.name();
        String tablePath = basePath();

        Properties raw = new Properties();
        raw.setProperty("hoodie.deltastreamer.source.hoodieincr.path", tablePath);
        raw.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", strategyName);
        raw.setProperty("hoodie.deltastreamer.source.gcsincr.datafile.format", "json");

        return new TypedProperties(raw);
    }

    /**
     * Creates a write-config builder pre-populated with the metadata schema, a parallelism
     * of 2 for every configured operation, the current timeline layout version and the
     * table name taken from the meta client.
     *
     * @param basePath table base path
     * @param metaClient meta client whose table config supplies the table name
     * @return the partially configured builder
     */
    private HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) {
        final int parallelism = 2;
        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder().withPath(basePath);
        String schemaJson = new MetadataSchemaProvider().getSourceSchema().toString();
        return builder
            .withSchema(schemaJson)
            .withParallelism(parallelism, parallelism)
            .withBulkInsertParallelism(parallelism)
            .withFinalizeWriteParallelism(parallelism)
            .withDeleteParallelism(parallelism)
            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue())
            .forTable(metaClient.getTableConfig().getTableName());
    }

    /**
     * Simple two-field bean from which the test builds the data frame that the mocked
     * {@code FileDataFetcher} returns.
     */
    public static class GcsDataRecord {
        /** Record identifier. */
        public String id;
        /** Free-form text carried by the record. */
        public String text;

        /**
         * @param id value for {@link #id}
         * @param text value for {@link #text}
         */
        public GcsDataRecord(String id, String text) {
            this.text = text;
            this.id = id;
        }

        /** @return the value of {@link #id} */
        public String getId() {
            return id;
        }

        /** @return the value of {@link #text} */
        public String getText() {
            return text;
        }
    }

    /**
     * Schema provider that serves the Avro schema loaded from the
     * {@code /delta-streamer-config/gcs-metadata.avsc} test resource.
     */
    private static class MetadataSchemaProvider
    extends SchemaProvider {
        private final Schema schema;

        /** Creates the provider with empty properties and loads the schema resource. */
        public MetadataSchemaProvider() {
            super(new TypedProperties());
            // Assigned right after super(...), the same point at which a field initializer would run.
            this.schema = SchemaTestUtil.getSchemaFromResource(TestGcsEventsHoodieIncrSource.class, "/delta-streamer-config/gcs-metadata.avsc", true);
        }

        /** @return the schema loaded at construction time */
        public Schema getSourceSchema() {
            return schema;
        }
    }
}

