package org.apache.seatunnel.connectors.seatunnel.common.source.reader;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager;
import org.apache.seatunnel.shade.google.sheets.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.class */
/**
 * Base {@link SourceReader} that takes batches of fetched records ({@link RecordsWithSplitIds})
 * from a queue and hands them, one record per {@link #pollNext(Collector)} call, to a
 * {@link RecordEmitter} together with the state of the split the record belongs to.
 *
 * <p>The reader keeps one {@link SplitContext} per assigned split in {@code splitStates}. A split
 * is registered in {@link #addSplits(List)} and removed again when a fetch reports it as finished,
 * at which point {@link #onSplitFinished(Map)} is invoked.
 *
 * <p>NOTE(review): the queue is presumably filled by the fetchers owned by
 * {@code splitFetcherManager}; that wiring is not visible in this class.
 *
 * @param <E> type of the raw elements contained in a fetch
 * @param <T> type of the records emitted to the {@link Collector}
 * @param <SplitT> type of the splits handled by this reader
 * @param <SplitStateT> type of the mutable per-split state
 */
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> implements SourceReader<T, SplitT> {
    private static final Logger log = LoggerFactory.getLogger(SourceReaderBase.class);

    /** Pause in milliseconds applied by {@code getNextFetch} when no fetch with a split is available. */
    private static final long IDLE_SLEEP_MILLIS = 100L;

    /** Queue from which fetched record batches are polled. */
    private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;

    /** Context (id, state, output) of every split that was added and has not finished yet. */
    private final ConcurrentMap<String, SplitContext<T, SplitStateT>> splitStates = new ConcurrentHashMap<>();

    protected final RecordEmitter<E, T, SplitStateT> recordEmitter;
    protected final SplitFetcherManager<E, SplitT> splitFetcherManager;
    protected final SourceReaderOptions options;
    protected final SourceReader.Context context;

    /** Fetch currently being drained, or {@code null} when a new one has to be polled. */
    private RecordsWithSplitIds<E> currentFetch;

    /** Context of the split whose records are currently being emitted. */
    private SplitContext<T, SplitStateT> currentSplitContext;

    /** Output used for the split whose records are currently being emitted. */
    private Collector<T> currentSplitOutput;

    /*
     * Written by handleNoMoreSplits() and read by pollNext().
     * NOTE(review): these are presumably invoked from different threads (splitStates is a
     * concurrent map, and pollNext takes a checkpoint lock), hence volatile — confirm.
     */
    private volatile boolean noMoreSplitsAssignment;

    /**
     * Per-split bookkeeping: the split id, its state object and the output records are emitted to.
     *
     * @param <T> type of the emitted records
     * @param <SplitStateT> type of the split state
     */
    public static final class SplitContext<T, SplitStateT> {
        final String splitId;
        final SplitStateT state;
        Collector<T> splitOutput;

        public SplitContext(String str, SplitStateT splitstatet) {
            this.splitId = str;
            this.state = splitstatet;
        }

        /**
         * Returns the output of this split, remembering {@code collector} as that output the
         * first time this method is called.
         */
        Collector<T> getOrCreateSplitOutput(Collector<T> collector) {
            if (this.splitOutput == null) {
                this.splitOutput = collector;
            }
            return this.splitOutput;
        }
    }

    /**
     * Creates the reader; all arguments are stored as-is.
     *
     * @param blockingQueue queue the reader polls fetched record batches from
     * @param splitFetcherManager manager that receives added splits and is closed with the reader
     * @param recordEmitter emitter that turns a fetched element into output records
     * @param sourceReaderOptions options; supplies the close timeout used in {@link #close()}
     * @param context reader context used to query boundedness and to signal the end of input
     */
    public SourceReaderBase(BlockingQueue<RecordsWithSplitIds<E>> blockingQueue, SplitFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, SourceReaderOptions sourceReaderOptions, SourceReader.Context context) {
        this.elementsQueue = blockingQueue;
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.options = sourceReaderOptions;
        this.context = context;
    }

    /** Logs that the reader was opened; no other work is done here. */
    public void open() {
        log.info("Open Source Reader.");
    }

    /**
     * Emits at most one record to {@code collector}.
     *
     * <p>If no fetch is in progress a new one is polled. When none is available and the source is
     * bounded, no more splits will be assigned, the fetcher manager reports its fetchers as
     * finished and the queue is empty, the end of input is signalled through the context.
     *
     * @param collector output for emitted records; its checkpoint lock is held while emitting
     * @throws Exception if emitting the record fails
     */
    public void pollNext(Collector<T> collector) throws Exception {
        // Loop instead of recursing: an exhausted fetch simply leads to another attempt.
        while (true) {
            RecordsWithSplitIds<E> fetch = this.currentFetch;
            if (fetch == null) {
                fetch = getNextFetch(collector);
                if (fetch == null) {
                    // Evaluation order matters: maybeShutdownFinishedFetchers() is only
                    // invoked once the cheaper conditions before it hold.
                    if (Boundedness.BOUNDED.equals(this.context.getBoundedness())
                            && this.noMoreSplitsAssignment
                            && this.splitFetcherManager.maybeShutdownFinishedFetchers()
                            && this.elementsQueue.isEmpty()) {
                        this.context.signalNoMoreElement();
                        log.info("Send NoMoreElement event");
                    }
                    return;
                }
            }

            E record = fetch.nextRecordFromSplit();
            if (record != null) {
                synchronized (collector.getCheckpointLock()) {
                    this.recordEmitter.emitRecord(record, this.currentSplitOutput, this.currentSplitContext.state);
                }
                log.trace("Emitted record: {}", record);
                return;
            }

            // Current split has no more records: advance to the next split of this fetch and
            // let the next call emit from it.
            if (moveToNextSplit(fetch, collector)) {
                return;
            }
            // The fetch is exhausted (and already finished); try the next fetch.
        }
    }

    /**
     * Converts the state of every currently registered split back into a split object.
     *
     * @param j not used by this implementation
     * @return one split per registered split state
     */
    public List<SplitT> snapshotState(long j) {
        List<SplitT> splits = new ArrayList<>();
        this.splitStates.forEach((splitId, splitContext) -> splits.add(toSplitType(splitId, splitContext.state)));
        log.debug("Snapshot state from splits: {}", splits);
        return splits;
    }

    /**
     * Registers a fresh {@link SplitContext} for each split and forwards the splits to the
     * fetcher manager.
     *
     * @param list splits to add
     */
    public void addSplits(List<SplitT> list) {
        log.debug("Adding split(s) to reader: {}", list);
        for (SplitT split : list) {
            this.splitStates.put(split.splitId(), new SplitContext<>(split.splitId(), initializedState(split)));
        }
        this.splitFetcherManager.addSplits(list);
    }

    /** Records that no further splits will be assigned to this reader. */
    public void handleNoMoreSplits() {
        log.info("Reader received NoMoreSplits event.");
        this.noMoreSplitsAssignment = true;
    }

    /**
     * Logs the event; this base implementation does not act on it.
     *
     * @param sourceEvent the received event
     */
    public void handleSourceEvent(SourceEvent sourceEvent) {
        log.info("Received unhandled source event: {}", sourceEvent);
    }

    /**
     * Closes the fetcher manager using the close timeout from the reader options.
     *
     * @throws RuntimeException wrapping any exception thrown while closing
     */
    public void close() {
        log.info("Closing Source Reader.");
        try {
            this.splitFetcherManager.close(this.options.getSourceReaderCloseTimeout());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Polls the next fetch from the queue and positions it on its first split.
     *
     * <p>Fetcher errors are checked first. If the queue is empty, or the polled fetch contains no
     * split, the calling thread sleeps for {@link #IDLE_SLEEP_MILLIS} and {@code null} is returned.
     *
     * @return the new current fetch, or {@code null} if none is available right now
     * @throws SeaTunnelException if the thread is interrupted while sleeping
     */
    private RecordsWithSplitIds<E> getNextFetch(Collector<T> collector) {
        this.splitFetcherManager.checkErrors();
        RecordsWithSplitIds<E> polled = this.elementsQueue.poll();
        if (polled != null && moveToNextSplit(polled, collector)) {
            this.currentFetch = polled;
            return polled;
        }
        log.trace("Current fetch is finished.");
        try {
            Thread.sleep(IDLE_SLEEP_MILLIS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new SeaTunnelException(e);
        }
        return null;
    }

    /**
     * Advances {@code fetch} to its next split and makes that split the current one.
     *
     * @return {@code true} if there was another split; {@code false} if the fetch had no more
     *     splits, in which case it has been finished via {@code finishCurrentFetch}
     * @throws IllegalStateException if the fetch names a split that is not registered
     */
    private boolean moveToNextSplit(RecordsWithSplitIds<E> fetch, Collector<T> collector) {
        String nextSplitId = fetch.nextSplit();
        if (nextSplitId == null) {
            log.trace("Current fetch is finished.");
            finishCurrentFetch(fetch, collector);
            return false;
        }
        this.currentSplitContext = this.splitStates.get(nextSplitId);
        Preconditions.checkState(this.currentSplitContext != null, "Have records for a split that was not registered");
        this.currentSplitOutput = this.currentSplitContext.getOrCreateSplitOutput(collector);
        log.trace("Emitting records from fetch for split {}", nextSplitId);
        return true;
    }

    /**
     * Clears the current-fetch bookkeeping, unregisters the splits the fetch reports as finished
     * (notifying {@link #onSplitFinished(Map)} with their states) and recycles the fetch.
     *
     * @throws IllegalStateException if a finished split is not registered
     */
    private void finishCurrentFetch(RecordsWithSplitIds<E> fetch, Collector<T> collector) {
        this.currentFetch = null;
        this.currentSplitContext = null;
        this.currentSplitOutput = null;

        Set<String> finishedSplits = fetch.finishedSplits();
        if (!finishedSplits.isEmpty()) {
            log.info("Finished reading split(s) {}", finishedSplits);
            Map<String, SplitStateT> finishedStates = new HashMap<>();
            for (String splitId : finishedSplits) {
                SplitContext<T, SplitStateT> finished = this.splitStates.remove(splitId);
                // Fail with a descriptive message instead of a bare NullPointerException.
                Preconditions.checkState(finished != null, "Finished split " + splitId + " was not registered");
                finishedStates.put(splitId, finished.state);
            }
            onSplitFinished(finishedStates);
        }
        fetch.recycle();
    }

    /** Returns the number of splits that have been added and are not yet finished. */
    public int getNumberOfCurrentlyAssignedSplits() {
        return this.splitStates.size();
    }

    /**
     * Called when a fetch reports one or more splits as finished.
     *
     * @param map finished split ids mapped to the state held for them
     */
    protected abstract void onSplitFinished(Map<String, SplitStateT> map);

    /**
     * Creates the state object for a newly added split.
     *
     * @param splitt the split being added
     * @return the state to track for that split
     */
    protected abstract SplitStateT initializedState(SplitT splitt);

    /**
     * Converts a split state back into a split; used when snapshotting.
     *
     * @param str the split id
     * @param splitstatet the state currently held for that split
     * @return the split representing that state
     */
    protected abstract SplitT toSplitType(String str, SplitStateT splitstatet);
}
