/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hadoop.realtime;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.PathWithLogFilePath;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieEmptyRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@UseRecordReaderFromInputFormat
@UseFileSplitsFromInputFormat
public class HoodieParquetRealtimeInputFormat
extends HoodieParquetInputFormat
implements Configurable {
    private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);

    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit)is).collect(Collectors.toList());
        boolean isIncrementalSplits = HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits);
        return isIncrementalSplits ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits((Configuration)job, fileSplits.stream()) : HoodieRealtimeInputFormatUtils.getRealtimeSplits((Configuration)job, fileSplits.stream());
    }

    @Override
    protected List<FileStatus> listStatusForIncrementalMode(JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
        ArrayList<FileStatus> result = new ArrayList<FileStatus>();
        String tableName = tableMetaClient.getTableConfig().getTableName();
        Job jobContext = Job.getInstance((Configuration)job);
        Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
        if (!timeline.isPresent()) {
            return result;
        }
        HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, tableName, (HoodieTimeline)timeline.get());
        Option commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
        if (!commitsToCheck.isPresent()) {
            return result;
        }
        ((List)commitsToCheck.get()).sort(HoodieInstant::compareTo);
        List<HoodieCommitMetadata> metadataList = ((List)commitsToCheck.get()).stream().map(instant -> {
            try {
                return HoodieInputFormatUtils.getCommitMetadata(instant, commitsTimelineToReturn);
            }
            catch (IOException e) {
                throw new HoodieException(String.format("cannot get metadata for instant: %s", instant));
            }
        }).collect(Collectors.toList());
        List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils.listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), metadataList));
        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
        Path basePath = new Path(tableMetaClient.getBasePath());
        List affectedPartition = HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream().filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
        if (affectedPartition.isEmpty()) {
            return result;
        }
        List<HoodieFileGroup> fileGroups = affectedPartition.stream().flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
        HoodieParquetRealtimeInputFormat.setInputPaths((JobConf)job, (String)affectedPartition.stream().map(p -> p.isEmpty() ? basePath.toString() : new Path(basePath, p).toString()).collect(Collectors.joining(",")));
        FileStatus[] fileStatuses = this.getStatus(job);
        HashMap<String, FileStatus> candidateFileStatus = new HashMap<String, FileStatus>();
        for (int i = 0; i < fileStatuses.length; ++i) {
            String key = fileStatuses[i].getPath().toString();
            candidateFileStatus.put(key, fileStatuses[i]);
        }
        String maxCommitTime = ((HoodieInstant)fsView.getLastInstant().get()).getTimestamp();
        result.addAll(this.collectAllIncrementalFiles(fileGroups, maxCommitTime, basePath.toString(), candidateFileStatus));
        return result;
    }

    private List<FileStatus> collectAllIncrementalFiles(List<HoodieFileGroup> fileGroups, String maxCommitTime, String basePath, Map<String, FileStatus> candidateFileStatus) {
        ArrayList<FileStatus> result = new ArrayList<FileStatus>();
        fileGroups.stream().forEach(f -> {
            try {
                List logFileStatus;
                List baseFiles = f.getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).collect(Collectors.toList());
                if (!baseFiles.isEmpty()) {
                    FileStatus baseFileStatus = HoodieInputFormatUtils.getFileStatus((HoodieBaseFile)((FileSlice)baseFiles.get(0)).getBaseFile().get());
                    String baseFilePath = baseFileStatus.getPath().toUri().toString();
                    if (!candidateFileStatus.containsKey(baseFilePath)) {
                        throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath);
                    }
                    RealtimeFileStatus fileStatus = new RealtimeFileStatus((FileStatus)candidateFileStatus.get(baseFilePath));
                    fileStatus.setMaxCommitTime(maxCommitTime);
                    fileStatus.setBelongToIncrementalFileStatus(true);
                    fileStatus.setBasePath(basePath);
                    fileStatus.setBaseFilePath(baseFilePath);
                    fileStatus.setDeltaLogFiles(((FileSlice)f.getLatestFileSlice().get()).getLogFiles().collect(Collectors.toList()));
                    if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
                        fileStatus.setBootStrapFileStatus(baseFileStatus);
                    }
                    result.add(fileStatus);
                }
                if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty() && (logFileStatus = ((FileSlice)f.getLatestFileSlice().get()).getLogFiles().map(logFile -> logFile.getFileStatus()).collect(Collectors.toList())).size() > 0) {
                    RealtimeFileStatus fileStatus = new RealtimeFileStatus((FileStatus)logFileStatus.get(0));
                    fileStatus.setBelongToIncrementalFileStatus(true);
                    fileStatus.setDeltaLogFiles(logFileStatus.stream().map(l -> new HoodieLogFile(l.getPath(), Long.valueOf(l.getLen()))).collect(Collectors.toList()));
                    fileStatus.setMaxCommitTime(maxCommitTime);
                    fileStatus.setBasePath(basePath);
                    result.add(fileStatus);
                }
            }
            catch (IOException e) {
                throw new HoodieException("Error obtaining data file/log file grouping ", (Throwable)e);
            }
        });
        return result;
    }

    @Override
    protected boolean includeLogFilesForSnapShotView() {
        return true;
    }

    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        if (filename instanceof PathWithLogFilePath) {
            return ((PathWithLogFilePath)filename).splitable();
        }
        return super.isSplitable(fs, filename);
    }

    @Override
    protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
        if (file instanceof PathWithLogFilePath) {
            return this.doMakeSplitForPathWithLogFilePath((PathWithLogFilePath)file, start, length, hosts, null);
        }
        return super.makeSplit(file, start, length, hosts);
    }

    @Override
    protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
        if (file instanceof PathWithLogFilePath) {
            return this.doMakeSplitForPathWithLogFilePath((PathWithLogFilePath)file, start, length, hosts, inMemoryHosts);
        }
        return super.makeSplit(file, start, length, hosts, inMemoryHosts);
    }

    private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, long start, long length, String[] hosts, String[] inMemoryHosts) {
        if (!path.includeBootstrapFilePath()) {
            return path.buildSplit(path, start, length, hosts);
        }
        FileSplit bf = inMemoryHosts == null ? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts) : super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts);
        return HoodieRealtimeInputFormatUtils.createRealtimeBoostrapBaseFileSplit((BootstrapBaseFileSplit)bf, path.getBasePath(), path.getDeltaLogFiles(), path.getMaxCommitTime());
    }

    @Override
    public FileStatus[] listStatus(JobConf job) throws IOException {
        return super.listStatus(job);
    }

    @Override
    protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
        return timeline;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void addProjectionToJobConf(RealtimeSplit realtimeSplit, JobConf jobConf) {
        if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(realtimeSplit, jobConf)) {
            JobConf jobConf2 = jobConf;
            synchronized (jobConf2) {
                LOG.info((Object)("Before adding Hoodie columns, Projections :" + jobConf.get("hive.io.file.readcolumn.names") + ", Ids :" + jobConf.get("hive.io.file.readcolumn.ids")));
                if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(realtimeSplit, jobConf)) {
                    if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
                        HoodieRealtimeInputFormatUtils.addRequiredProjectionFields((Configuration)jobConf, realtimeSplit.getHoodieVirtualKeyInfo());
                    }
                    this.conf = jobConf;
                    this.conf.set("hoodie.read.columns.set", "true");
                }
            }
        }
        HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds((Configuration)jobConf);
    }

    @Override
    public RecordReader<NullWritable, ArrayWritable> getRecordReader(InputSplit split, JobConf jobConf, Reporter reporter) throws IOException {
        ValidationUtils.checkArgument((boolean)(split instanceof RealtimeSplit), (String)("HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split));
        RealtimeSplit realtimeSplit = (RealtimeSplit)split;
        this.addProjectionToJobConf(realtimeSplit, jobConf);
        LOG.info((Object)("Creating record reader with readCols :" + jobConf.get("hive.io.file.readcolumn.names") + ", Ids :" + jobConf.get("hive.io.file.readcolumn.ids")));
        if (FSUtils.isLogFile((Path)realtimeSplit.getPath())) {
            return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));
        }
        return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, super.getRecordReader(split, jobConf, reporter));
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }
}

