/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hadoop.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Static helpers for Hudi's Hadoop input formats.
 *
 * <p>They cover three areas:
 * <ul>
 *   <li>mapping a {@link HoodieFileFormat} to input/output format and SerDe classes;</li>
 *   <li>resolving {@link HoodieTableMetaClient}s from data paths;</li>
 *   <li>filtering file listings down to the files that snapshot and incremental queries should read.</li>
 * </ul>
 */
public class HoodieInputFormatUtils {
    // Fixed column positions used by callers; presumably indices of Hudi meta columns in the record schema — confirm.
    public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
    public static final int HOODIE_RECORD_KEY_COL_POS = 2;
    public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
    public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set";
    private static final Logger LOG = LogManager.getLogger(HoodieInputFormatUtils.class);

    /**
     * Creates an input format for the given base file format and configures it with {@code conf}.
     *
     * @param baseFileFormat base file format of the table
     * @param realtime       whether the realtime variant of the input format is wanted
     * @param conf           configuration handed to {@code setConf} on the new instance
     * @return a configured input format instance
     * @throws HoodieIOException if no Hudi input format exists for {@code baseFileFormat} (e.g. ORC)
     */
    public static FileInputFormat getInputFormat(HoodieFileFormat baseFileFormat, boolean realtime, Configuration conf) {
        switch (baseFileFormat) {
            case PARQUET: {
                if (realtime) {
                    HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
                    inputFormat.setConf(conf);
                    return inputFormat;
                }
                HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
                inputFormat.setConf(conf);
                return inputFormat;
            }
            case HFILE: {
                if (realtime) {
                    HoodieHFileRealtimeInputFormat inputFormat = new HoodieHFileRealtimeInputFormat();
                    inputFormat.setConf(conf);
                    return inputFormat;
                }
                HoodieHFileInputFormat inputFormat = new HoodieHFileInputFormat();
                inputFormat.setConf(conf);
                return inputFormat;
            }
            default:
                break;
        }
        throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
    }

    /**
     * Returns the fully-qualified input format class name for a base file format.
     *
     * @param baseFileFormat base file format of the table
     * @param realtime       whether the realtime variant is wanted (ignored for ORC)
     * @return input format class name
     * @throws HoodieIOException if the format is not supported
     */
    public static String getInputFormatClassName(HoodieFileFormat baseFileFormat, boolean realtime) {
        switch (baseFileFormat) {
            case PARQUET:
                return realtime ? HoodieParquetRealtimeInputFormat.class.getName() : HoodieParquetInputFormat.class.getName();
            case HFILE:
                return realtime ? HoodieHFileRealtimeInputFormat.class.getName() : HoodieHFileInputFormat.class.getName();
            case ORC:
                return OrcInputFormat.class.getName();
            default:
                break;
        }
        throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
    }

    /**
     * Returns the fully-qualified Hive output format class name for a base file format.
     *
     * @param baseFileFormat base file format of the table
     * @return output format class name
     * @throws HoodieIOException if the format is not supported
     */
    public static String getOutputFormatClassName(HoodieFileFormat baseFileFormat) {
        switch (baseFileFormat) {
            case PARQUET:
            // NOTE(review): HFILE deliberately maps to the Parquet output format here; confirm this is intended.
            case HFILE:
                return MapredParquetOutputFormat.class.getName();
            case ORC:
                return OrcOutputFormat.class.getName();
            default:
                break;
        }
        throw new HoodieIOException("No OutputFormat for base file format " + baseFileFormat);
    }

    /**
     * Returns the fully-qualified Hive SerDe class name for a base file format.
     *
     * @param baseFileFormat base file format of the table
     * @return SerDe class name
     * @throws HoodieIOException if the format is not supported
     */
    public static String getSerDeClassName(HoodieFileFormat baseFileFormat) {
        switch (baseFileFormat) {
            case PARQUET:
            // NOTE(review): HFILE deliberately maps to the Parquet SerDe here; confirm this is intended.
            case HFILE:
                return ParquetHiveSerDe.class.getName();
            case ORC:
                return OrcSerde.class.getName();
            default:
                break;
        }
        throw new HoodieIOException("No SerDe for base file format " + baseFileFormat);
    }

