/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source.snapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

/**
 * {@link StartingScanner} for an incremental read between two snapshot ids.
 *
 * <p>Reads the delta splits of every {@code APPEND} snapshot in the range {@code (start, end]},
 * collects their data files per (partition, bucket) and re-splits each group for batch reading.
 * Snapshots with any other commit kind contribute no files.
 */
public class IncrementalStartingScanner implements StartingScanner {
    /** Requested exclusive lower bound of the snapshot range; never modified after construction. */
    private final long start;

    /** Requested inclusive upper bound of the snapshot range; never modified after construction. */
    private final long end;

    /**
     * @param start snapshot id the range starts after (exclusive)
     * @param end snapshot id the range ends at (inclusive)
     */
    public IncrementalStartingScanner(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Scans the snapshots in {@code (start, end]}, clamped to the snapshots the manager currently
     * reports, and builds one set of batch splits per (partition, bucket).
     *
     * <p>The clamped bounds are kept in locals so that repeated calls always start from the range
     * that was originally requested instead of a range narrowed by a previous call.
     *
     * @param manager used to look up the earliest/latest snapshot ids and to load each snapshot
     * @param reader used to read delta splits and to obtain the split generator
     * @return a {@link StartingScanner.ScannedResult} whose snapshot id is the clamped end of the
     *     range, with a {@code null} second argument and the regrouped splits
     */
    @Override
    public StartingScanner.Result scan(SnapshotManager manager, SnapshotReader reader) {
        // NOTE(review): if these accessors return a boxed Long that can be null (e.g. a table
        // without snapshots), the assignments below throw a NullPointerException - confirm against
        // SnapshotManager.
        long earliestSnapshotId = manager.earliestSnapshotId();
        long latestSnapshotId = manager.latestSnapshotId();

        // The lower bound is exclusive, so the smallest useful value is "one before earliest".
        long from = Math.max(start, earliestSnapshotId - 1L);
        long to = Math.min(end, latestSnapshotId);

        Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new HashMap<>();
        for (long snapshotId = from + 1L; snapshotId <= to; snapshotId++) {
            for (DataSplit split : readDeltaSplits(reader, manager.snapshot(snapshotId))) {
                grouped.computeIfAbsent(
                                Pair.of(split.partition(), split.bucket()), k -> new ArrayList<>())
                        .addAll(split.dataFiles());
            }
        }

        List<DataSplit> result = new ArrayList<>();
        for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry : grouped.entrySet()) {
            BinaryRow partition = entry.getKey().getLeft();
            int bucket = entry.getKey().getRight();
            for (List<DataFileMeta> files :
                    reader.splitGenerator().splitForBatch(entry.getValue())) {
                result.add(
                        DataSplit.builder()
                                .withSnapshot(to)
                                .withPartition(partition)
                                .withBucket(bucket)
                                .withDataFiles(files)
                                .build());
            }
        }
        return new StartingScanner.ScannedResult(to, null, result);
    }

    /**
     * Reads the {@link ScanKind#DELTA} splits of a single snapshot.
     *
     * @param reader reader to configure with the snapshot
     * @param s snapshot to read
     * @return the delta splits of {@code s}, or an empty list when {@code s} is not an
     *     {@code APPEND} commit
     */
    private List<DataSplit> readDeltaSplits(SnapshotReader reader, Snapshot s) {
        if (s.commitKind() != Snapshot.CommitKind.APPEND) {
            return Collections.emptyList();
        }
        return reader.withSnapshot(s).withKind(ScanKind.DELTA).read().splits();
    }
}

