package org.apache.flink.connector.pulsar.source.reader;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
import org.apache.flink.connector.pulsar.common.schema.BytesSchema;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source reader that reads Pulsar messages as raw byte arrays and emits records of type
 * {@code OUT}.
 *
 * <p>Besides delegating the reading to {@link SourceReaderBase}, this reader keeps track of the
 * latest consumed message id of every split. The tracked cursors are acknowledged either when a
 * checkpoint completes ({@link #notifyCheckpointComplete(long)}) or periodically on a scheduler
 * thread when auto acknowledgement is enabled in the {@link SourceConfiguration}.
 *
 * <p>Acknowledgement failures are not thrown where they occur; they are stored and rethrown by
 * the next call of {@link #pollNext(ReaderOutput)}.
 *
 * @param <OUT> the type of the records emitted by this reader.
 */
@Internal
public class PulsarSourceReader<OUT> extends SourceReaderBase<Message<byte[]>, OUT, PulsarPartitionSplit, PulsarPartitionSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceReader.class);

    private final SourceConfiguration sourceConfiguration;
    private final PulsarClient pulsarClient;
    private final PulsarAdmin pulsarAdmin;

    /** Cursors captured by {@link #snapshotState(long)}, keyed by checkpoint id. */
    @VisibleForTesting
    final SortedMap<Long, Map<TopicPartition, MessageId>> cursorsToCommit;

    /** Latest consumed message ids of splits which finished but are not acknowledged yet. */
    private final ConcurrentMap<TopicPartition, MessageId> cursorsOfFinishedSplits;

    /** First acknowledgement failure; surfaced by {@link #pollNext(ReaderOutput)}. */
    private final AtomicReference<Throwable> cursorCommitThrowable;

    /** Only created in {@link #start()} when auto acknowledgement is enabled; otherwise null. */
    private ScheduledExecutorService cursorScheduler;

    private PulsarSourceReader(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue,
            PulsarSourceFetcherManager fetcherManager,
            PulsarDeserializationSchema<OUT> deserializationSchema,
            SourceConfiguration sourceConfiguration,
            PulsarClient pulsarClient,
            PulsarAdmin pulsarAdmin,
            SourceReaderContext context) {
        super(elementsQueue, fetcherManager, new PulsarRecordEmitter<>(deserializationSchema), sourceConfiguration, context);
        this.sourceConfiguration = sourceConfiguration;
        this.pulsarClient = pulsarClient;
        this.pulsarAdmin = pulsarAdmin;
        this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
        this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
        this.cursorCommitThrowable = new AtomicReference<>();
    }

    /**
     * Starts the reader and, if auto acknowledgement is enabled, schedules the periodic cursor
     * acknowledgement. The first run is delayed by the configured max fetch time, subsequent runs
     * use the configured auto commit cursor interval (milliseconds).
     */
    @Override
    public void start() {
        super.start();
        if (sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
            cursorScheduler = Executors.newSingleThreadScheduledExecutor();
            cursorScheduler.scheduleAtFixedRate(
                    this::cumulativeAcknowledgmentMessage,
                    sourceConfiguration.getMaxFetchTime().toMillis(),
                    sourceConfiguration.getAutoCommitCursorInterval(),
                    TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Polls the next record after checking for a pending acknowledgement failure.
     *
     * @throws FlinkRuntimeException if a previous cursor acknowledgement failed.
     */
    @Override
    public InputStatus pollNext(ReaderOutput<OUT> readerOutput) throws Exception {
        Throwable ackFailure = cursorCommitThrowable.get();
        if (ackFailure != null) {
            throw new FlinkRuntimeException("An error occurred in acknowledge message.", ackFailure);
        }
        return super.pollNext(readerOutput);
    }

    /**
     * Closes the fetcher of every finished split and remembers its latest consumed message id
     * (when there is one) so that it is still acknowledged later.
     *
     * @param map finished split states keyed by split id.
     */
    @Override
    protected void onSplitFinished(Map<String, PulsarPartitionSplitState> map) {
        // All fetchers are closed first, before any cursor is recorded.
        for (String splitId : map.keySet()) {
            pulsarFetcherManager().closeFetcher(splitId);
        }
        LOG.debug("onSplitFinished event: {}", map);
        for (PulsarPartitionSplitState state : map.values()) {
            MessageId latestConsumedId = state.getLatestConsumedId();
            if (latestConsumedId != null) {
                cursorsOfFinishedSplits.put(state.getPartition(), latestConsumedId);
            }
        }
    }

    /** Wraps the given split into a fresh mutable split state. */
    @Override
    public PulsarPartitionSplitState initializedState(PulsarPartitionSplit pulsarPartitionSplit) {
        return new PulsarPartitionSplitState(pulsarPartitionSplit);
    }

    /** Converts the split state back to a split; the split id argument is not used. */
    @Override
    public PulsarPartitionSplit toSplitType(String str, PulsarPartitionSplitState pulsarPartitionSplitState) {
        return pulsarPartitionSplitState.toPulsarPartitionSplit();
    }

    /** Forwards the pause/resume request to the split fetcher manager. */
    @Override
    public void pauseOrResumeSplits(Collection<String> collection, Collection<String> collection2) {
        splitFetcherManager.pauseOrResumeSplits(collection, collection2);
    }

    /**
     * Snapshots the splits and records the cursors which have to be acknowledged once the given
     * checkpoint completes. Cursors of already finished splits are included as well.
     *
     * @param checkpointId the id of the checkpoint being taken.
     * @return the splits returned by the base implementation.
     */
    @Override
    public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
        List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId);
        Map<TopicPartition, MessageId> cursors =
                cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
        for (PulsarPartitionSplit split : splits) {
            MessageId latestConsumedId = split.getLatestConsumedId();
            if (latestConsumedId != null) {
                cursors.put(split.getPartition(), latestConsumedId);
            }
        }
        cursors.putAll(cursorsOfFinishedSplits);
        return splits;
    }

    /**
     * Acknowledges the cursors recorded for the completed checkpoint and drops the bookkeeping of
     * this and all earlier checkpoints. A failure is logged and stored for
     * {@link #pollNext(ReaderOutput)} instead of being thrown here.
     *
     * @param checkpointId the id of the completed checkpoint.
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        LOG.debug("Committing cursors for checkpoint {}", checkpointId);
        Map<TopicPartition, MessageId> cursors = cursorsToCommit.get(checkpointId);
        if (cursors == null) {
            // Nothing was recorded for this checkpoint, or a later checkpoint already cleared it.
            // Without this guard the null map would raise a NullPointerException below, which
            // would be stored as an acknowledgement failure and fail the reader.
            LOG.debug("No cursors to commit for checkpoint {}", checkpointId);
            return;
        }
        try {
            pulsarFetcherManager().acknowledgeMessages(cursors);
            LOG.debug("Successfully acknowledge cursors for checkpoint {}", checkpointId);
            cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
            // The head map is exclusive, hence + 1 to also remove this checkpoint's entry.
            cursorsToCommit.headMap(checkpointId + 1).clear();
        } catch (Exception e) {
            LOG.error("Failed to acknowledge cursors for checkpoint {}", checkpointId, e);
            cursorCommitThrowable.compareAndSet(null, e);
        }
    }

    /**
     * Stops the acknowledgement scheduler (if any), closes the base reader and releases the
     * Pulsar client and admin. The client and admin are released even if an earlier step throws.
     */
    @Override
    public void close() throws Exception {
        if (cursorScheduler != null) {
            cursorScheduler.shutdown();
        }
        try {
            super.close();
        } finally {
            try {
                pulsarClient.shutdown();
            } finally {
                pulsarAdmin.close();
            }
        }
    }

    /**
     * Periodic task which acknowledges the latest consumed message ids of all current and
     * finished splits. Failures are logged and stored for {@link #pollNext(ReaderOutput)}.
     */
    private void cumulativeAcknowledgmentMessage() {
        Map<TopicPartition, MessageId> cursors = new HashMap<>(cursorsOfFinishedSplits);
        // The base snapshot is used on purpose so nothing is written to cursorsToCommit.
        // NOTE(review): 1L looks like a placeholder checkpoint id - confirm the base class
        // ignores it.
        for (PulsarPartitionSplit split : super.snapshotState(1L)) {
            MessageId latestConsumedId = split.getLatestConsumedId();
            if (latestConsumedId != null) {
                cursors.put(split.getPartition(), latestConsumedId);
            }
        }
        try {
            pulsarFetcherManager().acknowledgeMessages(cursors);
            cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
        } catch (Exception e) {
            LOG.error("Fail in auto cursor commit.", e);
            cursorCommitThrowable.compareAndSet(null, e);
        }
    }

    /** Returns the split fetcher manager as the Pulsar specific type it was constructed with. */
    private PulsarSourceFetcherManager pulsarFetcherManager() {
        return (PulsarSourceFetcherManager) splitFetcherManager;
    }

    /**
     * Creates a reader together with the Pulsar client and admin it owns.
     *
     * <p>If any step after creating the client fails, the resources created so far are released
     * before the failure is propagated.
     *
     * @param sourceConfiguration the source configuration.
     * @param pulsarDeserializationSchema the schema which is opened here and used to emit records.
     * @param pulsarCrypto the crypto settings handed to every split reader.
     * @param sourceReaderContext the context of the reader.
     * @return the new reader.
     * @throws Exception if opening the deserialization schema or creating the reader fails.
     */
    public static <OUT> PulsarSourceReader<OUT> create(SourceConfiguration sourceConfiguration, PulsarDeserializationSchema<OUT> pulsarDeserializationSchema, PulsarCrypto pulsarCrypto, SourceReaderContext sourceReaderContext) throws Exception {
        FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue =
                new FutureCompletingBlockingQueue<>(sourceConfiguration.getMessageQueueCapacity());

        final PulsarClient client = PulsarClientFactory.createClient(sourceConfiguration);
        final PulsarAdmin admin;
        try {
            admin = PulsarClientFactory.createAdmin(sourceConfiguration);
        } catch (Exception e) {
            releaseOnFailure(client, null, e);
            throw e;
        }

        try {
            pulsarDeserializationSchema.open(
                    new PulsarDeserializationSchemaInitializationContext(sourceReaderContext, client),
                    sourceConfiguration);

            // With schema evolution the bytes schema is built from the wrapped Pulsar schema.
            // This requires the deserialization schema to be a PulsarSchemaWrapper.
            Schema<byte[]> schema =
                    sourceConfiguration.isEnableSchemaEvolution()
                            ? new BytesSchema(((PulsarSchemaWrapper<?>) pulsarDeserializationSchema).pulsarSchema())
                            : Schema.BYTES;

            PulsarSourceFetcherManager fetcherManager =
                    new PulsarSourceFetcherManager(
                            elementsQueue,
                            () -> new PulsarPartitionSplitReader(client, admin, sourceConfiguration, schema, pulsarCrypto, sourceReaderContext.metricGroup()),
                            sourceReaderContext.getConfiguration());

            return new PulsarSourceReader<>(elementsQueue, fetcherManager, pulsarDeserializationSchema, sourceConfiguration, client, admin, sourceReaderContext);
        } catch (Exception e) {
            releaseOnFailure(client, admin, e);
            throw e;
        }
    }

    /**
     * Releases the given Pulsar resources after a failed creation. Errors raised while releasing
     * are attached to {@code failure} as suppressed exceptions so the root cause stays visible.
     */
    private static void releaseOnFailure(PulsarClient client, PulsarAdmin admin, Throwable failure) {
        try {
            client.shutdown();
        } catch (Exception e) {
            failure.addSuppressed(e);
        }
        if (admin != null) {
            try {
                admin.close();
            } catch (Exception e) {
                failure.addSuppressed(e);
            }
        }
    }
}