    /**
     * Picks an input format based on the extension of {@code path}.
     *
     * <p>Parquet and HFile extensions map to their respective formats. A log file maps to the Parquet
     * input format, but only when {@code realtime} is set.
     *
     * @param path     file path whose extension selects the format
     * @param realtime whether the realtime variant is wanted
     * @param conf     configuration handed to the new input format
     * @return a configured input format instance
     * @throws HoodieIOException if the extension is not recognised
     */
    public static FileInputFormat getInputFormat(String path, boolean realtime, Configuration conf) {
        String extension = FSUtils.getFileExtension(path);
        if (extension.equals(HoodieFileFormat.PARQUET.getFileExtension())) {
            return getInputFormat(HoodieFileFormat.PARQUET, realtime, conf);
        }
        if (extension.equals(HoodieFileFormat.HFILE.getFileExtension())) {
            return getInputFormat(HoodieFileFormat.HFILE, realtime, conf);
        }
        if (FSUtils.isLogFile(new Path(path)) && realtime) {
            return getInputFormat(HoodieFileFormat.PARQUET, realtime, conf);
        }
        throw new HoodieIOException("Hoodie InputFormat not implemented for base file of type " + extension);
    }

    /**
     * Truncates the write timeline at the earliest pending compaction, if there is one.
     *
     * <p>When a pending compaction exists, only instants strictly before it are kept, and the number of
     * commits dropped is logged. Otherwise {@code timeline} is returned unchanged.
     *
     * @param timeline timeline to filter
     * @return the truncated write timeline, or {@code timeline} itself when nothing is pending
     */
    public static HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
        HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getWriteTimeline();
        Option<HoodieInstant> pendingCompactionInstant =
                commitsAndCompactionTimeline.filterPendingCompactionTimeline().firstInstant();
        if (!pendingCompactionInstant.isPresent()) {
            return timeline;
        }
        String earliestPendingCompaction = pendingCompactionInstant.get().getTimestamp();
        HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline.findInstantsBefore(earliestPendingCompaction);
        int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
                - instantsTimeline.getCommitsTimeline().countInstants();
        LOG.info("Earliest pending compaction instant is: " + earliestPendingCompaction
                + " skipping " + numCommitsFilteredByCompaction + " commits");
        return instantsTimeline;
    }

    /**
     * Collects the partitions written by {@code commitsToCheck} and keeps only those covered by {@code inputPaths}.
     *
     * <p>Each partition becomes {@code basePath/partition}; an empty partition path is mapped to the base
     * path itself. A partition is kept when some input path ends with that full path.
     *
     * @param commitsToCheck  commits whose metadata is inspected
     * @param tableMetaClient meta client supplying the table base path
     * @param timeline        timeline used to read each commit's metadata
     * @param inputPaths      paths requested by the query
     * @return comma-separated matching paths, or empty if none match
     * @throws IOException if commit metadata cannot be deserialized
     */
    public static Option<String> getAffectedPartitions(List<HoodieInstant> commitsToCheck, HoodieTableMetaClient tableMetaClient,
                                                       HoodieTimeline timeline, List<Path> inputPaths) throws IOException {
        Set<String> partitionsToList = new HashSet<>();
        for (HoodieInstant commit : commitsToCheck) {
            HoodieCommitMetadata commitMetadata = getCommitMetadata(commit, timeline);
            partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
        }
        if (partitionsToList.isEmpty()) {
            return Option.empty();
        }
        String basePath = tableMetaClient.getBasePath();
        String incrementalInputPaths = partitionsToList.stream()
                .map(partition -> StringUtils.isNullOrEmpty(partition) ? basePath : basePath + "/" + partition)
                .filter(fullPath -> inputPaths.stream().anyMatch(path -> path.toString().endsWith(fullPath)))
                .collect(Collectors.joining(","));
        if (StringUtils.isNullOrEmpty(incrementalInputPaths)) {
            return Option.empty();
        }
        return Option.of(incrementalInputPaths);
    }

    /**
     * Returns the completed commits timeline of the table.
     *
     * <p>The timeline is first truncated at the earliest pending compaction when
     * {@code HoodieHiveUtils.stopAtCompaction} is set for the table.
     *
     * @param job             job whose configuration is consulted
     * @param tableMetaClient meta client of the table
     * @return the filtered, completed commits timeline (always present)
     */
    public static Option<HoodieTimeline> getFilteredCommitsTimeline(Job job, HoodieTableMetaClient tableMetaClient) {
        String tableName = tableMetaClient.getTableConfig().getTableName();
        HoodieDefaultTimeline baseTimeline = HoodieHiveUtils.stopAtCompaction(job, tableName)
                ? filterInstantsTimeline(tableMetaClient.getActiveTimeline())
                : tableMetaClient.getActiveTimeline();
        return Option.of(baseTimeline.getCommitsTimeline().filterCompletedInstants());
    }

    /**
     * Lists the instants an incremental query should read.
     *
     * @param job       job carrying the incremental query settings
     * @param tableName table whose settings are read
     * @param timeline  timeline to select instants from
     * @return instants selected by {@link #getHoodieTimelineForIncrementalQuery}
     */
    public static Option<List<HoodieInstant>> getCommitsForIncrementalQuery(Job job, String tableName, HoodieTimeline timeline) {
        return Option.of(getHoodieTimelineForIncrementalQuery(job, tableName, timeline).getInstants().collect(Collectors.toList()));
    }

    /**
     * Returns the instants after the configured start commit, capped at the configured maximum number of commits.
     *
     * @param job       job carrying the incremental query settings
     * @param tableName table whose settings are read
     * @param timeline  timeline to select instants from
     * @return the sub-timeline for the incremental query
     */
    public static HoodieTimeline getHoodieTimelineForIncrementalQuery(Job job, String tableName, HoodieTimeline timeline) {
        String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(job, tableName);
        Integer maxCommits = HoodieHiveUtils.readMaxCommits(job, tableName);
        LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
        return timeline.findInstantsAfter(lastIncrementalTs, maxCommits.intValue());
    }

    /**
     * Builds a meta client for each partition path.
     *
     * @param conf       configuration used to obtain each path's file system
     * @param partitions partition paths to resolve
     * @return map from partition path to the meta client of its table
     * @throws HoodieIOException if a meta client cannot be created
     */
    public static Map<Path, HoodieTableMetaClient> getTableMetaClientByPartitionPath(Configuration conf, Set<Path> partitions) {
        return partitions.stream().collect(Collectors.toMap(Function.identity(), p -> {
            try {
                return getTableMetaClientForBasePath(p.getFileSystem(conf), p);
            } catch (IOException e) {
                throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
            }
        }));
    }

    /**
     * Resolves the table meta client for a data (partition) path by walking up to the table base path.
     *
     * <p>The number of levels climbed is read from the partition metadata when it exists. Otherwise it
     * defaults to 3.
     *
     * @param fs       file system holding {@code dataPath}
     * @param dataPath partition path inside a Hudi table
     * @return meta client for the table at the computed base path
     * @throws IOException if partition metadata cannot be read
     */
    public static HoodieTableMetaClient getTableMetaClientForBasePath(FileSystem fs, Path dataPath) throws IOException {
        int levels = 3;
        if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
            HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
            metadata.readFromFS();
            levels = metadata.getPartitionDepth();
        }
        Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
        LOG.info("Reading hoodie metadata from path " + baseDir.toString());
        return HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir.toString()).build();
    }

    /**
     * Returns the file status of a base file.
     *
     * <p>When the base file has a bootstrap base file, the status is wrapped so that it also carries the
     * bootstrap file's status. A {@link LocatedFileStatus} stays located.
     *
     * @param baseFile base file to describe
     * @return plain or bootstrap-aware file status
     * @throws IOException if the wrapping status cannot be built
     */
    public static FileStatus getFileStatus(HoodieBaseFile baseFile) throws IOException {
        Option<BaseFile> bootstrapBaseFile = baseFile.getBootstrapBaseFile();
        if (!bootstrapBaseFile.isPresent()) {
            return baseFile.getFileStatus();
        }
        FileStatus status = baseFile.getFileStatus();
        FileStatus bootstrapStatus = bootstrapBaseFile.get().getFileStatus();
        if (status instanceof LocatedFileStatus) {
            return new LocatedFileStatusWithBootstrapBaseFile((LocatedFileStatus) status, bootstrapStatus);
        }
        return new FileStatusWithBootstrapBaseFile(status, bootstrapStatus);
    }

    /**
     * Returns the latest base files written by {@code commitsToCheck}, taken from {@code fileStatuses}.
     *
     * <p>Zero-length statuses are refreshed from the file system before being returned.
     *
     * @param job             job whose configuration is used for refreshing statuses
     * @param tableMetaClient meta client of the table
     * @param timeline        timeline backing the file system view
     * @param fileStatuses    candidate files
     * @param commitsToCheck  commits whose base files are wanted
     * @return file statuses to process
     * @throws IOException if a file status cannot be built
     */
    public static List<FileStatus> filterIncrementalFileStatus(Job job, HoodieTableMetaClient tableMetaClient, HoodieTimeline timeline,
                                                               FileStatus[] fileStatuses, List<HoodieInstant> commitsToCheck) throws IOException {
        HoodieTableFileSystemView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
        try {
            List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
            List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());
            List<FileStatus> returns = new ArrayList<>(filteredFiles.size());
            for (HoodieBaseFile filteredFile : filteredFiles) {
                LOG.debug("Processing incremental hoodie file - " + filteredFile.getPath());
                HoodieBaseFile refreshed = refreshFileStatus(job.getConfiguration(), filteredFile);
                returns.add(getFileStatus(refreshed));
            }
            LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
            return returns;
        } finally {
            // Release the view, matching how filterFileStatusForSnapshotMode closes its views.
            roView.close();
        }
    }

    /**
     * Groups files whose name ends with {@code fileExtension} by the meta client whose base path they contain.
     *
     * <p>Files that match no meta client are logged and skipped rather than attributed to an unrelated table.
     *
     * @param fileStatuses   files to group
     * @param fileExtension  only files with this name suffix are considered
     * @param metaClientList candidate meta clients
     * @return map from meta client to its files; meta clients with no files are absent
     */
    public static Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(FileStatus[] fileStatuses, String fileExtension,
                                                                                              Collection<HoodieTableMetaClient> metaClientList) {
        Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
        HoodieTableMetaClient current = null;
        for (FileStatus status : fileStatuses) {
            Path inputPath = status.getPath();
            if (!inputPath.getName().endsWith(fileExtension)) {
                continue;
            }
            String pathStr = inputPath.toString();
            // Reuse the previous match when it still applies; otherwise search again, forgetting the stale match.
            if (current == null || !pathStr.contains(current.getBasePath())) {
                current = null;
                for (HoodieTableMetaClient metaClient : metaClientList) {
                    if (pathStr.contains(metaClient.getBasePath())) {
                        current = metaClient;
                        break;
                    }
                }
            }
            if (current == null) {
                LOG.warn("No Hoodie table meta client matches path " + inputPath + ", skipping it");
                continue;
            }
            grouped.computeIfAbsent(current, k -> new ArrayList<>()).add(status);
        }
        return grouped;
    }

    /**
     * Groups snapshot paths by every meta client whose base path they contain.
     *
     * @param metaClientList meta clients; each gets an entry, even if empty
     * @param snapshotPaths  paths to group (a path may land in several groups)
     * @return map from meta client to its paths
     */
    public static Map<HoodieTableMetaClient, List<Path>> groupSnapshotPathsByMetaClient(Collection<HoodieTableMetaClient> metaClientList,
                                                                                        List<Path> snapshotPaths) {
        Map<HoodieTableMetaClient, List<Path>> grouped = new HashMap<>();
        for (HoodieTableMetaClient metaClient : metaClientList) {
            grouped.put(metaClient, new ArrayList<>());
        }
        for (Path path : snapshotPaths) {
            String pathStr = path.toString();
            metaClientList.stream()
                    .filter(metaClient -> pathStr.contains(metaClient.getBasePath()))
                    .forEach(metaClient -> grouped.get(metaClient).add(path));
        }
        return grouped;
    }

    /**
     * Builds a metadata config whose enable flag is read from {@code conf} (default {@code false}).
     *
     * @param conf configuration to read from
     * @return metadata config
     */
    public static HoodieMetadataConfig buildMetadataConfig(Configuration conf) {
        return HoodieMetadataConfig.newBuilder().enable(conf.getBoolean(HoodieMetadataConfig.ENABLE.key(), false)).build();
    }

    /**
     * Same as {@link #filterFileStatusForSnapshotMode(JobConf, Map, List, boolean)} without log-only file slices.
     */
    public static List<FileStatus> filterFileStatusForSnapshotMode(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap,
                                                                   List<Path> snapshotPaths) throws IOException {
        return filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, false);
    }

    /**
     * Returns the latest base files under {@code snapshotPaths} and, optionally, log-only file slices.
     *
     * <p>Each table is read through an in-memory file system view built on its table timeline. When
     * {@code includeLogFiles} is set, each file slice with no base file but at least one log file is added
     * as a {@link RealtimeFileStatus}. Its status comes from the latest log file, and it carries all of the
     * slice's log files, sorted. All views are closed before returning.
     *
     * <p>NOTE(review): CFR flagged this method's try/finally structure as reconstructed; confirm against upstream.
     *
     * @param job                job configuration
     * @param tableMetaClientMap meta clients of the tables being queried
     * @param snapshotPaths      partition paths to read
     * @param includeLogFiles    whether log-only file slices are returned
     * @return file statuses to process
     * @throws IOException if a file status cannot be built
     */
    public static List<FileStatus> filterFileStatusForSnapshotMode(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap,
                                                                   List<Path> snapshotPaths, boolean includeLogFiles) throws IOException {
        HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);
        HoodieMetadataConfig metadataConfig = buildMetadataConfig(job);
        List<FileStatus> returns = new ArrayList<>();
        Map<HoodieTableMetaClient, List<Path>> groupedPaths = groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
        Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsViewCache = new HashMap<>();
        LOG.info("Found a total of " + groupedPaths.size() + " groups");
        try {
            for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
                HoodieTableMetaClient metaClient = entry.getKey();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Hoodie Metadata initialized with completed commit instant as :" + metaClient);
                }
                HoodieTimeline timeline = HoodieHiveUtils.getTableTimeline(metaClient.getTableConfig().getTableName(), job, metaClient);
                HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(metaClient, tableMetaClient ->
                        FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, tableMetaClient, metadataConfig, timeline));
                Path basePath = new Path(metaClient.getBasePath());
                List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
                Map<FileStatus, List<HoodieLogFile>> filteredLogs = new HashMap<>();
                for (Path path : entry.getValue()) {
                    String relativePartitionPath = FSUtils.getRelativePartitionPath(basePath, path);
                    List<HoodieBaseFile> matched = fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());
                    filteredBaseFiles.addAll(matched);
                    if (!includeLogFiles) {
                        continue;
                    }
                    // Slices with log files but no base file are not covered by getLatestBaseFiles above.
                    List<FileSlice> logMatched = fsView.getLatestFileSlices(relativePartitionPath)
                            .filter(f -> !f.getBaseFile().isPresent() && f.getLatestLogFile().isPresent())
                            .collect(Collectors.toList());
                    for (FileSlice slice : logMatched) {
                        List<HoodieLogFile> sortedLogFiles = slice.getLogFiles()
                                .sorted(HoodieLogFile.getLogFileComparator())
                                .collect(Collectors.toList());
                        filteredLogs.put(slice.getLatestLogFile().get().getFileStatus(), sortedLogFiles);
                    }
                }
                LOG.info("Total paths to process after hoodie filter " + filteredBaseFiles.size());
                for (HoodieBaseFile hoodieBaseFile : filteredBaseFiles) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Processing latest hoodie file - " + hoodieBaseFile.getPath());
                    }
                    HoodieBaseFile refreshed = refreshFileStatus(job, hoodieBaseFile);
                    returns.add(getFileStatus(refreshed));
                }
                for (Map.Entry<FileStatus, List<HoodieLogFile>> logEntry : filteredLogs.entrySet()) {
                    RealtimeFileStatus rs = new RealtimeFileStatus(logEntry.getKey());
                    rs.setDeltaLogFiles(logEntry.getValue());
                    returns.add(rs);
                }
            }
        } finally {
            fsViewCache.forEach((metaClient, fsView) -> fsView.close());
        }
        return returns;
    }

    /**
     * Re-reads the file status from the file system when the base file reports a size of zero.
     *
     * @param conf     configuration used to obtain the file system
     * @param dataFile base file to check
     * @return {@code dataFile} itself, or a new base file carrying the refreshed status and the same bootstrap file
     * @throws HoodieIOException (with the underlying cause) if the status cannot be fetched
     */
    private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile dataFile) {
        Path dataPath = dataFile.getFileStatus().getPath();
        try {
            if (dataFile.getFileSize() == 0L) {
                FileSystem fs = dataPath.getFileSystem(conf);
                LOG.info("Refreshing file status " + dataFile.getPath());
                return new HoodieBaseFile(fs.getFileStatus(dataPath), dataFile.getBootstrapBaseFile().orElse(null));
            }
            return dataFile;
        } catch (IOException e) {
            throw new HoodieIOException("Could not get FileStatus on path " + dataPath, e);
        }
    }

    /**
     * Collects the file statuses recorded in the given commit metadata, deduplicated by full path.
     *
     * @param basePath     table base path used to build full paths
     * @param metadataList commit metadata to inspect
     * @return file statuses; when several commits record the same path, the later entry wins
     */
    public static FileStatus[] listAffectedFilesForCommits(Path basePath, List<HoodieCommitMetadata> metadataList) {
        String basePathStr = basePath.toString();
        Map<String, FileStatus> fullPathToFileStatus = new HashMap<>();
        for (HoodieCommitMetadata metadata : metadataList) {
            fullPathToFileStatus.putAll(metadata.getFullPathToFileStatus(basePathStr));
        }
        return fullPathToFileStatus.values().toArray(new FileStatus[0]);
    }

    /**
     * Returns the union of the write partition paths of all given commits.
     *
     * @param metadataList commit metadata to inspect
     * @return distinct partition paths
     */
    public static Set<String> getWritePartitionPaths(List<HoodieCommitMetadata> metadataList) {
        return metadataList.stream()
                .map(HoodieCommitMetadata::getWritePartitionPaths)
                .flatMap(Collection::stream)
                .collect(Collectors.toSet());
    }

    /**
     * Deserializes the commit metadata stored for {@code instant}.
     *
     * @param instant  commit instant to read
     * @param timeline timeline that holds the instant details
     * @return the deserialized commit metadata
     * @throws IOException if the bytes cannot be deserialized
     */
    public static HoodieCommitMetadata getCommitMetadata(HoodieInstant instant, HoodieTimeline timeline) throws IOException {
        byte[] data = timeline.getInstantDetails(instant).get();
        return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
    }
}

