package org.apache.flink.connector.base.source.reader;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
import org.apache.flink.core.io.InputStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/base/source/reader/SourceReaderBase.class */
/**
 * Skeleton of a {@link SourceReader} that takes batches of fetched elements from a hand-over
 * queue and emits them one record per {@link #pollNext(ReaderOutput)} call.
 *
 * <p>The class keeps one {@code SplitContext} (mutable split state plus a lazily created
 * per-split output) for every split that was added via {@link #addSplits(List)} and has not yet
 * been reported as finished by a fetched batch. Fetching itself is delegated to the
 * {@link SplitFetcherManager}; turning a fetched element into emitted records is delegated to the
 * {@link RecordEmitter}.
 *
 * @param <E> type of the raw elements placed in the hand-over queue
 * @param <T> type of the records handed to the {@link ReaderOutput}
 * @param <SplitT> type of the splits assigned to this reader
 * @param <SplitStateT> type of the per-split state tracked while a split is being read
 */
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> implements SourceReader<T, SplitT> {
    private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBase.class);

    /** Supplies and completes the future returned by {@link #isAvailable()}. */
    private final FutureNotifier futureNotifier;

    /** Hand-over queue from which fetched batches are polled. */
    private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;

    /** Converts a fetched element into output records and receives the split state. */
    protected final RecordEmitter<E, T, SplitStateT> recordEmitter;

    /** Receives newly added splits, reports fetcher errors and is closed with this reader. */
    protected final SplitFetcherManager<E, SplitT> splitFetcherManager;

    /** Reader options derived from {@link #config}. */
    protected final SourceReaderOptions options;

    /** The configuration this reader was created with. */
    protected final Configuration config;

    /** The reader context passed to the constructor. */
    protected SourceReaderContext context;

    /** Split id to context, for every split added and not yet reported as finished. */
    private final Map<String, SplitContext<T, SplitStateT>> splitStates = new HashMap<>();

    /** Iterator over the batch currently being emitted; {@code null} until the first batch. */
    private SplitsRecordIterator<E> splitIter = null;

    /** Set once a {@link NoMoreSplitsEvent} has been handled. */
    private boolean noMoreSplitsAssignment = false;

    /**
     * Per-split bookkeeping: the split id, its state object and the output created for it.
     *
     * @param <T> type of the records written to the split output
     * @param <SplitStateT> type of the split state
     */
    private static final class SplitContext<T, SplitStateT> {
        final String splitId;
        final SplitStateT state;

        /** Created on first use by {@link #getOrCreateSplitOutput(ReaderOutput)}. */
        SourceOutput<T> sourceOutput;

        private SplitContext(String splitId, SplitStateT state) {
            this.state = state;
            this.splitId = splitId;
        }

        /**
         * Returns the output for this split, creating it from {@code readerOutput} on the first
         * call and reusing it afterwards.
         */
        SourceOutput<T> getOrCreateSplitOutput(ReaderOutput<T> readerOutput) {
            if (this.sourceOutput == null) {
                this.sourceOutput = readerOutput.createOutputForSplit(this.splitId);
            }
            return this.sourceOutput;
        }
    }

    /**
     * Creates the reader and stores its collaborators.
     *
     * @param futureNotifier notifier backing the availability future
     * @param futureCompletingBlockingQueue queue from which fetched batches are polled
     * @param splitFetcherManager manager the splits are handed to
     * @param recordEmitter emitter invoked for every fetched element
     * @param configuration configuration; also used to build the {@link SourceReaderOptions}
     * @param sourceReaderContext context of this reader
     */
    public SourceReaderBase(FutureNotifier futureNotifier, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> futureCompletingBlockingQueue, SplitFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, Configuration configuration, SourceReaderContext sourceReaderContext) {
        this.futureNotifier = futureNotifier;
        this.elementsQueue = futureCompletingBlockingQueue;
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.options = new SourceReaderOptions(configuration);
        this.config = configuration;
        this.context = sourceReaderContext;
    }

    /** Does nothing in this base class. */
    public void start() {
    }

    /**
     * Emits at most one fetched element and reports whether more data is available.
     *
     * <p>A new batch is polled from the queue only when the current batch is exhausted (or no
     * batch has been taken yet). When the current batch runs out, the splits it reports as
     * finished are removed from the tracked state, their outputs are released and
     * {@link #onSplitFinished(Collection)} is called.
     *
     * @param readerOutput output the record is emitted to
     * @return {@link InputStatus#MORE_AVAILABLE} if the current batch still has elements,
     *     otherwise the result of the end-of-input check
     * @throws IllegalStateException if an element belongs to a split that has no tracked state
     * @throws Exception if the fetcher manager reports an error or emitting fails
     */
    public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
        this.splitFetcherManager.checkErrors();

        // Only take a new batch from the queue once the current one is fully consumed.
        final boolean newFetch = this.splitIter == null || !this.splitIter.hasNext();
        RecordsWithSplitIds<E> fetched = null;
        if (newFetch) {
            fetched = this.elementsQueue.poll();
        }

        final InputStatus status;
        if (newFetch && fetched == null) {
            // Nothing buffered and nothing in the queue.
            status = finishedOrAvailableLater();
        } else {
            if (newFetch) {
                this.splitIter = new SplitsRecordIterator<>(fetched);
            }
            if (this.splitIter.hasNext()) {
                E element = this.splitIter.next();
                String splitId = this.splitIter.currentSplitId();
                SplitContext<T, SplitStateT> splitContext = this.splitStates.get(splitId);
                if (splitContext == null) {
                    // Fail with the offending split id instead of an anonymous NullPointerException.
                    throw new IllegalStateException(
                            "Received a record for split '" + splitId + "' which has no state registered in this reader.");
                }
                this.recordEmitter.emitRecord(element, splitContext.getOrCreateSplitOutput(readerOutput), splitContext.state);
                LOG.trace("Emitted record: {}", element);
            }
            if (this.splitIter.hasNext()) {
                status = InputStatus.MORE_AVAILABLE;
            } else {
                // The batch is exhausted: drop the state of the splits it reports as finished
                // before notifying the subclass.
                this.splitIter.finishedSplitIds().forEach(finishedSplitId -> {
                    this.splitStates.remove(finishedSplitId);
                    readerOutput.releaseOutputForSplit(finishedSplitId);
                });
                onSplitFinished(this.splitIter.finishedSplitIds());
                status = finishedOrAvailableLater();
            }
        }
        LOG.trace("Source reader status: {}", status);
        return status;
    }

    /**
     * Returns the notifier's future, triggering its completion right away if the queue already
     * holds a batch.
     *
     * @return the future obtained from the {@link FutureNotifier}
     */
    public CompletableFuture<Void> isAvailable() {
        // The future is obtained before the error and queue checks; keep this order.
        // NOTE(review): presumably this prevents losing a notification that arrives between
        // the checks and the return - confirm against FutureNotifier.
        CompletableFuture<Void> future = this.futureNotifier.future();
        this.splitFetcherManager.checkErrors();
        if (!this.elementsQueue.isEmpty()) {
            this.futureNotifier.notifyComplete();
        }
        return future;
    }

    /**
     * Converts the state of every tracked split back into a split via
     * {@link #toSplitType(String, Object)}.
     *
     * @return a new list with one split per tracked split state
     */
    public List<SplitT> snapshotState() {
        List<SplitT> splits = new ArrayList<>(this.splitStates.size());
        this.splitStates.forEach((splitId, splitContext) -> splits.add(toSplitType(splitId, splitContext.state)));
        return splits;
    }

    /**
     * Registers a fresh state for each split and then hands the splits to the fetcher manager.
     *
     * @param list the splits to add
     */
    public void addSplits(List<SplitT> list) {
        LOG.trace("Adding splits {}", list);
        // State is registered before the fetcher manager sees the splits.
        for (SplitT split : list) {
            this.splitStates.put(split.splitId(), new SplitContext<>(split.splitId(), initializedState(split)));
        }
        this.splitFetcherManager.addSplits(list);
    }

    /**
     * Handles a {@link NoMoreSplitsEvent} by recording that no further splits will be assigned
     * and completing the availability future. Any other event is only logged at trace level.
     *
     * @param sourceEvent the event to handle
     */
    public void handleSourceEvents(SourceEvent sourceEvent) {
        LOG.trace("Handling source event: {}", sourceEvent);
        if (sourceEvent instanceof NoMoreSplitsEvent) {
            this.noMoreSplitsAssignment = true;
            this.futureNotifier.notifyComplete();
        }
    }

    /**
     * Closes the fetcher manager, passing the close timeout from the reader options.
     *
     * @throws Exception if closing the fetcher manager fails
     */
    public void close() throws Exception {
        LOG.info("Closing Source Reader.");
        this.splitFetcherManager.close(this.options.sourceReaderCloseTimeout);
    }

    /**
     * Called from {@link #pollNext(ReaderOutput)} when a batch is exhausted, after the state of
     * the finished splits has been removed.
     *
     * @param collection ids of the splits the batch reported as finished
     */
    protected abstract void onSplitFinished(Collection<String> collection);

    /**
     * Creates the state object tracked for a newly added split.
     *
     * @param splitt the split being added
     * @return the state to track for that split
     */
    protected abstract SplitStateT initializedState(SplitT splitt);

    /**
     * Converts a tracked split state back into a split; used by {@link #snapshotState()}.
     *
     * @param str the split id
     * @param splitstatet the state tracked for that split
     * @return the split representing that state
     */
    protected abstract SplitT toSplitType(String str, SplitStateT splitstatet);

    /**
     * Decides between end of input and "nothing available right now".
     *
     * <p>The checks short-circuit in this order: no-more-splits flag, fetcher shutdown check,
     * empty queue, exhausted current batch. The fetcher manager is therefore only consulted
     * once the no-more-splits flag is set, and always before the queue is inspected.
     *
     * @return {@link InputStatus#END_OF_INPUT} if all four conditions hold, otherwise
     *     {@link InputStatus#NOTHING_AVAILABLE}
     */
    private InputStatus finishedOrAvailableLater() {
        if (!this.noMoreSplitsAssignment) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        if (!this.splitFetcherManager.maybeShutdownFinishedFetchers()) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        boolean allElementsEmitted = this.elementsQueue.isEmpty() && (this.splitIter == null || !this.splitIter.hasNext());
        return allElementsEmitted ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE;
    }
}
