package org.apache.hudi.common.table.cdc;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;

/* loaded from: input_file:org/apache/hudi/common/table/cdc/HoodieCDCExtractor.class */
/**
 * Builds the CDC (change data capture) file splits for the completed, data-changing commits whose
 * instant time falls inside a given {@link InstantRange}.
 *
 * <p>The constructor eagerly loads the commit metadata of every completed {@code commit},
 * {@code deltacommit} and {@code replacecommit} instant on the active timeline that is in range and
 * whose operation type is a data change. {@link #extractCDCFileSplits()} then converts that metadata
 * into {@link HoodieCDCFileSplit}s grouped by file group.
 */
public class HoodieCDCExtractor {
    private final HoodieTableMetaClient metaClient;
    private final Path basePath;
    private final FileSystem fs;
    private final HoodieCDCSupplementalLoggingMode supplementalLoggingMode;
    private final InstantRange instantRange;

    /**
     * Commit metadata keyed by instant. Insertion order follows the order in which the active timeline
     * streams its instants (presumably instant-time order), so that split lists are built deterministically.
     */
    private Map<HoodieInstant, HoodieCommitMetadata> commits;

    /** File-system view over the partitions touched by {@link #commits}; built lazily on first use. */
    private HoodieTableFileSystemView fsView;

    /**
     * Creates an extractor and immediately loads the commit metadata for the requested range.
     *
     * @param hoodieTableMetaClient meta client of the table to read CDC data from
     * @param instantRange          range of instant times whose commits are considered
     * @throws HoodieIOException if the commit metadata cannot be loaded
     */
    public HoodieCDCExtractor(HoodieTableMetaClient hoodieTableMetaClient, InstantRange instantRange) {
        this.metaClient = hoodieTableMetaClient;
        this.basePath = hoodieTableMetaClient.getBasePathV2();
        this.fs = hoodieTableMetaClient.getFs().getFileSystem();
        this.supplementalLoggingMode = hoodieTableMetaClient.getTableConfig().cdcSupplementalLoggingMode();
        this.instantRange = instantRange;
        init();
    }

    private void init() {
        initInstantAndCommitMetadata();
    }

    /**
     * Extracts the CDC file splits of all loaded commits, grouped by file group.
     *
     * <p>Each write stat produces one split. For replace commits, each replaced file group that still
     * has a latest file slice in the file-system view additionally produces a
     * {@link HoodieCDCInferenceCase#REPLACE_COMMIT} split. Within one file group, splits are appended in
     * the iteration order of the loaded commits.
     *
     * @return a mutable map from file group to its CDC splits
     */
    public Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> extractCDCFileSplits() {
        ValidationUtils.checkState(this.commits != null, "Empty commits");
        Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fgToCommitChanges = new HashMap<>();
        for (Map.Entry<HoodieInstant, HoodieCommitMetadata> commit : this.commits.entrySet()) {
            HoodieInstant instant = commit.getKey();
            HoodieCommitMetadata commitMetadata = commit.getValue();
            collectWriteStatSplits(fgToCommitChanges, instant, commitMetadata);
            if (commitMetadata instanceof HoodieReplaceCommitMetadata) {
                collectReplacedFileSplits(fgToCommitChanges, instant, (HoodieReplaceCommitMetadata) commitMetadata);
            }
        }
        return fgToCommitChanges;
    }

