package org.apache.flink.connector.base.source.reader;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/connector/base/source/reader/SourceReaderBase.class */
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> implements SourceReader<T, SplitT> {
    private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBase.class);
    private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
    private final Map<String, SplitContext<T, SplitStateT>> splitStates;
    protected final RecordEmitter<E, T, SplitStateT> recordEmitter;
    protected final SplitFetcherManager<E, SplitT> splitFetcherManager;
    protected final SourceReaderOptions options;
    protected final Configuration config;
    private final Counter numRecordsInCounter;
    protected SourceReaderContext context;

    @Nullable
    private RecordsWithSplitIds<E> currentFetch;

    @Nullable
    private SplitContext<T, SplitStateT> currentSplitContext;

    @Nullable
    private SourceOutput<T> currentSplitOutput;
    private boolean noMoreSplitsAssignment;

    @Nullable
    protected final RecordEvaluator<T> eofRecordEvaluator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/SourceReaderBase$SourceOutputWrapper.class */
    public static final class SourceOutputWrapper<T> implements SourceOutput<T> {
        final SourceOutput<T> sourceOutput;
        final Function<T, Boolean> eofRecordHandler;
        private boolean isStreamEnd = false;

        public SourceOutputWrapper(SourceOutput<T> sourceOutput, Function<T, Boolean> function) {
            this.sourceOutput = sourceOutput;
            this.eofRecordHandler = function;
        }

        public void emitWatermark(Watermark watermark) {
            this.sourceOutput.emitWatermark(watermark);
        }

        public void markIdle() {
            this.sourceOutput.markIdle();
        }

        public void markActive() {
            this.sourceOutput.markActive();
        }

        public void collect(T t) {
            if (isEndOfStreamReached(t)) {
                return;
            }
            this.sourceOutput.collect(t);
        }

        public void collect(T t, long j) {
            if (isEndOfStreamReached(t)) {
                return;
            }
            this.sourceOutput.collect(t, j);
        }

        private boolean isEndOfStreamReached(T t) {
            if (this.isStreamEnd) {
                return true;
            }
            if (this.eofRecordHandler.apply(t).booleanValue()) {
                this.isStreamEnd = true;
            }
            return this.isStreamEnd;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/SourceReaderBase$SplitContext.class */
    public static final class SplitContext<T, SplitStateT> {
        final String splitId;
        final SplitStateT state;

        @Nullable
        SourceOutput<T> sourceOutput;

        private SplitContext(String str, SplitStateT splitstatet) {
            this.state = splitstatet;
            this.splitId = str;
        }

        SourceOutput<T> getOrCreateSplitOutput(ReaderOutput<T> readerOutput, @Nullable Function<T, Boolean> function) {
            if (this.sourceOutput == null) {
                this.sourceOutput = readerOutput.createOutputForSplit(this.splitId);
                if (function != null) {
                    this.sourceOutput = new SourceOutputWrapper(this.sourceOutput, function);
                }
            }
            return this.sourceOutput;
        }
    }

    public SourceReaderBase(SplitFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, Configuration configuration, SourceReaderContext sourceReaderContext) {
        this(splitFetcherManager, recordEmitter, null, configuration, sourceReaderContext);
    }

    public SourceReaderBase(SplitFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, @Nullable RecordEvaluator<T> recordEvaluator, Configuration configuration, SourceReaderContext sourceReaderContext) {
        this.elementsQueue = splitFetcherManager.getQueue();
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap();
        this.options = new SourceReaderOptions(configuration);
        this.config = configuration;
        this.context = sourceReaderContext;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = recordEvaluator;
        this.numRecordsInCounter = sourceReaderContext.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }

    public void start() {
    }

    public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
        RecordsWithSplitIds<E> recordsWithSplitIds = this.currentFetch;
        if (recordsWithSplitIds == null) {
            recordsWithSplitIds = getNextFetch(readerOutput);
            if (recordsWithSplitIds == null) {
                return trace(finishedOrAvailableLater());
            }
        }
        do {
            E nextRecordFromSplit = recordsWithSplitIds.nextRecordFromSplit();
            if (nextRecordFromSplit != null) {
                this.numRecordsInCounter.inc(1L);
                this.recordEmitter.emitRecord(nextRecordFromSplit, this.currentSplitOutput, this.currentSplitContext.state);
                LOG.trace("Emitted record: {}", nextRecordFromSplit);
                return trace(InputStatus.MORE_AVAILABLE);
            }
        } while (moveToNextSplit(recordsWithSplitIds, readerOutput));
        return pollNext(readerOutput);
    }

    private InputStatus trace(InputStatus inputStatus) {
        LOG.trace("Source reader status: {}", inputStatus);
        return inputStatus;
    }

    @Nullable
    private RecordsWithSplitIds<E> getNextFetch(ReaderOutput<T> readerOutput) {
        this.splitFetcherManager.checkErrors();
        LOG.trace("Getting next source data batch from queue");
        RecordsWithSplitIds<E> poll = this.elementsQueue.poll();
        if (poll == null || !moveToNextSplit(poll, readerOutput)) {
            return null;
        }
        this.currentFetch = poll;
        return poll;
    }

    private void finishCurrentFetch(RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> readerOutput) {
        this.currentFetch = null;
        this.currentSplitContext = null;
        this.currentSplitOutput = null;
        Set<String> finishedSplits = recordsWithSplitIds.finishedSplits();
        if (!finishedSplits.isEmpty()) {
            LOG.info("Finished reading split(s) {}", finishedSplits);
            HashMap hashMap = new HashMap();
            for (String str : finishedSplits) {
                hashMap.put(str, this.splitStates.remove(str).state);
                readerOutput.releaseOutputForSplit(str);
            }
            onSplitFinished(hashMap);
        }
        recordsWithSplitIds.recycle();
    }

    private boolean moveToNextSplit(RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> readerOutput) {
        String nextSplit = recordsWithSplitIds.nextSplit();
        if (nextSplit == null) {
            LOG.trace("Current fetch is finished.");
            finishCurrentFetch(recordsWithSplitIds, readerOutput);
            return false;
        }
        this.currentSplitContext = this.splitStates.get(nextSplit);
        Preconditions.checkState(this.currentSplitContext != null, "Have records for a split that was not registered");
        Function<T, Boolean> function = null;
        if (this.eofRecordEvaluator != null) {
            function = obj -> {
                if (!this.eofRecordEvaluator.isEndOfStream(obj)) {
                    return false;
                }
                this.splitFetcherManager.removeSplits(Collections.singletonList(toSplitType(this.currentSplitContext.splitId, this.currentSplitContext.state)));
                return true;
            };
        }
        this.currentSplitOutput = this.currentSplitContext.getOrCreateSplitOutput(readerOutput, function);
        LOG.trace("Emitting records from fetch for split {}", nextSplit);
        return true;
    }

    public CompletableFuture<Void> isAvailable() {
        return this.currentFetch != null ? FutureCompletingBlockingQueue.AVAILABLE : this.elementsQueue.getAvailabilityFuture();
    }

    public List<SplitT> snapshotState(long j) {
        ArrayList arrayList = new ArrayList();
        this.splitStates.forEach((str, splitContext) -> {
            arrayList.add(toSplitType(str, splitContext.state));
        });
        return arrayList;
    }

    public void addSplits(List<SplitT> list) {
        LOG.info("Adding split(s) to reader: {}", list);
        list.forEach(sourceSplit -> {
            this.splitStates.put(sourceSplit.splitId(), new SplitContext<>(sourceSplit.splitId(), initializedState(sourceSplit)));
        });
        this.splitFetcherManager.addSplits(list);
    }

    public void notifyNoMoreSplits() {
        LOG.info("Reader received NoMoreSplits event.");
        this.noMoreSplitsAssignment = true;
        this.elementsQueue.notifyAvailable();
    }

    public void handleSourceEvents(SourceEvent sourceEvent) {
        LOG.info("Received unhandled source event: {}", sourceEvent);
    }

    public void pauseOrResumeSplits(Collection<String> collection, Collection<String> collection2) {
        this.splitFetcherManager.pauseOrResumeSplits(collection, collection2);
    }

    public void close() throws Exception {
        LOG.info("Closing Source Reader.");
        this.splitFetcherManager.close(this.options.sourceReaderCloseTimeout);
    }

    public int getNumberOfCurrentlyAssignedSplits() {
        return this.splitStates.size();
    }

    protected abstract void onSplitFinished(Map<String, SplitStateT> map);

    protected abstract SplitStateT initializedState(SplitT splitt);

    protected abstract SplitT toSplitType(String str, SplitStateT splitstatet);

    private InputStatus finishedOrAvailableLater() {
        boolean maybeShutdownFinishedFetchers = this.splitFetcherManager.maybeShutdownFinishedFetchers();
        if (!this.noMoreSplitsAssignment || !maybeShutdownFinishedFetchers) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        if (!this.elementsQueue.isEmpty()) {
            return InputStatus.MORE_AVAILABLE;
        }
        this.splitFetcherManager.checkErrors();
        return InputStatus.END_OF_INPUT;
    }
}
