package org.apache.flink.connector.pulsar.source.reader.source;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarOrderedFetcherManager;
import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
import org.apache.flink.core.io.InputStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.class */
public class PulsarOrderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarOrderedSourceReader.class);

    @VisibleForTesting
    final SortedMap<Long, Map<TopicPartition, MessageId>> cursorsToCommit;
    private final ConcurrentMap<TopicPartition, MessageId> cursorsOfFinishedSplits;
    private final AtomicReference<Throwable> cursorCommitThrowable;
    private ScheduledExecutorService cursorScheduler;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public PulsarOrderedSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> futureCompletingBlockingQueue, Supplier<PulsarOrderedPartitionSplitReader<OUT>> supplier, SourceReaderContext sourceReaderContext, SourceConfiguration sourceConfiguration, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin) {
        super(futureCompletingBlockingQueue, new PulsarOrderedFetcherManager(futureCompletingBlockingQueue, supplier::get), sourceReaderContext, sourceConfiguration, pulsarClient, pulsarAdmin);
        Objects.requireNonNull(supplier);
        this.cursorCommitThrowable = new AtomicReference<>();
        this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap());
        this.cursorsOfFinishedSplits = new ConcurrentHashMap();
    }

    @Override // org.apache.flink.connector.base.source.reader.SourceReaderBase
    public void start() {
        super.start();
        if (this.sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
            this.cursorScheduler = Executors.newSingleThreadScheduledExecutor();
            this.cursorScheduler.scheduleAtFixedRate(this::cumulativeAcknowledgmentMessage, this.sourceConfiguration.getMaxFetchTime().toMillis(), this.sourceConfiguration.getAutoCommitCursorInterval(), TimeUnit.MILLISECONDS);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.connector.base.source.reader.SourceReaderBase
    public InputStatus pollNext(ReaderOutput<OUT> readerOutput) throws Exception {
        checkErrorAndRethrow();
        return super.pollNext(readerOutput);
    }

    @Override // org.apache.flink.connector.base.source.reader.SourceReaderBase
    protected void onSplitFinished(Map<String, PulsarPartitionSplitState> map) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("onSplitFinished event: {}", map);
        }
        Iterator<Map.Entry<String, PulsarPartitionSplitState>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            PulsarPartitionSplitState value = it.next().getValue();
            MessageId latestConsumedId = value.getLatestConsumedId();
            if (latestConsumedId != null) {
                this.cursorsOfFinishedSplits.put(value.getPartition(), latestConsumedId);
            }
        }
    }

    @Override // org.apache.flink.connector.base.source.reader.SourceReaderBase
    public List<PulsarPartitionSplit> snapshotState(long j) {
        List<PulsarPartitionSplit> snapshotState = super.snapshotState(j);
        Map<TopicPartition, MessageId> computeIfAbsent = this.cursorsToCommit.computeIfAbsent(Long.valueOf(j), l -> {
            return new HashMap();
        });
        for (PulsarPartitionSplit pulsarPartitionSplit : snapshotState) {
            MessageId latestConsumedId = pulsarPartitionSplit.getLatestConsumedId();
            if (latestConsumedId != null) {
                computeIfAbsent.put(pulsarPartitionSplit.getPartition(), latestConsumedId);
            }
        }
        computeIfAbsent.putAll(this.cursorsOfFinishedSplits);
        return snapshotState;
    }

    public void notifyCheckpointComplete(long j) {
        LOG.debug("Committing cursors for checkpoint {}", Long.valueOf(j));
        Map<TopicPartition, MessageId> map = this.cursorsToCommit.get(Long.valueOf(j));
        try {
            ((PulsarOrderedFetcherManager) this.splitFetcherManager).acknowledgeMessages(map);
            LOG.debug("Successfully acknowledge cursors for checkpoint {}", Long.valueOf(j));
            this.cursorsOfFinishedSplits.keySet().removeAll(map.keySet());
            this.cursorsToCommit.headMap(Long.valueOf(j + 1)).clear();
        } catch (Exception e) {
            LOG.error("Failed to acknowledge cursors for checkpoint {}", Long.valueOf(j), e);
            this.cursorCommitThrowable.compareAndSet(null, e);
        }
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.source.PulsarSourceReaderBase, org.apache.flink.connector.base.source.reader.SourceReaderBase
    public void close() throws Exception {
        if (this.cursorScheduler != null) {
            this.cursorScheduler.shutdown();
        }
        super.close();
    }

    private void checkErrorAndRethrow() {
        Throwable th = this.cursorCommitThrowable.get();
        if (th != null) {
            throw new RuntimeException("An error occurred in acknowledge message.", th);
        }
    }

    private void cumulativeAcknowledgmentMessage() {
        HashMap hashMap = new HashMap(this.cursorsOfFinishedSplits);
        for (PulsarPartitionSplit pulsarPartitionSplit : super.snapshotState(1L)) {
            MessageId latestConsumedId = pulsarPartitionSplit.getLatestConsumedId();
            if (latestConsumedId != null) {
                hashMap.put(pulsarPartitionSplit.getPartition(), latestConsumedId);
            }
        }
        try {
            ((PulsarOrderedFetcherManager) this.splitFetcherManager).acknowledgeMessages(hashMap);
            this.cursorsOfFinishedSplits.keySet().removeAll(hashMap.keySet());
        } catch (Exception e) {
            LOG.error("Fail in auto cursor commit.", e);
            this.cursorCommitThrowable.compareAndSet(null, e);
        }
    }
}