    /** Adds one split per write stat of {@code commitMetadata} to the list of the stat's file group. */
    private void collectWriteStatSplits(
            Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fgToCommitChanges,
            HoodieInstant instant,
            HoodieCommitMetadata commitMetadata) {
        WriteOperationType operationType = commitMetadata.getOperationType();
        for (Map.Entry<String, List<HoodieWriteStat>> partition : commitMetadata.getPartitionToWriteStats().entrySet()) {
            String partitionPath = partition.getKey();
            for (HoodieWriteStat writeStat : partition.getValue()) {
                HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partitionPath, writeStat.getFileId());
                HoodieCDCFileSplit split = parseWriteStat(fileGroupId, instant, writeStat, operationType);
                fgToCommitChanges.computeIfAbsent(fileGroupId, ignored -> new ArrayList<>()).add(split);
            }
        }
    }

    /** Adds a REPLACE_COMMIT split for every replaced file group that has a latest file slice in the view. */
    private void collectReplacedFileSplits(
            Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fgToCommitChanges,
            HoodieInstant instant,
            HoodieReplaceCommitMetadata replaceMetadata) {
        for (Map.Entry<String, List<String>> partition : replaceMetadata.getPartitionToReplaceFileIds().entrySet()) {
            String partitionPath = partition.getKey();
            for (String fileId : partition.getValue()) {
                Option<FileSlice> latestFileSlice = getOrCreateFsView().fetchLatestFileSlice(partitionPath, fileId);
                if (!latestFileSlice.isPresent()) {
                    // No slice is visible for the replaced file group, so no split is emitted for it.
                    continue;
                }
                HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partitionPath, fileId);
                HoodieCDCFileSplit split = new HoodieCDCFileSplit(instant.getTimestamp(), HoodieCDCInferenceCase.REPLACE_COMMIT,
                        new ArrayList<>(), latestFileSlice, Option.<FileSlice>empty());
                fgToCommitChanges.computeIfAbsent(fileGroupId, ignored -> new ArrayList<>()).add(split);
            }
        }
    }

    private HoodieTableFileSystemView getOrCreateFsView() {
        if (this.fsView == null) {
            this.fsView = initFSView();
        }
        return this.fsView;
    }

    /**
     * Builds a file-system view from the file listings of every partition touched by the loaded commits
     * (written or replaced), over the completed instants of the commits timeline.
     *
     * @throws HoodieException if listing a partition or building the view fails
     */
    private HoodieTableFileSystemView initFSView() {
        Set<String> touchedPartitions = new HashSet<>();
        for (HoodieCommitMetadata commitMetadata : this.commits.values()) {
            touchedPartitions.addAll(commitMetadata.getPartitionToWriteStats().keySet());
            if (commitMetadata instanceof HoodieReplaceCommitMetadata) {
                touchedPartitions.addAll(((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds().keySet());
            }
        }
        try {
            List<FileStatus> fileStatuses = new ArrayList<>();
            for (String partition : touchedPartitions) {
                fileStatuses.addAll(Arrays.asList(this.fs.listStatus(FSUtils.getPartitionPath(this.basePath, partition))));
            }
            return new HoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitsTimeline().filterCompletedInstants(),
                    fileStatuses.toArray(new FileStatus[0]));
        } catch (Exception e) {
            throw new HoodieException("Fail to init FileSystem View for CDC", e);
        }
    }

    /**
     * Loads the metadata of every completed commit / delta commit / replace commit in
     * {@link #instantRange} whose operation type is a data change, keeping timeline stream order.
     *
     * @throws HoodieIOException wrapping the original failure if anything goes wrong
     */
    private void initInstantAndCommitMetadata() {
        try {
            Set<String> requiredActions = new HashSet<>(Arrays.asList(
                    HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION));
            HoodieActiveTimeline activeTimeline = this.metaClient.getActiveTimeline();
            this.commits = activeTimeline.getInstantsAsStream()
                    .filter(instant -> instant.isCompleted()
                            && this.instantRange.isInRange(instant.getTimestamp())
                            && requiredActions.contains(instant.getAction().toLowerCase(Locale.ROOT)))
                    .map(instant -> Pair.of(instant, readCommitMetadata(instant, activeTimeline)))
                    .filter(pair -> WriteOperationType.isDataChange(pair.getRight().getOperationType()))
                    // LinkedHashMap keeps the timeline's encounter order; a duplicate instant keeps the
                    // plain toMap behaviour of failing with IllegalStateException.
                    .collect(Collectors.toMap(Pair::getLeft, Pair::getRight,
                            (left, right) -> {
                                throw new IllegalStateException("Duplicate instant in CDC commit metadata");
                            },
                            LinkedHashMap::new));
        } catch (Exception e) {
            // HoodieIOException only accepts an IOException cause, so wrap to keep the stack trace.
            throw new HoodieIOException("Fail to get the commit metadata for CDC", new IOException(e));
        }
    }

    /** Reads the commit metadata of {@code instant}, rethrowing I/O failures unchecked with their cause. */
    private static HoodieCommitMetadata readCommitMetadata(HoodieInstant instant, HoodieActiveTimeline activeTimeline) {
        try {
            return TimelineUtils.getCommitMetadata(instant, activeTimeline);
        } catch (IOException e) {
            throw new HoodieIOException(e.getMessage(), e);
        }
    }

    /** Looks up the base file written by the write stat's previous commit, failing if it is absent. */
    private HoodieBaseFile getPreviousBaseFile(HoodieFileGroupId fileGroupId, HoodieWriteStat writeStat) {
        return getOrCreateFsView()
                .getBaseFileOn(fileGroupId.getPartitionPath(), writeStat.getPrevCommit(), fileGroupId.getFileId())
                .orElseThrow(() -> new HoodieIOException("Can not get the previous version of the base file"));
    }

    /**
     * Converts one write stat into a CDC file split.
     *
     * <ul>
     *   <li>No CDC stats, log file written: {@code LOG_FILE}, with the dependent file slice (delta commits only).</li>
     *   <li>No CDC stats, delete operation whose new version has no records but some deletes: {@code BASE_FILE_DELETE}
     *       against the previous base file.</li>
     *   <li>No CDC stats otherwise: must be a pure insert ({@code BASE_FILE_INSERT}), else fails.</li>
     *   <li>CDC stats present: {@code AS_IS} over the CDC files; outside {@code DATA_BEFORE_AFTER} mode the current
     *       (and, in {@code OP_KEY_ONLY} mode, previous) file slices are attached.</li>
     * </ul>
     *
     * @throws HoodieException if a base file without CDC stats is not a pure insert, or the AS_IS slices cannot be built
     */
    private HoodieCDCFileSplit parseWriteStat(HoodieFileGroupId hoodieFileGroupId, HoodieInstant hoodieInstant,
                                              HoodieWriteStat hoodieWriteStat, WriteOperationType writeOperationType) {
        String instantTs = hoodieInstant.getTimestamp();
        if (CollectionUtils.isNullOrEmpty(hoodieWriteStat.getCdcStats())) {
            String path = hoodieWriteStat.getPath();
            if (!FSUtils.isBaseFile(new Path(path))) {
                return new HoodieCDCFileSplit(instantTs, HoodieCDCInferenceCase.LOG_FILE, path,
                        getDependentFileSliceForLogFile(hoodieFileGroupId, hoodieInstant, path), Option.<FileSlice>empty());
            }
            if (WriteOperationType.isDelete(writeOperationType)
                    && hoodieWriteStat.getNumWrites() == 0 && hoodieWriteStat.getNumDeletes() != 0) {
                FileSlice beforeFileSlice = new FileSlice(hoodieFileGroupId, hoodieWriteStat.getPrevCommit(),
                        getPreviousBaseFile(hoodieFileGroupId, hoodieWriteStat), Collections.emptyList());
                return new HoodieCDCFileSplit(instantTs, HoodieCDCInferenceCase.BASE_FILE_DELETE, new ArrayList<>(),
                        Option.of(beforeFileSlice), Option.<FileSlice>empty());
            }
            // Without CDC stats a base file can only be described if every record in it is a new insert.
            if (hoodieWriteStat.getNumUpdateWrites() != 0 || hoodieWriteStat.getNumWrites() != hoodieWriteStat.getNumInserts()) {
                throw new HoodieException("There should be a cdc log file.");
            }
            return new HoodieCDCFileSplit(instantTs, HoodieCDCInferenceCase.BASE_FILE_INSERT, path);
        }
        if (this.supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER) {
            return new HoodieCDCFileSplit(instantTs, HoodieCDCInferenceCase.AS_IS, hoodieWriteStat.getCdcStats().keySet());
        }
        try {
            // NOTE(review): the previous base file is required in every mode reaching this point, although it
            // is only used to build the "before" slice in OP_KEY_ONLY mode.
            HoodieBaseFile beforeBaseFile = getPreviousBaseFile(hoodieFileGroupId, hoodieWriteStat);
            FileSlice currentFileSlice = new FileSlice(hoodieFileGroupId, instantTs,
                    new HoodieBaseFile(this.fs.getFileStatus(new Path(this.basePath, hoodieWriteStat.getPath()))), new ArrayList<>());
            FileSlice beforeFileSlice = this.supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.OP_KEY_ONLY
                    ? new FileSlice(hoodieFileGroupId, hoodieWriteStat.getPrevCommit(), beforeBaseFile, new ArrayList<>())
                    : null;
            return new HoodieCDCFileSplit(instantTs, HoodieCDCInferenceCase.AS_IS, hoodieWriteStat.getCdcStats().keySet(),
                    Option.ofNullable(beforeFileSlice), Option.ofNullable(currentFileSlice));
        } catch (Exception e) {
            throw new HoodieException("Fail to parse HoodieWriteStat", e);
        }
    }

    /**
     * Returns the file slice a log file written by a delta commit depends on.
     *
     * <p>The slice is the base file plus the other log files that the delta commit's metadata records for
     * the file group. The log file being parsed is excluded. Non-delta-commit instants, or delta commits
     * without an entry for the file group, yield {@link Option#empty()}.
     *
     * @throws HoodieException if the base file or log files cannot be read from the file system
     */
    private Option<FileSlice> getDependentFileSliceForLogFile(HoodieFileGroupId hoodieFileGroupId, HoodieInstant hoodieInstant, String str) {
        if (!hoodieInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
            return Option.empty();
        }
        String currentLogFileName = new Path(str).getName();
        Option<Pair<String, List<String>>> fileSliceOpt = HoodieCommitMetadata.getFileSliceForFileGroupFromDeltaCommit(
                this.metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), hoodieFileGroupId);
        if (!fileSliceOpt.isPresent()) {
            return Option.empty();
        }
        Path partitionPath = FSUtils.getPartitionPath(this.basePath, hoodieFileGroupId.getPartitionPath());
        Pair<String, List<String>> fileSlice = fileSliceOpt.get();
        try {
            HoodieBaseFile baseFile = new HoodieBaseFile(this.fs.getFileStatus(new Path(partitionPath, fileSlice.getLeft())));
            Path[] logFilePaths = fileSlice.getRight().stream()
                    .filter(logFile -> !logFile.equals(currentLogFileName))
                    .map(logFile -> new Path(partitionPath, logFile))
                    .toArray(Path[]::new);
            List<HoodieLogFile> logFiles = Arrays.stream(this.fs.listStatus(logFilePaths))
                    .map(HoodieLogFile::new)
                    .collect(Collectors.toList());
            return Option.of(new FileSlice(hoodieFileGroupId, hoodieInstant.getTimestamp(), baseFile, logFiles));
        } catch (Exception e) {
            throw new HoodieException("Fail to get the dependent file slice for a log file", e);
        }
    }
}
