/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.mongodb.source.enumerator;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState;
import org.apache.flink.connector.mongodb.source.enumerator.assigner.MongoSplitAssigner;
import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class MongoSourceEnumerator
implements SplitEnumerator<MongoSourceSplit, MongoSourceEnumState> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoSourceEnumerator.class);
    private final Boundedness boundedness;
    private final SplitEnumeratorContext<MongoSourceSplit> context;
    private final MongoSplitAssigner splitAssigner;
    private final TreeSet<Integer> readersAwaitingSplit;

    public MongoSourceEnumerator(Boundedness boundedness, SplitEnumeratorContext<MongoSourceSplit> context, MongoSplitAssigner splitAssigner) {
        this.boundedness = boundedness;
        this.context = context;
        this.splitAssigner = splitAssigner;
        this.readersAwaitingSplit = new TreeSet();
    }

    public void start() {
        this.splitAssigner.open();
    }

    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        if (!this.context.registeredReaders().containsKey(subtaskId)) {
            return;
        }
        this.readersAwaitingSplit.add(subtaskId);
        this.assignSplits();
    }

    public void addSplitsBack(List<MongoSourceSplit> splits, int subtaskId) {
        LOG.debug("Mongo Source Enumerator adds splits back: {}", splits);
        this.splitAssigner.addSplitsBack(splits);
    }

    public void addReader(int subtaskId) {
        LOG.debug("Adding reader {} to MongoSourceEnumerator.", (Object)subtaskId);
    }

    private void assignSplits() {
        Iterator<Integer> awaitingReader = this.readersAwaitingSplit.iterator();
        while (awaitingReader.hasNext()) {
            int nextAwaiting = awaitingReader.next();
            if (!this.context.registeredReaders().containsKey(nextAwaiting)) {
                awaitingReader.remove();
                continue;
            }
            if (this.splitAssigner.noMoreSplits() && this.boundedness == Boundedness.BOUNDED) {
                this.context.signalNoMoreSplits(nextAwaiting);
                awaitingReader.remove();
                LOG.info("All scan splits have been assigned, closing idle reader {}", (Object)nextAwaiting);
                continue;
            }
            Optional<MongoSourceSplit> split = this.splitAssigner.getNext();
            if (!split.isPresent()) break;
            MongoSourceSplit mongoSplit = split.get();
            this.context.assignSplit((SourceSplit)mongoSplit, nextAwaiting);
            awaitingReader.remove();
            LOG.info("Assign split {} to subtask {}", (Object)mongoSplit, (Object)nextAwaiting);
            break;
        }
    }

    public MongoSourceEnumState snapshotState(long checkpointId) {
        return this.splitAssigner.snapshotState(checkpointId);
    }

    public void close() throws IOException {
        this.splitAssigner.close();
    }
}

