package org.apache.pulsar.broker.service;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/Consumer.class */
public class Consumer {
    private final Subscription subscription;
    private final PulsarApi.CommandSubscribe.SubType subType;
    private final TransportCnx cnx;
    private final String appId;
    private AuthenticationDataSource authenticationData;
    private final String topicName;
    private final int partitionIdx;
    private final PulsarApi.CommandSubscribe.InitialPosition subscriptionInitialPosition;
    private final long consumerId;
    private final int priorityLevel;
    private final boolean readCompacted;
    private final String consumerName;
    private long lastConsumedTimestamp;
    private long lastAckedTimestamp;
    private final ConcurrentLongLongPairHashMap pendingAcks;
    private final ConsumerStats stats;
    private final int maxUnackedMessages;
    private final Map<String, String> metadata;
    private final PulsarApi.KeySharedMeta keySharedMeta;
    private static final double avgPercent = 0.9d;
    private boolean preciseDispatcherFlowControl;
    private PositionImpl readPositionWhenJoining;
    private static final AtomicIntegerFieldUpdater<Consumer> MESSAGE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "messagePermits");
    private static final AtomicIntegerFieldUpdater<Consumer> PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "permitsReceivedWhileConsumerBlocked");
    private static final AtomicIntegerFieldUpdater<Consumer> UNACKED_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages");
    private static final AtomicIntegerFieldUpdater<Consumer> AVG_MESSAGES_PER_ENTRY = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "avgMessagesPerEntry");
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);
    private volatile int messagePermits = 0;
    private volatile int permitsReceivedWhileConsumerBlocked = 0;
    private volatile int unackedMessages = 0;
    private volatile boolean blockedConsumerOnUnackedMsgs = false;
    private volatile int avgMessagesPerEntry = 1000;
    private final Rate msgOut = new Rate();
    private Rate chuckedMessageRate = new Rate();
    private final Rate msgRedeliver = new Rate();
    private final LongAdder bytesOutCounter = new LongAdder();
    private final LongAdder msgOutCounter = new LongAdder();

    public Consumer(Subscription subscription, PulsarApi.CommandSubscribe.SubType subType, String str, long j, int i, String str2, int i2, TransportCnx transportCnx, String str3, Map<String, String> map, boolean z, PulsarApi.CommandSubscribe.InitialPosition initialPosition, PulsarApi.KeySharedMeta keySharedMeta) throws BrokerServiceException {
        this.subscription = subscription;
        this.subType = subType;
        this.topicName = str;
        this.partitionIdx = TopicName.getPartitionIndex(str);
        this.consumerId = j;
        this.priorityLevel = i;
        this.readCompacted = z;
        this.consumerName = str2;
        this.maxUnackedMessages = i2;
        this.subscriptionInitialPosition = initialPosition;
        this.keySharedMeta = keySharedMeta;
        this.cnx = transportCnx;
        this.appId = str3;
        this.authenticationData = transportCnx.getAuthenticationData();
        this.preciseDispatcherFlowControl = transportCnx.isPreciseDispatcherFlowControl();
        PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
        MESSAGE_PERMITS_UPDATER.set(this, 0);
        UNACKED_MESSAGES_UPDATER.set(this, 0);
        AVG_MESSAGES_PER_ENTRY.set(this, 1000);
        this.metadata = map != null ? map : Collections.emptyMap();
        this.stats = new ConsumerStats();
        this.stats.setAddress(transportCnx.clientAddress().toString());
        this.stats.consumerName = str2;
        this.stats.setConnectedSince(DateFormatter.now());
        this.stats.setClientVersion(transportCnx.getClientVersion());
        this.stats.metadata = this.metadata;
        if (Subscription.isIndividualAckMode(subType)) {
            this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1);
        } else {
            this.pendingAcks = null;
        }
    }

    public PulsarApi.CommandSubscribe.SubType subType() {
        return this.subType;
    }

    public long consumerId() {
        return this.consumerId;
    }

    public String consumerName() {
        return this.consumerName;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyActiveConsumerChange(Consumer consumer) {
        if (log.isDebugEnabled()) {
            log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}", new Object[]{Long.valueOf(this.consumerId), this.topicName, this.subscription.getName(), consumer});
        }
        this.cnx.getCommandSender().sendActiveConsumerChange(this.consumerId, this == consumer);
    }

    public boolean readCompacted() {
        return this.readCompacted;
    }

    public Future<Void> sendMessages(List<Entry> list, EntryBatchSizes entryBatchSizes, EntryBatchIndexesAcks entryBatchIndexesAcks, int i, long j, long j2, RedeliveryTracker redeliveryTracker) {
        this.lastConsumedTimestamp = System.currentTimeMillis();
        if (list.isEmpty() || i == 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}", new Object[]{this.topicName, this.subscription, Long.valueOf(this.consumerId)});
            }
            entryBatchSizes.recyle();
            if (entryBatchIndexesAcks != null) {
                entryBatchIndexesAcks.recycle();
            }
            Promise<Void> newPromise = this.cnx.newPromise();
            newPromise.setSuccess((Object) null);
            return newPromise;
        }
        if (this.pendingAcks != null) {
            for (int i2 = 0; i2 < list.size(); i2++) {
                Entry entry = list.get(i2);
                if (entry != null) {
                    this.pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), entryBatchSizes.getBatchSize(i2), 0L);
                }
            }
        }
        AVG_MESSAGES_PER_ENTRY.set(this, (int) Math.round((AVG_MESSAGES_PER_ENTRY.get(this) * avgPercent) + ((0.09999999999999998d * i) / list.size())));
        MESSAGE_PERMITS_UPDATER.addAndGet(this, (entryBatchIndexesAcks == null ? 0 : entryBatchIndexesAcks.getTotalAckedIndexCount()) - i);
        incrementUnackedMessages(i);
        this.msgOut.recordMultipleEvents(i, j);
        this.msgOutCounter.add(i);
        this.bytesOutCounter.add(j);
        this.chuckedMessageRate.recordMultipleEvents(j2, 0L);
        return this.cnx.getCommandSender().mo101sendMessagesToConsumer(this.consumerId, this.topicName, this.subscription, this.partitionIdx, list, entryBatchSizes, entryBatchIndexesAcks, redeliveryTracker);
    }

    private void incrementUnackedMessages(int i) {
        if (!Subscription.isIndividualAckMode(this.subType) || addAndGetUnAckedMsgs(this, i) < this.maxUnackedMessages || this.maxUnackedMessages <= 0) {
            return;
        }
        this.blockedConsumerOnUnackedMsgs = true;
    }

    public boolean isWritable() {
        return this.cnx.isWritable();
    }

    public void close() throws BrokerServiceException {
        close(false);
    }

    public void close(boolean z) throws BrokerServiceException {
        this.subscription.removeConsumer(this, z);
        this.cnx.removedConsumer(this);
    }

    public void disconnect() {
        disconnect(false);
    }

    public void disconnect(boolean z) {
        log.info("Disconnecting consumer: {}", this);
        this.cnx.closeConsumer(this);
        try {
            close(z);
        } catch (BrokerServiceException e) {
            log.warn("Consumer {} was already closed: {}", new Object[]{this, e.getMessage(), e});
        }
    }

    public void doUnsubscribe(long j) {
        this.subscription.doUnsubscribe(this).thenAccept(r7 -> {
            log.info("Unsubscribed successfully from {}", this.subscription);
            this.cnx.removedConsumer(this);
            this.cnx.getCommandSender().sendSuccess(j);
        }).exceptionally(th -> {
            log.warn("Unsubscribe failed for {}", this.subscription, th);
            this.cnx.getCommandSender().sendError(j, BrokerServiceException.getClientErrorCode(th), th.getCause().getMessage());
            return null;
        });
    }

    public CompletableFuture<Void> messageAcked(PulsarApi.CommandAck commandAck) {
        this.lastAckedTimestamp = System.currentTimeMillis();
        Map<String, Long> emptyMap = Collections.emptyMap();
        if (commandAck.getPropertiesCount() > 0) {
            emptyMap = (Map) commandAck.getPropertiesList().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            }));
        }
        if (commandAck.getAckType() != PulsarApi.CommandAck.AckType.Cumulative) {
            return (commandAck.hasTxnidLeastBits() && commandAck.hasTxnidMostBits()) ? individualAckWithTransaction(commandAck) : individualAckNormal(commandAck, emptyMap);
        }
        if (commandAck.getMessageIdCount() != 1) {
            log.warn("[{}] [{}] Received multi-message ack", this.subscription, Long.valueOf(this.consumerId));
        }
        if (Subscription.isIndividualAckMode(this.subType)) {
            log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring", this.subscription, Long.valueOf(this.consumerId));
        }
        PositionImpl positionImpl = PositionImpl.earliest;
        if (commandAck.getMessageIdCount() == 1) {
            PulsarApi.MessageIdData messageId = commandAck.getMessageId(0);
            positionImpl = messageId.getAckSetCount() > 0 ? PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), SafeCollectionUtils.longListToArray(messageId.getAckSetList())) : PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
        }
        if (commandAck.hasTxnidMostBits() && commandAck.hasTxnidLeastBits()) {
            return transactionCumulativeAcknowledge(commandAck.getTxnidMostBits(), commandAck.getTxnidLeastBits(), Collections.singletonList(positionImpl));
        }
        this.subscription.acknowledgeMessage(Collections.singletonList(positionImpl), PulsarApi.CommandAck.AckType.Cumulative, emptyMap);
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> individualAckNormal(PulsarApi.CommandAck commandAck, Map<String, Long> map) {
        PositionImpl positionImpl;
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < commandAck.getMessageIdCount(); i++) {
            PulsarApi.MessageIdData messageId = commandAck.getMessageId(i);
            if (messageId.getAckSetCount() > 0) {
                positionImpl = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), SafeCollectionUtils.longListToArray(messageId.getAckSetList()));
                if (isTransactionEnabled() && Subscription.isIndividualAckMode(this.subType)) {
                    ((PersistentSubscription) this.subscription).syncBatchPositionBitSetForPendingAck(positionImpl);
                }
            } else {
                positionImpl = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
            }
            arrayList.add(positionImpl);
            checkCanRemovePendingAcksAndHandle(positionImpl, messageId);
            checkAckValidationError(commandAck, positionImpl);
        }
        this.subscription.acknowledgeMessage(arrayList, PulsarApi.CommandAck.AckType.Individual, map);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        completableFuture.complete(null);
        if (isTransactionEnabled() && Subscription.isIndividualAckMode(this.subType)) {
            completableFuture.whenComplete((r5, th) -> {
                arrayList.forEach(position -> {
                    if (((PositionImpl) position).getAckSet() == null || !((PersistentSubscription) this.subscription).checkIsCanDeleteConsumerPendingAck((PositionImpl) position)) {
                        return;
                    }
                    removePendingAcks((PositionImpl) position);
                });
            });
        }
        return completableFuture;
    }

    private CompletableFuture<Void> individualAckWithTransaction(PulsarApi.CommandAck commandAck) {
        ArrayList arrayList = new ArrayList();
        if (!isTransactionEnabled()) {
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Server don't support transaction ack!"));
        }
        for (int i = 0; i < commandAck.getMessageIdCount(); i++) {
            PulsarApi.MessageIdData messageId = commandAck.getMessageId(i);
            PositionImpl positionImpl = messageId.getAckSetCount() > 0 ? PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), SafeCollectionUtils.longListToArray(messageId.getAckSetList())) : PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
            if (messageId.hasBatchIndex()) {
                arrayList.add(new MutablePair<>(positionImpl, Integer.valueOf(messageId.getBatchSize())));
            } else {
                arrayList.add(new MutablePair<>(positionImpl, 0));
            }
            checkCanRemovePendingAcksAndHandle(positionImpl, messageId);
            checkAckValidationError(commandAck, positionImpl);
        }
        CompletableFuture<Void> transactionIndividualAcknowledge = transactionIndividualAcknowledge(commandAck.getTxnidMostBits(), commandAck.getTxnidLeastBits(), arrayList);
        if (Subscription.isIndividualAckMode(this.subType)) {
            transactionIndividualAcknowledge.whenComplete((r5, th) -> {
                arrayList.forEach(mutablePair -> {
                    if (((PositionImpl) mutablePair.getLeft()).getAckSet() == null || !((PersistentSubscription) this.subscription).checkIsCanDeleteConsumerPendingAck((PositionImpl) mutablePair.left)) {
                        return;
                    }
                    removePendingAcks((PositionImpl) mutablePair.left);
                });
            });
        }
        return transactionIndividualAcknowledge;
    }

    private void checkAckValidationError(PulsarApi.CommandAck commandAck, PositionImpl positionImpl) {
        if (commandAck.hasValidationError()) {
            log.error("[{}] [{}] Received ack for corrupted message at {} - Reason: {}", new Object[]{this.subscription, Long.valueOf(this.consumerId), positionImpl, commandAck.getValidationError()});
        }
    }

    private void checkCanRemovePendingAcksAndHandle(PositionImpl positionImpl, PulsarApi.MessageIdData messageIdData) {
        if (Subscription.isIndividualAckMode(this.subType) && messageIdData.getAckSetCount() == 0) {
            removePendingAcks(positionImpl);
        }
    }

    private boolean isTransactionEnabled() {
        return (this.subscription instanceof PersistentSubscription) && ((PersistentTopic) this.subscription.getTopic()).getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled();
    }

    private CompletableFuture<Void> transactionIndividualAcknowledge(long j, long j2, List<MutablePair<PositionImpl, Integer>> list) {
        if (this.subscription instanceof PersistentSubscription) {
            return ((PersistentSubscription) this.subscription).transactionIndividualAcknowledge(new TxnID(j, j2), list);
        }
        log.error("Transaction acknowledge only support the `PersistentSubscription`.");
        return FutureUtil.failedFuture(new TransactionConflictException("Transaction acknowledge only support the `PersistentSubscription`."));
    }

    private CompletableFuture<Void> transactionCumulativeAcknowledge(long j, long j2, List<PositionImpl> list) {
        if (!isTransactionEnabled()) {
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Server don't support transaction ack!"));
        }
        if (this.subscription instanceof PersistentSubscription) {
            return ((PersistentSubscription) this.subscription).transactionCumulativeAcknowledge(new TxnID(j, j2), list);
        }
        log.error("Transaction acknowledge only support the `PersistentSubscription`.");
        return FutureUtil.failedFuture(new TransactionConflictException("Transaction acknowledge only support the `PersistentSubscription`."));
    }

    public void flowPermits(int i) {
        int andAdd;
        Preconditions.checkArgument(i > 0);
        if (shouldBlockConsumerOnUnackMsgs() && this.unackedMessages >= this.maxUnackedMessages) {
            this.blockedConsumerOnUnackedMsgs = true;
        }
        if (this.blockedConsumerOnUnackedMsgs) {
            andAdd = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, i);
        } else {
            andAdd = MESSAGE_PERMITS_UPDATER.getAndAdd(this, i);
            this.subscription.consumerFlow(this, i);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Added more flow control message permits {} (old was: {}), blocked = {} ", new Object[]{this.topicName, this.subscription, Integer.valueOf(i), Integer.valueOf(andAdd), Boolean.valueOf(this.blockedConsumerOnUnackedMsgs)});
        }
    }

    void flowConsumerBlockedPermits(Consumer consumer) {
        int andSet = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0);
        MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, andSet);
        this.subscription.consumerFlow(consumer, andSet);
    }

    public int getAvailablePermits() {
        return MESSAGE_PERMITS_UPDATER.get(this);
    }

    public int getAvgMessagesPerEntry() {
        return AVG_MESSAGES_PER_ENTRY.get(this);
    }

    public boolean isBlocked() {
        return this.blockedConsumerOnUnackedMsgs;
    }

    public void reachedEndOfTopic() {
        this.cnx.getCommandSender().sendReachedEndOfTopic(this.consumerId);
    }

    private boolean shouldBlockConsumerOnUnackMsgs() {
        return Subscription.isIndividualAckMode(this.subType) && this.maxUnackedMessages > 0;
    }

    public void updateRates() {
        this.msgOut.calculateRate();
        this.chuckedMessageRate.calculateRate();
        this.msgRedeliver.calculateRate();
        this.stats.msgRateOut = this.msgOut.getRate();
        this.stats.msgThroughputOut = this.msgOut.getValueRate();
        this.stats.msgRateRedeliver = this.msgRedeliver.getRate();
        this.stats.chuckedMessageRate = this.chuckedMessageRate.getRate();
    }

    public ConsumerStats getStats() {
        this.stats.msgOutCounter = this.msgOutCounter.longValue();
        this.stats.bytesOutCounter = this.bytesOutCounter.longValue();
        this.stats.lastAckedTimestamp = this.lastAckedTimestamp;
        this.stats.lastConsumedTimestamp = this.lastConsumedTimestamp;
        this.stats.availablePermits = getAvailablePermits();
        this.stats.unackedMessages = this.unackedMessages;
        this.stats.blockedConsumerOnUnackedMsgs = this.blockedConsumerOnUnackedMsgs;
        this.stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
        if (this.readPositionWhenJoining != null) {
            this.stats.readPositionWhenJoining = this.readPositionWhenJoining.toString();
        }
        return this.stats;
    }

    public int getUnackedMessages() {
        return this.unackedMessages;
    }

    public PulsarApi.KeySharedMeta getKeySharedMeta() {
        return this.keySharedMeta;
    }

    public String toString() {
        return MoreObjects.toStringHelper(this).add("subscription", this.subscription).add("consumerId", this.consumerId).add("consumerName", this.consumerName).add("address", this.cnx.clientAddress()).toString();
    }

    public void checkPermissions() {
        TopicName topicName = TopicName.get(this.subscription.getTopicName());
        if (this.cnx.getBrokerService().getAuthorizationService() != null) {
            try {
                if (this.cnx.getBrokerService().getAuthorizationService().canConsume(topicName, this.appId, this.authenticationData, this.subscription.getName())) {
                    return;
                }
            } catch (Exception e) {
                log.warn("[{}] Get unexpected error while autorizing [{}]  {}", new Object[]{this.appId, this.subscription.getTopicName(), e.getMessage(), e});
            }
            log.info("[{}] is not allowed to consume from topic [{}] anymore", this.appId, this.subscription.getTopicName());
            disconnect();
        }
    }

    public boolean equals(Object obj) {
        if (!(obj instanceof Consumer)) {
            return false;
        }
        Consumer consumer = (Consumer) obj;
        return Objects.equals(this.cnx.clientAddress(), consumer.cnx.clientAddress()) && this.consumerId == consumer.consumerId;
    }

    public int hashCode() {
        return this.consumerName.hashCode() + (31 * this.cnx.hashCode());
    }

    private void removePendingAcks(PositionImpl positionImpl) {
        Consumer consumer = null;
        if (this.pendingAcks.get(positionImpl.getLedgerId(), positionImpl.getEntryId()) == null) {
            Iterator<Consumer> it = this.subscription.getConsumers().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Consumer next = it.next();
                if (!next.equals(this) && next.getPendingAcks().containsKey(positionImpl.getLedgerId(), positionImpl.getEntryId())) {
                    consumer = next;
                    break;
                }
            }
        } else {
            consumer = this;
        }
        ConcurrentLongLongPairHashMap.LongPair longPair = consumer != null ? consumer.getPendingAcks().get(positionImpl.getLedgerId(), positionImpl.getEntryId()) : null;
        if (longPair != null) {
            int i = (int) longPair.first;
            if (consumer.getPendingAcks().remove(positionImpl.getLedgerId(), positionImpl.getEntryId())) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}-{}] consumer {} received ack {}", new Object[]{this.topicName, this.subscription, Long.valueOf(this.consumerId), positionImpl});
                }
                if (addAndGetUnAckedMsgs(consumer, -i) <= this.maxUnackedMessages / 2 && consumer.blockedConsumerOnUnackedMsgs && consumer.shouldBlockConsumerOnUnackMsgs()) {
                    consumer.blockedConsumerOnUnackedMsgs = false;
                    flowConsumerBlockedPermits(consumer);
                }
            }
        }
    }

    public ConcurrentLongLongPairHashMap getPendingAcks() {
        return this.pendingAcks;
    }

    public int getPriorityLevel() {
        return this.priorityLevel;
    }

    public void redeliverUnacknowledgedMessages() {
        clearUnAckedMsgs();
        this.blockedConsumerOnUnackedMsgs = false;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received redelivery", new Object[]{this.topicName, this.subscription, Long.valueOf(this.consumerId)});
        }
        if (this.pendingAcks != null) {
            ArrayList<PositionImpl> arrayList = new ArrayList((int) this.pendingAcks.size());
            MutableInt mutableInt = new MutableInt(0);
            this.pendingAcks.forEach((j, j2, j3, j4) -> {
                mutableInt.add((int) j3);
                arrayList.add(new PositionImpl(j, j2));
            });
            for (PositionImpl positionImpl : arrayList) {
                this.pendingAcks.remove(positionImpl.getLedgerId(), positionImpl.getEntryId());
            }
            this.msgRedeliver.recordMultipleEvents(mutableInt.intValue(), mutableInt.intValue());
            this.subscription.redeliverUnacknowledgedMessages(this, arrayList);
        } else {
            this.subscription.redeliverUnacknowledgedMessages(this);
        }
        flowConsumerBlockedPermits(this);
    }

    public void redeliverUnacknowledgedMessages(List<PulsarApi.MessageIdData> list) {
        int i = 0;
        ArrayList newArrayList = Lists.newArrayList();
        for (PulsarApi.MessageIdData messageIdData : list) {
            PositionImpl positionImpl = PositionImpl.get(messageIdData.getLedgerId(), messageIdData.getEntryId());
            ConcurrentLongLongPairHashMap.LongPair longPair = this.pendingAcks.get(positionImpl.getLedgerId(), positionImpl.getEntryId());
            if (longPair != null) {
                this.pendingAcks.remove(positionImpl.getLedgerId(), positionImpl.getEntryId());
                i = (int) (i + longPair.first);
                newArrayList.add(positionImpl);
            }
        }
        addAndGetUnAckedMsgs(this, -i);
        this.blockedConsumerOnUnackedMsgs = false;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received {} msg-redelivery {}", new Object[]{this.topicName, this.subscription, Long.valueOf(this.consumerId), Integer.valueOf(i), Integer.valueOf(newArrayList.size())});
        }
        this.subscription.redeliverUnacknowledgedMessages(this, newArrayList);
        this.msgRedeliver.recordMultipleEvents(i, i);
        int andSet = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);
        if (andSet > 0) {
            MESSAGE_PERMITS_UPDATER.getAndAdd(this, andSet);
            this.subscription.consumerFlow(this, andSet);
        }
    }

    public Subscription getSubscription() {
        return this.subscription;
    }

    private int addAndGetUnAckedMsgs(Consumer consumer, int i) {
        this.subscription.addUnAckedMessages(i);
        return UNACKED_MESSAGES_UPDATER.addAndGet(consumer, i);
    }

    private void clearUnAckedMsgs() {
        this.subscription.addUnAckedMessages(-UNACKED_MESSAGES_UPDATER.getAndSet(this, 0));
    }

    public boolean isPreciseDispatcherFlowControl() {
        return this.preciseDispatcherFlowControl;
    }

    public void setReadPositionWhenJoining(PositionImpl positionImpl) {
        this.readPositionWhenJoining = positionImpl;
    }
}
