/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.assigners;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.align.PlaceholderSplit;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.utils.Preconditions;

public class AlignedSplitAssigner
implements SplitAssigner {
    private final Deque<PendingSnapshot> pendingSplitAssignment = new LinkedList<PendingSnapshot>();

    @Override
    public List<FileStoreSourceSplit> getNext(int subtask, @Nullable String hostname) {
        PendingSnapshot head = this.pendingSplitAssignment.peek();
        if (head != null && !head.isPlaceHolder) {
            List<FileStoreSourceSplit> subtaskSplits = head.remove(subtask);
            return subtaskSplits != null ? subtaskSplits : Collections.emptyList();
        }
        return Collections.emptyList();
    }

    @Override
    public void addSplit(int subtask, FileStoreSourceSplit splits) {
        long snapshotId = ((DataSplit)splits.split()).snapshotId();
        PendingSnapshot last = this.pendingSplitAssignment.peekLast();
        boolean isPlaceholder = splits.split() instanceof PlaceholderSplit;
        if (last == null || last.snapshotId != snapshotId) {
            last = new PendingSnapshot(snapshotId, isPlaceholder, new HashMap<Integer, List<FileStoreSourceSplit>>());
            last.add(subtask, splits);
            this.pendingSplitAssignment.addLast(last);
        } else {
            last.add(subtask, splits);
        }
    }

    @Override
    public void addSplitsBack(int suggestedTask, List<FileStoreSourceSplit> splits) {
        if (splits.isEmpty()) {
            return;
        }
        long snapshotId = ((DataSplit)splits.get(0).split()).snapshotId();
        boolean isPlaceholder = splits.get(0).split() instanceof PlaceholderSplit;
        PendingSnapshot head = this.pendingSplitAssignment.peek();
        if (head == null || snapshotId != head.snapshotId) {
            head = new PendingSnapshot(snapshotId, isPlaceholder, new HashMap<Integer, List<FileStoreSourceSplit>>());
            head.addAll(suggestedTask, splits);
            this.pendingSplitAssignment.addFirst(head);
        } else {
            head.addAll(suggestedTask, splits);
        }
    }

    @Override
    public Collection<FileStoreSourceSplit> remainingSplits() {
        ArrayList<FileStoreSourceSplit> remainingSplits = new ArrayList<FileStoreSourceSplit>();
        for (PendingSnapshot pendingSnapshot : this.pendingSplitAssignment) {
            pendingSnapshot.subtaskSplits.values().forEach(remainingSplits::addAll);
        }
        return remainingSplits;
    }

    public boolean isAligned() {
        PendingSnapshot head = this.pendingSplitAssignment.peek();
        return head != null && head.empty();
    }

    public int remainingSnapshots() {
        return this.pendingSplitAssignment.size();
    }

    public void removeFirst() {
        PendingSnapshot head = this.pendingSplitAssignment.poll();
        Preconditions.checkArgument(head != null && head.empty(), "The head pending splits is not empty. This is a bug, please file an issue.");
    }

    public Long minRemainingSnapshotId() {
        PendingSnapshot head = this.pendingSplitAssignment.peek();
        return head != null ? Long.valueOf(head.snapshotId) : null;
    }

    private static class PendingSnapshot {
        private final long snapshotId;
        private final boolean isPlaceHolder;
        private final Map<Integer, List<FileStoreSourceSplit>> subtaskSplits;

        public PendingSnapshot(long snapshotId, boolean isPlaceHolder, Map<Integer, List<FileStoreSourceSplit>> subtaskSplits) {
            this.snapshotId = snapshotId;
            this.isPlaceHolder = isPlaceHolder;
            this.subtaskSplits = subtaskSplits;
        }

        public List<FileStoreSourceSplit> remove(int subtask) {
            return this.subtaskSplits.remove(subtask);
        }

        public void add(int subtask, FileStoreSourceSplit split) {
            Preconditions.checkArgument(((DataSplit)split.split()).snapshotId() == this.snapshotId, "SnapshotId not equal. This is a bug, please file an issue.");
            this.subtaskSplits.computeIfAbsent(subtask, id -> new ArrayList()).add(split);
        }

        public void addAll(int subtask, List<FileStoreSourceSplit> splits) {
            Preconditions.checkArgument(!this.subtaskSplits.containsKey(subtask), "Encountered a non-empty list of subtask pending splits. This is a bug, please file an issue.");
            splits.forEach(split -> Preconditions.checkArgument(((DataSplit)split.split()).snapshotId() == this.snapshotId, "SnapshotId not equal"));
            this.subtaskSplits.put(subtask, splits);
        }

        public boolean empty() {
            return this.subtaskSplits.isEmpty() || this.isPlaceHolder;
        }
    }
}

