package org.apache.flink.connector.pulsar.source.reader;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
import org.apache.flink.connector.pulsar.common.metrics.MetricNames;
import org.apache.flink.connector.pulsar.source.config.CursorVerification;
import org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SplitReader} that pulls raw {@code byte[]} messages for a single
 * {@link PulsarPartitionSplit} through one Pulsar {@link Consumer}.
 *
 * <p>The reader accepts exactly one split for its whole lifetime: a second split addition is
 * rejected with an {@link IllegalStateException}. Until a split has been registered,
 * {@link #fetch()} returns an empty batch.
 */
@Internal
public class PulsarPartitionSplitReader implements SplitReader<Message<byte[]>, PulsarPartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarPartitionSplitReader.class);

    private final PulsarClient pulsarClient;
    private final PulsarAdmin pulsarAdmin;
    private final SourceConfiguration sourceConfiguration;
    private final Schema<byte[]> schema;
    private final PulsarCrypto pulsarCrypto;
    private final SourceReaderMetricGroup metricGroup;

    /** Consumer bound to the registered split; {@code null} until one has been created. */
    private Consumer<byte[]> pulsarConsumer;

    /** The single split served by this reader; {@code null} until a split has been assigned. */
    private PulsarPartitionSplit registeredSplit;

    /**
     * Creates a reader without any split assigned.
     *
     * @param pulsarClient client used to create the consumer
     * @param pulsarAdmin admin client used to open the split and to seek the subscription cursor
     * @param sourceConfiguration source settings (fetch limits, subscription name, metrics, ...)
     * @param schema schema handed to the consumer builder
     * @param pulsarCrypto provider of the optional crypto key reader and message crypto
     * @param sourceReaderMetricGroup parent group under which consumer metrics are registered
     */
    public PulsarPartitionSplitReader(PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, SourceConfiguration sourceConfiguration, Schema<byte[]> schema, PulsarCrypto pulsarCrypto, SourceReaderMetricGroup sourceReaderMetricGroup) {
        this.pulsarClient = pulsarClient;
        this.pulsarAdmin = pulsarAdmin;
        this.sourceConfiguration = sourceConfiguration;
        this.schema = schema;
        this.pulsarCrypto = pulsarCrypto;
        this.metricGroup = sourceReaderMetricGroup;
    }

    /**
     * Polls messages from the consumer of the registered split.
     *
     * <p>Polling ends as soon as one of the following holds: the configured maximum number of
     * records has been requested, the configured maximum fetch time has elapsed, a receive call
     * returns no message, or the split's stop cursor reports {@code EXACTLY} or {@code TERMINATE}
     * (in which case the split is also marked as finished).
     *
     * @return the messages collected in this round; empty when no consumer or split is registered
     * @throws IOException if receiving a message or evaluating the stop cursor fails
     */
    @Override
    public RecordsWithSplitIds<Message<byte[]>> fetch() throws IOException {
        RecordsBySplits.Builder<Message<byte[]>> builder = new RecordsBySplits.Builder<>();
        if (pulsarConsumer == null || registeredSplit == null) {
            return builder.build();
        }

        StopCursor stopCursor = registeredSplit.getStopCursor();
        String splitId = registeredSplit.splitId();
        Deadline deadline = Deadline.fromNow(sourceConfiguration.getMaxFetchTime());

        for (int polled = 0; polled < sourceConfiguration.getMaxFetchRecords() && deadline.hasTimeLeft(); polled++) {
            try {
                int receiveTimeoutMs = sourceConfiguration.getFetchOneMessageTime();
                if (receiveTimeoutMs <= 0) {
                    // No per-message timeout configured: wait at most for the rest of this round.
                    // Clamp before narrowing so a very large fetch time cannot overflow the int.
                    receiveTimeoutMs = (int) Math.min(Integer.MAX_VALUE, deadline.timeLeftIfAny().toMillis());
                }

                Message<byte[]> message = pulsarConsumer.receive(receiveTimeoutMs, TimeUnit.MILLISECONDS);
                if (message == null) {
                    break;
                }

                StopCursor.StopCondition condition = stopCursor.shouldStop(message);
                if (condition == StopCursor.StopCondition.CONTINUE || condition == StopCursor.StopCondition.EXACTLY) {
                    builder.add(splitId, message);
                    LOG.debug("Finished polling message {}", message);
                }
                if (condition == StopCursor.StopCondition.EXACTLY || condition == StopCursor.StopCondition.TERMINATE) {
                    builder.addFinishedSplit(splitId);
                    break;
                }
            } catch (TimeoutException ignored) {
                // The fetch deadline elapsed while computing the remaining time; end this round
                // and hand back whatever has been collected so far.
                break;
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
        return builder.build();
    }

    /**
     * Registers the single split this reader will serve: opens the split, seeks the subscription
     * to the split's latest consumed position if it has one, and creates the consumer.
     *
     * <p>The reader's state is only updated once all of these steps succeeded, so a failed
     * attempt does not leave a split registered without a consumer.
     *
     * @param splitsChange the change to apply; must be a {@link SplitsAddition} with one split
     * @throws UnsupportedOperationException if the change is not a {@link SplitsAddition}
     * @throws IllegalStateException if a split has already been registered
     * @throws IllegalArgumentException if the change does not contain exactly one split
     * @throws FlinkRuntimeException if opening the split, seeking the cursor (with
     *     {@link CursorVerification#FAIL_ON_MISMATCH}) or creating the consumer fails
     */
    @Override
    public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChange) {
        LOG.debug("Handle split changes {}", splitsChange);
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        if (registeredSplit != null) {
            throw new IllegalStateException("This split reader have assigned split.");
        }
        List<PulsarPartitionSplit> splits = splitsChange.splits();
        Preconditions.checkArgument(splits.size() == 1, "This pulsar split reader only supports one split.");

        PulsarPartitionSplit split = splits.get(0);
        Consumer<byte[]> consumer;
        try {
            split.open(pulsarAdmin);
            resetCursorToLatestConsumed(split);
            consumer = createPulsarConsumer(split.getPartition());
        } catch (FlinkRuntimeException e) {
            // Already the exception type reported to the caller; do not wrap it a second time.
            throw e;
        } catch (Exception e) {
            throw new FlinkRuntimeException(e);
        }

        pulsarConsumer = consumer;
        registeredSplit = split;
        LOG.info("Register split {} consumer for current reader.", split);
    }

    /**
     * Seeks the subscription cursor to the split's latest consumed message id, if the split
     * carries one.
     *
     * <p>A failed seek is fatal only when the configuration asks for
     * {@link CursorVerification#FAIL_ON_MISMATCH}; otherwise it is logged and ignored.
     */
    private void resetCursorToLatestConsumed(PulsarPartitionSplit split) {
        MessageId latestConsumedId = split.getLatestConsumedId();
        if (latestConsumedId == null) {
            return;
        }
        LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
        try {
            // Identity comparison: only the shared MessageId.latest / MessageId.earliest constants
            // get the 'true' flag. NOTE(review): an id that merely equals() one of these constants
            // takes the 'false' branch, and the flag presumably controls whether the position
            // itself is included - confirm both against CursorPosition.
            boolean boundaryPosition = latestConsumedId == MessageId.latest || latestConsumedId == MessageId.earliest;
            CursorPosition position = new CursorPosition(latestConsumedId, boundaryPosition);
            position.seekPosition(pulsarAdmin, split.getPartition().getFullTopicName(), sourceConfiguration.getSubscriptionName());
        } catch (PulsarAdminException e) {
            if (sourceConfiguration.getVerifyInitialOffsets() == CursorVerification.FAIL_ON_MISMATCH) {
                throw new IllegalArgumentException(e);
            }
            LOG.warn("Failed to reset cursor to {} on partition {}", latestConsumedId, split.getPartition(), e);
        }
    }

    /**
     * Pauses or resumes the consumer. Because this reader serves a single split, at most one
     * split may be passed across both collections; pausing takes precedence over resuming.
     *
     * @param splitsToPause splits to pause; a non-empty collection pauses the consumer
     * @param splitsToResume splits to resume; a non-empty collection resumes the consumer
     * @throws IllegalStateException if more than one split is passed in total
     */
    @Override
    public void pauseOrResumeSplits(Collection<PulsarPartitionSplit> splitsToPause, Collection<PulsarPartitionSplit> splitsToResume) {
        Preconditions.checkState(splitsToPause.size() + splitsToResume.size() <= 1, "This pulsar split reader only supports one split.");
        if (!splitsToPause.isEmpty()) {
            pulsarConsumer.pause();
        } else if (!splitsToResume.isEmpty()) {
            pulsarConsumer.resume();
        }
    }

    /** Does nothing: this reader takes no action when woken up. */
    @Override
    public void wakeUp() {
    }

    /**
     * Closes the consumer if one has been created.
     *
     * @throws PulsarClientException if closing the consumer fails
     */
    @Override
    public void close() throws PulsarClientException {
        if (pulsarConsumer != null) {
            pulsarConsumer.close();
        }
    }

    /**
     * Cumulatively acknowledges the given message id, creating a consumer for the given
     * partition first when this reader does not have one yet.
     *
     * @param topicPartition partition used to create the consumer if none exists
     * @param messageId message id to acknowledge cumulatively
     * @throws PulsarClientException if creating the consumer or acknowledging fails
     */
    public void notifyCheckpointComplete(TopicPartition topicPartition, MessageId messageId) throws PulsarClientException {
        if (pulsarConsumer == null) {
            pulsarConsumer = createPulsarConsumer(topicPartition);
        }
        pulsarConsumer.acknowledgeCumulative(messageId);
    }

    /**
     * Builds and subscribes a consumer for the given partition, applying the optional crypto
     * settings and, when the partition does not cover the full range, a sticky key-shared policy.
     */
    private Consumer<byte[]> createPulsarConsumer(TopicPartition topicPartition) throws PulsarClientException {
        ConsumerBuilder<byte[]> consumerBuilder = PulsarSourceConfigUtils.createConsumerBuilder(pulsarClient, schema, sourceConfiguration);
        consumerBuilder.topic(topicPartition.getFullTopicName());

        // Message crypto is only applied together with a key reader.
        CryptoKeyReader cryptoKeyReader = pulsarCrypto.cryptoKeyReader();
        if (cryptoKeyReader != null) {
            consumerBuilder.cryptoKeyReader(cryptoKeyReader);
            MessageCrypto<MessageMetadata, MessageMetadata> messageCrypto = pulsarCrypto.messageCrypto();
            if (messageCrypto != null) {
                consumerBuilder.messageCrypto(messageCrypto);
            }
        }

        // A partition restricted to part of the range consumes only its own sticky hash ranges.
        if (!TopicRangeUtils.isFullTopicRanges(topicPartition.getRanges())) {
            KeySharedPolicy.KeySharedPolicySticky stickyPolicy = KeySharedPolicy.stickyHashRange().ranges(topicPartition.getPulsarRanges());
            stickyPolicy.setAllowOutOfOrderDelivery(sourceConfiguration.isAllowKeySharedOutOfOrderDelivery());
            consumerBuilder.keySharedPolicy(stickyPolicy);
        }

        Consumer<byte[]> consumer = consumerBuilder.subscribe();
        exposeConsumerMetrics(consumer);
        return consumer;
    }

    /**
     * Registers the consumer's statistics as gauges under a metric group keyed by topic and
     * consumer name, if metrics are enabled. A random UUID replaces a missing consumer name.
     */
    private void exposeConsumerMetrics(Consumer<byte[]> consumer) {
        if (!sourceConfiguration.isEnableMetrics()) {
            return;
        }
        String consumerName = consumer.getConsumerName();
        if (Strings.isNullOrEmpty(consumerName)) {
            consumerName = UUID.randomUUID().toString();
        }
        MetricGroup group = metricGroup.addGroup(MetricNames.PULSAR_CONSUMER_METRIC_NAME).addGroup(consumer.getTopic()).addGroup(consumerName);
        ConsumerStats stats = consumer.getStats();
        // Fail before registering anything if the consumer exposes no statistics.
        Objects.requireNonNull(stats);

        group.gauge(MetricNames.NUM_MSGS_RECEIVED, stats::getNumMsgsReceived);
        group.gauge(MetricNames.NUM_BYTES_RECEIVED, stats::getNumBytesReceived);
        group.gauge(MetricNames.RATE_MSGS_RECEIVED, stats::getRateMsgsReceived);
        group.gauge(MetricNames.RATE_BYTES_RECEIVED, stats::getRateBytesReceived);
        group.gauge(MetricNames.NUM_ACKS_SENT, stats::getNumAcksSent);
        group.gauge(MetricNames.NUM_ACKS_FAILED, stats::getNumAcksFailed);
        group.gauge(MetricNames.NUM_RECEIVE_FAILED, stats::getNumReceiveFailed);
        group.gauge(MetricNames.NUM_BATCH_RECEIVE_FAILED, stats::getNumBatchReceiveFailed);
        group.gauge(MetricNames.TOTAL_MSGS_RECEIVED, stats::getTotalMsgsReceived);
        group.gauge(MetricNames.TOTAL_BYTES_RECEIVED, stats::getTotalBytesReceived);
        group.gauge(MetricNames.TOTAL_RECEIVED_FAILED, stats::getTotalReceivedFailed);
        // "Tota" is the spelling of the getter on ConsumerStats, not a typo introduced here.
        group.gauge(MetricNames.TOTAL_BATCH_RECEIVED_FAILED, stats::getTotaBatchReceivedFailed);
        group.gauge(MetricNames.TOTAL_ACKS_SENT, stats::getTotalAcksSent);
        group.gauge(MetricNames.TOTAL_ACKS_FAILED, stats::getTotalAcksFailed);
        group.gauge(MetricNames.MSG_NUM_IN_RECEIVER_QUEUE, stats::getMsgNumInReceiverQueue);
    }

    /** Returns the subscription name from the source configuration. */
    @VisibleForTesting
    String getSubscriptionName() {
        return sourceConfiguration.getSubscriptionName();
    }
}
