package org.apache.pulsar.broker.service.persistent;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.stats.Rate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.class */
public class PersistentMessageExpiryMonitor implements AsyncCallbacks.FindEntryCallback {
    private final ManagedCursor cursor;
    private final String subName;
    private final String topicName;
    private final boolean autoSkipNonRecoverableData;
    private final PersistentSubscription subscription;
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    private static final AtomicIntegerFieldUpdater<PersistentMessageExpiryMonitor> expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater.newUpdater(PersistentMessageExpiryMonitor.class, "expirationCheckInProgress");
    private static final Logger log = LoggerFactory.getLogger(PersistentMessageExpiryMonitor.class);
    private volatile int expirationCheckInProgress = FALSE;
    private final AsyncCallbacks.MarkDeleteCallback markDeleteCallback = new AsyncCallbacks.MarkDeleteCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor.1
        public void markDeleteComplete(Object obj) {
            long longValue = ((Long) obj).longValue() - PersistentMessageExpiryMonitor.this.cursor.getNumberOfEntriesInBacklog(false);
            PersistentMessageExpiryMonitor.this.msgExpired.recordMultipleEvents(longValue, 0L);
            PersistentMessageExpiryMonitor.this.updateRates();
            if (PersistentMessageExpiryMonitor.this.subscription != null && PersistentMessageExpiryMonitor.this.subscription.getType() == PulsarApi.CommandSubscribe.SubType.Key_Shared) {
                PersistentMessageExpiryMonitor.this.subscription.getDispatcher().markDeletePositionMoveForward();
            }
            if (PersistentMessageExpiryMonitor.log.isDebugEnabled()) {
                PersistentMessageExpiryMonitor.log.debug("[{}][{}] Mark deleted {} messages", new Object[]{PersistentMessageExpiryMonitor.this.topicName, PersistentMessageExpiryMonitor.this.subName, Long.valueOf(longValue)});
            }
        }

        public void markDeleteFailed(ManagedLedgerException managedLedgerException, Object obj) {
            PersistentMessageExpiryMonitor.log.warn("[{}][{}] Message expiry failed - mark delete failed", new Object[]{PersistentMessageExpiryMonitor.this.topicName, PersistentMessageExpiryMonitor.this.subName, managedLedgerException});
            PersistentMessageExpiryMonitor.this.updateRates();
        }
    };
    private final Rate msgExpired = new Rate();

    public PersistentMessageExpiryMonitor(String str, String str2, ManagedCursor managedCursor, PersistentSubscription persistentSubscription) {
        this.topicName = str;
        this.cursor = managedCursor;
        this.subName = str2;
        this.subscription = persistentSubscription;
        this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != null && this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
    }

    public void expireMessages(int i) {
        if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, 1)) {
            log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", new Object[]{this.topicName, this.subName, Integer.valueOf(i)});
            this.cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                MessageImpl messageImpl = FALSE;
                try {
                    try {
                        messageImpl = MessageImpl.deserialize(entry.getDataBuffer());
                        boolean isExpired = messageImpl.isExpired(i);
                        entry.release();
                        if (messageImpl != null) {
                            messageImpl.recycle();
                        }
                        return isExpired;
                    } catch (Exception e) {
                        log.error("[{}][{}] Error deserializing message for expiry check", new Object[]{this.topicName, this.subName, e});
                        entry.release();
                        if (messageImpl == null) {
                            return false;
                        }
                        messageImpl.recycle();
                        return false;
                    }
                } catch (Throwable th) {
                    entry.release();
                    if (messageImpl != null) {
                        messageImpl.recycle();
                    }
                    throw th;
                }
            }, this, (Object) null);
        } else if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Ignore expire-message scheduled task, last check is still running", this.topicName, this.subName);
        }
    }

    public void updateRates() {
        this.msgExpired.calculateRate();
    }

    public double getMessageExpiryRate() {
        return this.msgExpired.getRate();
    }

    public void findEntryComplete(Position position, Object obj) {
        if (position != null) {
            log.info("[{}][{}] Expiring all messages until position {}", new Object[]{this.topicName, this.subName, position});
            this.cursor.asyncMarkDelete(position, this.markDeleteCallback, Long.valueOf(this.cursor.getNumberOfEntriesInBacklog(false)));
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] No messages to expire", this.topicName, this.subName);
            }
            updateRates();
        }
        this.expirationCheckInProgress = FALSE;
    }

    public void findEntryFailed(ManagedLedgerException managedLedgerException, Optional<Position> optional, Object obj) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Finding expired entry operation failed", new Object[]{this.topicName, this.subName, managedLedgerException});
        }
        if (this.autoSkipNonRecoverableData && optional.isPresent() && (managedLedgerException instanceof ManagedLedgerException.NonRecoverableLedgerException)) {
            log.warn("[{}][{}] read failed from ledger at position:{} : {}", new Object[]{this.topicName, this.subName, optional, managedLedgerException.getMessage()});
            findEntryComplete(optional.get(), obj);
        }
        this.expirationCheckInProgress = FALSE;
        updateRates();
    }
}
