package org.apache.hudi.utilities.sources;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/* loaded from: input_file:org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.class */
/**
 * Incremental source that resolves a window of instants on the Hudi table at the configured
 * source path, asks a {@link FilePathsFetcher} for the GCS file paths referenced by the rows in
 * that window, and loads the contents of those files through a {@link FileDataFetcher}.
 *
 * <p>NOTE(review): the class name suggests the source table holds GCS event metadata; that is
 * not visible from this file alone - confirm against the table that feeds it.
 */
public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
    private static final Logger LOG = LogManager.getLogger(GcsEventsHoodieIncrSource.class);

    /** Property holding the base path of the Hudi table to read incrementally (required). */
    private static final String SOURCE_PATH_KEY = "hoodie.deltastreamer.source.hoodieincr.path";
    /** Property holding the number of instants to consume per fetch. */
    private static final String NUM_INSTANTS_KEY = "hoodie.deltastreamer.source.hoodieincr.num_instants";
    /** Property holding the file format handed to the {@link FilePathsFetcher}. */
    private static final String SOURCE_FILE_FORMAT_KEY = "hoodie.deltastreamer.source.hoodieincr.file.format";
    /** Format used when no file format property is set. */
    private static final String DEFAULT_FILE_FORMAT = "parquet";
    /** Upper bound on the number of file paths echoed in the debug log. */
    private static final long MAX_SAMPLE_PATHS_TO_LOG = 10L;

    private final String srcPath;
    private final boolean checkIfFileExists;
    private final int numInstantsPerFetch;
    private final IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy;
    private final FilePathsFetcher filePathsFetcher;
    private final FileDataFetcher fileDataFetcher;

    /**
     * Creates the source with fetchers built from the supplied properties.
     *
     * <p>The paths fetcher uses the format from {@code hoodie.deltastreamer.source.hoodieincr.file.format}
     * and the data fetcher uses the format from {@link CloudStoreIngestionConfig#DATAFILE_FORMAT};
     * both fall back to {@code "parquet"}.
     *
     * @param typedProperties   configuration; must contain the source path property
     * @param javaSparkContext  Spark context passed to the parent source
     * @param sparkSession      Spark session passed to the parent source
     * @param schemaProvider    schema provider passed to the parent source
     */
    public GcsEventsHoodieIncrSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        this(typedProperties, javaSparkContext, sparkSession, schemaProvider,
            new FilePathsFetcher(typedProperties, getSourceFileFormat(typedProperties)),
            new FileDataFetcher(typedProperties, typedProperties.getString(CloudStoreIngestionConfig.DATAFILE_FORMAT, DEFAULT_FILE_FORMAT)));
    }

    /**
     * Creates the source with explicitly supplied fetchers. Package-private so that collaborators
     * can be injected (e.g. from tests in the same package).
     *
     * @param typedProperties   configuration; must contain the source path property
     * @param javaSparkContext  Spark context passed to the parent source
     * @param sparkSession      Spark session passed to the parent source
     * @param schemaProvider    schema provider passed to the parent source
     * @param filePathsFetcher  resolves GCS file paths from the source rows
     * @param fileDataFetcher   loads the data contained in the resolved files
     */
    GcsEventsHoodieIncrSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, FilePathsFetcher filePathsFetcher, FileDataFetcher fileDataFetcher) {
        super(typedProperties, javaSparkContext, sparkSession, schemaProvider);
        // Validate before reading so a missing source path is reported by the shared property check.
        DataSourceUtils.checkRequiredProperties(typedProperties, Collections.singletonList(SOURCE_PATH_KEY));
        this.srcPath = typedProperties.getString(SOURCE_PATH_KEY);
        this.missingCheckpointStrategy = IncrSourceHelper.getMissingCheckpointStrategy(typedProperties);
        this.numInstantsPerFetch = typedProperties.getInteger(NUM_INSTANTS_KEY, HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH.intValue());
        this.checkIfFileExists = typedProperties.getBoolean(CloudStoreIngestionConfig.ENABLE_EXISTS_CHECK, CloudStoreIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK.booleanValue());
        this.filePathsFetcher = filePathsFetcher;
        this.fileDataFetcher = fileDataFetcher;
        LOG.info("srcPath: " + this.srcPath);
        LOG.info("missingCheckpointStrategy: " + this.missingCheckpointStrategy);
        LOG.info("numInstantsPerFetch: " + this.numInstantsPerFetch);
        LOG.info("checkIfFileExists: " + this.checkIfFileExists);
    }

    /**
     * Fetches the next batch of file data together with the checkpoint to resume from.
     *
     * @param option previous checkpoint; an absent or empty value is treated as "no checkpoint"
     * @param j      not used by this implementation
     * @return the fetched data paired with the end instant of the queried window; the data is
     *         empty when the window has equal start and end instants (the start instant is then
     *         returned) or when the window contains no rows
     */
    @Override
    public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> option, long j) {
        QueryInfo queryInfo = getQueryInfo(option);
        if (queryInfo.areStartAndEndInstantsEqual()) {
            LOG.info("Already caught up. Begin Checkpoint was: " + queryInfo.getStartInstant());
            return Pair.of(Option.empty(), queryInfo.getStartInstant());
        }

        Dataset<Row> fileNamesSource = queryInfo.initializeSourceForFilenames(this.srcPath, this.sparkSession);
        if (fileNamesSource.isEmpty()) {
            LOG.info("Source of file names is empty. Returning empty result and endInstant: " + queryInfo.getEndInstant());
            return Pair.of(Option.empty(), queryInfo.getEndInstant());
        }
        return extractData(queryInfo, fileNamesSource);
    }

    /**
     * Resolves the GCS file paths referenced by {@code dataset} and loads their contents.
     *
     * @param queryInfo window being processed; its end instant becomes the returned checkpoint
     * @param dataset   non-empty rows from which the file paths are derived
     * @return the loaded data paired with the end instant of the window
     */
    private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo, Dataset<Row> dataset) {
        List<String> gcsFilePaths = this.filePathsFetcher.getGcsFilePaths(this.sparkContext, dataset, this.checkIfFileExists);
        // Guarded so the sample list and message are only built when debug output is actually emitted.
        if (LOG.isDebugEnabled()) {
            List<String> samplePaths = gcsFilePaths.stream().limit(MAX_SAMPLE_PATHS_TO_LOG).collect(Collectors.toList());
            LOG.debug("Extracted " + gcsFilePaths.size() + " distinct files. Some samples " + samplePaths);
        }
        Option<Dataset<Row>> fileData = this.fileDataFetcher.fetchFileData(this.sparkSession, gcsFilePaths, this.props);
        return Pair.of(fileData, queryInfo.getEndInstant());
    }

    /**
     * Builds the {@link QueryInfo} for the next fetch from the given checkpoint.
     *
     * @param option previous checkpoint, possibly absent or empty
     * @return query details built from the values computed by {@link IncrSourceHelper}
     */
    private QueryInfo getQueryInfo(Option<String> option) {
        Pair<String, Pair<String, String>> resolved = IncrSourceHelper.calculateBeginAndEndInstants(
            this.sparkContext, this.srcPath, this.numInstantsPerFetch, getBeginInstant(option), this.missingCheckpointStrategy);
        // Presumably (begin, end) instants, going by the helper's name - the values are forwarded in order.
        Pair<String, String> instants = resolved.getRight();
        QueryInfo queryInfo = new QueryInfo(resolved.getLeft(), instants.getLeft(), instants.getRight());
        if (LOG.isDebugEnabled()) {
            queryInfo.logDetails();
        }
        return queryInfo;
    }

    /**
     * Normalizes the checkpoint so that an empty string is handled like a missing checkpoint.
     *
     * @param option previous checkpoint
     * @return {@code option} when it holds a non-empty string, otherwise an empty option
     */
    private Option<String> getBeginInstant(Option<String> option) {
        if (option.isPresent() && !StringUtils.isNullOrEmpty(option.get())) {
            return option;
        }
        return Option.empty();
    }

    /**
     * Reads the source file format from the properties.
     *
     * @param typedProperties configuration to read from
     * @return the configured format, or {@code "parquet"} when the property is not set
     */
    private static String getSourceFileFormat(TypedProperties typedProperties) {
        return typedProperties.getString(SOURCE_FILE_FORMAT_KEY, DEFAULT_FILE_FORMAT);
    }
}
