package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.StreamDoesNotExistException;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.StreamNotAvailableException;
import com.rabbitmq.stream.SubscriptionListener;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.Utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:57)
    */
/* loaded from: input_file:com/rabbitmq/stream/impl/ConsumersCoordinator.class */
public class ConsumersCoordinator {
    static final int MAX_SUBSCRIPTIONS_PER_CLIENT = 256;
    private final StreamEnvironment environment;
    private final Utils.ClientFactory clientFactory;
    private final int maxConsumersByConnection;
    private final Function<Utils.ClientConnectionType, String> connectionNamingStrategy;
    static final OffsetSpecification DEFAULT_OFFSET_SPECIFICATION = OffsetSpecification.next();
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumersCoordinator.class);
    private static final Predicate<Exception> RETRY_ON_TIMEOUT = exc -> {
        return exc instanceof TimeoutStreamException;
    };
    private final Random random = new Random();
    private final AtomicLong managerIdSequence = new AtomicLong(0);
    private final NavigableSet<ClientSubscriptionsManager> managers = new ConcurrentSkipListSet();
    private final AtomicLong trackerIdSequence = new AtomicLong(0);
    private final boolean debug = true;
    private final List<SubscriptionTracker> trackers = new CopyOnWriteArrayList();

    /* loaded from: input_file:com/rabbitmq/stream/impl/ConsumersCoordinator$ClientClosedException.class */
    public static class ClientClosedException extends StreamException {
        public ClientClosedException() {
            super("Client already closed");
        }
    }

    /* loaded from: input_file:com/rabbitmq/stream/impl/ConsumersCoordinator$ClientSubscriptionsManager.class */
    public class ClientSubscriptionsManager implements Comparable<ClientSubscriptionsManager> {
        private final long id;
        private final Client.Broker node;
        private final Client client;
        private final String name;
        private final Map<String, Set<SubscriptionTracker>> streamToStreamSubscriptions;
        private volatile List<SubscriptionTracker> subscriptionTrackers;
        private volatile int trackerCount;
        private final AtomicBoolean closed;

        private ClientSubscriptionsManager(Client.Broker broker, Client.ClientParameters clientParameters) {
            this.streamToStreamSubscriptions = new ConcurrentHashMap();
            this.subscriptionTrackers = new ArrayList(ConsumersCoordinator.this.maxConsumersByConnection);
            this.trackerCount = 0;
            this.closed = new AtomicBoolean(false);
            this.id = ConsumersCoordinator.this.managerIdSequence.getAndIncrement();
            this.node = broker;
            this.name = ConsumersCoordinator.keyForClientSubscription(broker);
            ConsumersCoordinator.LOGGER.debug("creating subscription manager on {}", this.name);
            IntStream.range(0, ConsumersCoordinator.this.maxConsumersByConnection).forEach(i -> {
                this.subscriptionTrackers.add(null);
            });
            this.trackerCount = 0;
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            Client.ChunkListener chunkListener = (client, b, j, j2, j3) -> {
                SubscriptionTracker subscriptionTracker = this.subscriptionTrackers.get(b & 255);
                if (subscriptionTracker == null || !subscriptionTracker.consumer.isOpen()) {
                    ConsumersCoordinator.LOGGER.debug("Could not find stream subscription {} or subscription closing, not providing credits", Integer.valueOf(b & 255));
                } else {
                    client.credit(b, 1);
                }
            };
            Client.CreditNotification creditNotification = (b2, s) -> {
                ConsumersCoordinator.LOGGER.debug("Received credit notification for subscription {}: {}", Integer.valueOf(b2 & 255), Utils.formatConstant(s));
            };
            Client.MessageListener messageListener = (b3, j4, j5, j6, message) -> {
                SubscriptionTracker subscriptionTracker = this.subscriptionTrackers.get(b3 & 255);
                if (subscriptionTracker == null) {
                    ConsumersCoordinator.LOGGER.debug("Could not find stream subscription {} in manager {}, node {}", new Object[]{Byte.valueOf(b3), Long.valueOf(this.id), this.name});
                    return;
                }
                SubscriptionTracker.access$2002(subscriptionTracker, j4);
                subscriptionTracker.hasReceivedSomething = true;
                subscriptionTracker.messageHandler.handle(new MessageHandlerContext(j4, j5, j6, subscriptionTracker.consumer), message);
            };
            Client.ShutdownListener shutdownListener = shutdownContext -> {
                this.closed.set(true);
                ConsumersCoordinator.this.managers.remove(this);
                if (shutdownContext.isShutdownUnexpected()) {
                    ConsumersCoordinator.LOGGER.debug("Unexpected shutdown notification on subscription connection {}, scheduling consumers re-assignment", this.name);
                    ConsumersCoordinator.LOGGER.debug("Subscription connection has {} consumer(s) over {} stream(s) to recover", Long.valueOf(this.subscriptionTrackers.stream().filter((v0) -> {
                        return Objects.nonNull(v0);
                    }).count()), Integer.valueOf(this.streamToStreamSubscriptions.size()));
                    ConsumersCoordinator.this.environment.scheduledExecutorService().execute(Utils.namedRunnable(() -> {
                        if (Thread.currentThread().isInterrupted()) {
                            return;
                        }
                        this.subscriptionTrackers.stream().filter((v0) -> {
                            return Objects.nonNull(v0);
                        }).filter(subscriptionTracker -> {
                            return subscriptionTracker.state() == SubscriptionState.ACTIVE;
                        }).forEach((v0) -> {
                            v0.detachFromManager();
                        });
                        for (Map.Entry<String, Set<SubscriptionTracker>> entry : this.streamToStreamSubscriptions.entrySet()) {
                            if (Thread.currentThread().isInterrupted()) {
                                ConsumersCoordinator.LOGGER.debug("Interrupting consumer re-assignment task");
                                return;
                            }
                            String key = entry.getKey();
                            Set<SubscriptionTracker> value = entry.getValue();
                            if (value == null || value.isEmpty()) {
                                ConsumersCoordinator.LOGGER.debug("No consumer to re-assign to stream {} after disconnection", key);
                            } else {
                                ConsumersCoordinator.LOGGER.debug("Re-assigning {} consumer(s) to stream {} after disconnection", Integer.valueOf(value.size()), key);
                                assignConsumersToStream(value, key, ConsumersCoordinator.this.recoveryBackOffDelayPolicy(), false);
                            }
                        }
                    }, "Consumers re-assignment after disconnection from %s", this.name));
                }
            };
            Client.MetadataListener metadataListener = (str, s2) -> {
                Set<SubscriptionTracker> remove;
                ConsumersCoordinator.LOGGER.debug("Received metadata notification for '{}', stream is likely to have become unavailable", str);
                synchronized (this) {
                    remove = this.streamToStreamSubscriptions.remove(str);
                    if (remove != null && !remove.isEmpty()) {
                        ArrayList arrayList = new ArrayList(ConsumersCoordinator.this.maxConsumersByConnection);
                        for (int i2 = 0; i2 < ConsumersCoordinator.this.maxConsumersByConnection; i2++) {
                            arrayList.add(this.subscriptionTrackers.get(i2));
                        }
                        for (SubscriptionTracker subscriptionTracker : remove) {
                            ConsumersCoordinator.LOGGER.debug("Subscription {} was at offset {} (received something? {})", new Object[]{Byte.valueOf(subscriptionTracker.subscriptionIdInClient), Long.valueOf(subscriptionTracker.offset), Boolean.valueOf(subscriptionTracker.hasReceivedSomething)});
                            arrayList.set(subscriptionTracker.subscriptionIdInClient & 255, null);
                            subscriptionTracker.consumer.setSubscriptionClient(null);
                        }
                        setSubscriptionTrackers(arrayList);
                    }
                }
                if (remove == null || remove.isEmpty()) {
                    return;
                }
                ConsumersCoordinator.this.environment.scheduledExecutorService().execute(Utils.namedRunnable(() -> {
                    if (Thread.currentThread().isInterrupted()) {
                        return;
                    }
                    ConsumersCoordinator.LOGGER.debug("Trying to move {} subscription(s) (stream '{}')", Integer.valueOf(remove.size()), str);
                    assignConsumersToStream(remove, str, ConsumersCoordinator.this.metadataUpdateBackOffDelayPolicy(), true);
                }, "Consumers re-assignment after metadata update on stream '%s'", str));
            };
            Client.ConsumerUpdateListener consumerUpdateListener = (client2, b4, z) -> {
                OffsetSpecification offsetSpecification = null;
                SubscriptionTracker subscriptionTracker = this.subscriptionTrackers.get(b4 & 255);
                if (subscriptionTracker == null) {
                    ConsumersCoordinator.LOGGER.debug("Could not find stream subscription {} for consumer update", Byte.valueOf(b4));
                } else if (Utils.isSac(subscriptionTracker.subscriptionProperties)) {
                    offsetSpecification = subscriptionTracker.consumer.consumerUpdate(z);
                } else {
                    ConsumersCoordinator.LOGGER.debug("Subscription {} is not a single active consumer, nothing to do.", Byte.valueOf(b4));
                }
                return offsetSpecification;
            };
            String str2 = (String) ConsumersCoordinator.this.connectionNamingStrategy.apply(Utils.ClientConnectionType.CONSUMER);
            this.client = ConsumersCoordinator.this.clientFactory.client(Utils.ClientFactoryContext.fromParameters(clientParameters.clientProperty("connection_name", str2).chunkListener(chunkListener).creditNotification(creditNotification).messageListener(messageListener).shutdownListener(shutdownListener).metadataListener(metadataListener).consumerUpdateListener(consumerUpdateListener)).key(this.name));
            ConsumersCoordinator.LOGGER.debug("Created consumer connection '{}'", str2);
            ConsumersCoordinator.maybeExchangeCommandVersions(this.client);
            atomicBoolean.set(true);
        }

        private void assignConsumersToStream(Collection<SubscriptionTracker> collection, String str, BackOffDelayPolicy backOffDelayPolicy, boolean z) {
            Runnable runnable = () -> {
                Iterator it = collection.iterator();
                while (it.hasNext()) {
                    try {
                        ((SubscriptionTracker) it.next()).consumer.closeAfterStreamDeletion();
                    } catch (Exception e) {
                        ConsumersCoordinator.LOGGER.debug("Error while closing consumer: {}", e.getMessage());
                    }
                }
            };
            AsyncRetry.asyncRetry(() -> {
                return ConsumersCoordinator.this.findBrokersForStream(str);
            }).description("Candidate lookup to consume from '%s'", str).scheduler(ConsumersCoordinator.this.environment.scheduledExecutorService()).retry(exc -> {
                return !(exc instanceof StreamDoesNotExistException);
            }).delayPolicy(backOffDelayPolicy).build().thenAccept(list -> {
                if (list == null) {
                    ConsumersCoordinator.LOGGER.debug("No candidate nodes to consume from '{}'", str);
                    runnable.run();
                    return;
                }
                Iterator it = collection.iterator();
                while (it.hasNext()) {
                    maybeRecoverSubscription(list, (SubscriptionTracker) it.next());
                }
                if (z) {
                    closeIfEmpty();
                }
            }).exceptionally(th -> {
                ConsumersCoordinator.LOGGER.debug("Error while trying to assign {} consumer(s) to {}", new Object[]{Integer.valueOf(collection.size()), str, th});
                runnable.run();
                if (!z) {
                    return null;
                }
                closeIfEmpty();
                return null;
            });
        }

        private void maybeRecoverSubscription(List<Client.Broker> list, SubscriptionTracker subscriptionTracker) {
            if (!subscriptionTracker.compareAndSet(SubscriptionState.ACTIVE, SubscriptionState.RECOVERING)) {
                ConsumersCoordinator.LOGGER.debug("Not recovering consumer {} from stream {}, state is {}, expected is {}", new Object[]{Long.valueOf(subscriptionTracker.consumer.id()), subscriptionTracker.stream, subscriptionTracker.state(), SubscriptionState.ACTIVE});
                return;
            }
            try {
                recoverSubscription(list, subscriptionTracker);
            } catch (Exception e) {
                ConsumersCoordinator.LOGGER.warn("Error while recovering consumer {} from stream '{}'. Reason: {}", new Object[]{Long.valueOf(subscriptionTracker.consumer.id()), subscriptionTracker.stream, Utils.exceptionMessage(e)});
            }
        }

        private void recoverSubscription(List<Client.Broker> list, SubscriptionTracker subscriptionTracker) {
            boolean z = false;
            while (!z) {
                try {
                    if (subscriptionTracker.consumer.isOpen()) {
                        Client.Broker pickBroker = ConsumersCoordinator.this.pickBroker(list);
                        ConsumersCoordinator.LOGGER.debug("Using {} to resume consuming from {}", pickBroker, subscriptionTracker.stream);
                        synchronized (subscriptionTracker.consumer) {
                            if (subscriptionTracker.consumer.isOpen()) {
                                ConsumersCoordinator.this.addToManager(pickBroker, subscriptionTracker, subscriptionTracker.hasReceivedSomething ? OffsetSpecification.offset(subscriptionTracker.offset) : subscriptionTracker.initialOffsetSpecification, false);
                            }
                        }
                    } else {
                        ConsumersCoordinator.LOGGER.debug("Not re-assigning consumer {} (stream '{}') because it has been closed", Long.valueOf(subscriptionTracker.consumer.id()), subscriptionTracker.stream);
                    }
                    z = true;
                } catch (StreamNotAvailableException | ConnectionStreamException | ClientClosedException e) {
                    ConsumersCoordinator.LOGGER.debug("Consumer {} re-assignment on stream {} timed out or connection closed or stream not available, refreshing candidates and retrying", Long.valueOf(subscriptionTracker.consumer.id()), subscriptionTracker.stream);
                    list = (List) Utils.callAndMaybeRetry(() -> {
                        return ConsumersCoordinator.this.findBrokersForStream(subscriptionTracker.stream);
                    }, exc -> {
                        return !(exc instanceof StreamDoesNotExistException);
                    }, ConsumersCoordinator.this.environment.recoveryBackOffDelayPolicy(), "Candidate lookup to consume from '%s'", subscriptionTracker.stream);
                } catch (Exception e2) {
                    ConsumersCoordinator.LOGGER.warn("Error while re-assigning subscription from stream {}", subscriptionTracker.stream, e2);
                    z = true;
                }
            }
        }

        private void checkNotClosed() {
            if (!this.client.isOpen()) {
                throw new ClientClosedException();
            }
        }

        synchronized void add(SubscriptionTracker subscriptionTracker, OffsetSpecification offsetSpecification, boolean z) {
            if (isFull()) {
                throw new IllegalStateException("Cannot add subscription tracker, the manager is full");
            }
            if (isClosed()) {
                throw new IllegalStateException("Cannot add subscription tracker, the manager is closed");
            }
            checkNotClosed();
            byte b = 0;
            int i = 0;
            while (true) {
                if (i >= ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT) {
                    break;
                }
                if (this.subscriptionTrackers.get(i) == null) {
                    b = (byte) i;
                    break;
                }
                i++;
            }
            List<SubscriptionTracker> list = this.subscriptionTrackers;
            Logger logger = ConsumersCoordinator.LOGGER;
            Object[] objArr = new Object[4];
            objArr[0] = subscriptionTracker.stream;
            objArr[1] = offsetSpecification == null ? ConsumersCoordinator.DEFAULT_OFFSET_SPECIFICATION : offsetSpecification;
            objArr[2] = subscriptionTracker.offsetTrackingReference;
            objArr[3] = subscriptionTracker.subscriptionProperties;
            logger.debug("Subscribing to {}, requested offset specification is {}, offset tracking reference is {}, properties are {}", objArr);
            try {
                subscriptionTracker.assign(b, this);
                this.streamToStreamSubscriptions.computeIfAbsent(subscriptionTracker.stream, str -> {
                    return ConcurrentHashMap.newKeySet();
                }).add(subscriptionTracker);
                setSubscriptionTrackers(update(list, b, subscriptionTracker));
                String str2 = subscriptionTracker.offsetTrackingReference;
                if (str2 != null) {
                    checkNotClosed();
                    Client.QueryOffsetResponse queryOffsetResponse = (Client.QueryOffsetResponse) Utils.callAndMaybeRetry(() -> {
                        return this.client.queryOffset(str2, subscriptionTracker.stream);
                    }, ConsumersCoordinator.RETRY_ON_TIMEOUT, "Offset query for consumer %s on stream '%s' (reference %s)", Long.valueOf(subscriptionTracker.consumer.id()), subscriptionTracker.stream, str2);
                    if (queryOffsetResponse.isOk() && queryOffsetResponse.getOffset() != 0) {
                        if (offsetSpecification != null && z) {
                            ConsumersCoordinator.LOGGER.info("Requested offset specification {} not used in favor of stored offset found for reference {}", offsetSpecification, str2);
                        }
                        ConsumersCoordinator.LOGGER.debug("Using offset {} to start consuming from {} with consumer {} (instead of {})", new Object[]{Long.valueOf(queryOffsetResponse.getOffset()), subscriptionTracker.stream, str2, offsetSpecification});
                        offsetSpecification = OffsetSpecification.offset(queryOffsetResponse.getOffset() + 1);
                    }
                }
                OffsetSpecification offsetSpecification2 = offsetSpecification == null ? ConsumersCoordinator.DEFAULT_OFFSET_SPECIFICATION : offsetSpecification;
                DefaultSubscriptionContext defaultSubscriptionContext = new DefaultSubscriptionContext(offsetSpecification2);
                subscriptionTracker.subscriptionListener.preSubscribe(defaultSubscriptionContext);
                ConsumersCoordinator.LOGGER.info("Computed offset specification {}, offset specification used after subscription listener {}", offsetSpecification2, defaultSubscriptionContext.offsetSpecification());
                checkNotClosed();
                byte b2 = b;
                Client.Response response = (Client.Response) Utils.callAndMaybeRetry(() -> {
                    return this.client.subscribe(b2, subscriptionTracker.stream, defaultSubscriptionContext.offsetSpecification(), 10, subscriptionTracker.subscriptionProperties);
                }, ConsumersCoordinator.RETRY_ON_TIMEOUT, "Subscribe request for consumer %s on stream '%s'", Long.valueOf(subscriptionTracker.consumer.id()), subscriptionTracker.stream);
                if (response.isOk()) {
                    subscriptionTracker.state(SubscriptionState.ACTIVE);
                    ConsumersCoordinator.LOGGER.debug("Subscribed to '{}'", subscriptionTracker.stream);
                } else {
                    String str3 = "Subscription to stream " + subscriptionTracker.stream + " failed with code " + Utils.formatConstant(response.getResponseCode());
                    ConsumersCoordinator.LOGGER.debug(str3);
                    throw Utils.convertCodeToException(response.getResponseCode(), subscriptionTracker.stream, () -> {
                        return str3;
                    });
                }
            } catch (RuntimeException e) {
                subscriptionTracker.assign((byte) -1, null);
                setSubscriptionTrackers(list);
                this.streamToStreamSubscriptions.computeIfAbsent(subscriptionTracker.stream, str4 -> {
                    return ConcurrentHashMap.newKeySet();
                }).remove(subscriptionTracker);
                maybeCleanStreamToStreamSubscriptions(subscriptionTracker.stream);
                throw e;
            }
        }

        private void maybeCleanStreamToStreamSubscriptions(String str) {
            this.streamToStreamSubscriptions.compute(str, (str2, set) -> {
                if (set == null || set.isEmpty()) {
                    return null;
                }
                return set;
            });
        }

        synchronized void remove(SubscriptionTracker subscriptionTracker) {
            byte b = subscriptionTracker.subscriptionIdInClient;
            try {
                Client.Response response = (Client.Response) Utils.callAndMaybeRetry(() -> {
                    return this.client.unsubscribe(b);
                }, ConsumersCoordinator.RETRY_ON_TIMEOUT, "Unsubscribe request for consumer %d on stream '%s'", Long.valueOf(subscriptionTracker.consumer.id()), subscriptionTracker.stream);
                if (!response.isOk()) {
                    ConsumersCoordinator.LOGGER.warn("Unexpected response code when unsubscribing from {}: {} (subscription ID {})", new Object[]{subscriptionTracker.stream, Utils.formatConstant(response.getResponseCode()), Byte.valueOf(b)});
                }
            } catch (TimeoutStreamException e) {
                ConsumersCoordinator.LOGGER.debug("Reached timeout when trying to unsubscribe consumer {} from stream '{}'", Long.valueOf(subscriptionTracker.consumer.id()), subscriptionTracker.stream);
            }
            setSubscriptionTrackers(update(this.subscriptionTrackers, b, null));
            this.streamToStreamSubscriptions.compute(subscriptionTracker.stream, (str, set) -> {
                if (set == null || set.isEmpty()) {
                    return null;
                }
                set.remove(subscriptionTracker);
                if (set.isEmpty()) {
                    return null;
                }
                return set;
            });
            closeIfEmpty();
        }

        private List<SubscriptionTracker> update(List<SubscriptionTracker> list, byte b, SubscriptionTracker subscriptionTracker) {
            ArrayList arrayList = new ArrayList(ConsumersCoordinator.this.maxConsumersByConnection);
            int i = b & 255;
            int i2 = 0;
            while (i2 < ConsumersCoordinator.this.maxConsumersByConnection) {
                arrayList.add(i2 == i ? subscriptionTracker : list.get(i2));
                i2++;
            }
            return arrayList;
        }

        private void setSubscriptionTrackers(List<SubscriptionTracker> list) {
            this.subscriptionTrackers = list;
            this.trackerCount = (int) this.subscriptionTrackers.stream().filter((v0) -> {
                return Objects.nonNull(v0);
            }).count();
        }

        boolean isFull() {
            return this.trackerCount == ConsumersCoordinator.this.maxConsumersByConnection;
        }

        boolean isEmpty() {
            return this.trackerCount == 0;
        }

        boolean isClosed() {
            if (!this.client.isOpen()) {
                close();
            }
            return this.closed.get();
        }

        public synchronized void closeIfEmpty() {
            if (isEmpty()) {
                close();
            }
        }

        synchronized void close() {
            if (this.closed.compareAndSet(false, true)) {
                ConsumersCoordinator.this.managers.remove(this);
                ConsumersCoordinator.LOGGER.debug("Closing consumer subscription manager on {}, id {}", this.name, Long.valueOf(this.id));
                if (this.client == null || !this.client.isOpen()) {
                    return;
                }
                for (int i = 0; i < this.subscriptionTrackers.size(); i++) {
                    SubscriptionTracker subscriptionTracker = this.subscriptionTrackers.get(i);
                    if (subscriptionTracker != null) {
                        try {
                            if (this.client != null && this.client.isOpen() && subscriptionTracker.consumer.isOpen()) {
                                this.client.unsubscribe(subscriptionTracker.subscriptionIdInClient);
                            }
                        } catch (Exception e) {
                            ConsumersCoordinator.LOGGER.debug("Error while unsubscribing from {}, registration {}", subscriptionTracker.stream, Byte.valueOf(subscriptionTracker.subscriptionIdInClient));
                        }
                        this.subscriptionTrackers.set(i, null);
                    }
                }
                this.streamToStreamSubscriptions.clear();
                if (this.client == null || !this.client.isOpen()) {
                    return;
                }
                this.client.close();
            }
        }

        @Override // java.lang.Comparable
        public int compareTo(ClientSubscriptionsManager clientSubscriptionsManager) {
            return Long.compare(this.id, clientSubscriptionsManager.id);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.id == ((ClientSubscriptionsManager) obj).id;
        }

        public int hashCode() {
            return Objects.hash(Long.valueOf(this.id));
        }

        /* synthetic */ ClientSubscriptionsManager(ConsumersCoordinator consumersCoordinator, Client.Broker broker, Client.ClientParameters clientParameters, AnonymousClass1 anonymousClass1) {
            this(broker, clientParameters);
        }
    }

    /* loaded from: input_file:com/rabbitmq/stream/impl/ConsumersCoordinator$DefaultSubscriptionContext.class */
    public static final class DefaultSubscriptionContext implements SubscriptionListener.SubscriptionContext {
        private volatile OffsetSpecification offsetSpecification;

        private DefaultSubscriptionContext(OffsetSpecification offsetSpecification) {
            this.offsetSpecification = offsetSpecification;
        }

        @Override // com.rabbitmq.stream.SubscriptionListener.SubscriptionContext
        public OffsetSpecification offsetSpecification() {
            return this.offsetSpecification;
        }

        @Override // com.rabbitmq.stream.SubscriptionListener.SubscriptionContext
        public void offsetSpecification(OffsetSpecification offsetSpecification) {
            this.offsetSpecification = offsetSpecification;
        }

        public String toString() {
            return "SubscriptionContext{offsetSpecification=" + this.offsetSpecification + '}';
        }

        /* synthetic */ DefaultSubscriptionContext(OffsetSpecification offsetSpecification, AnonymousClass1 anonymousClass1) {
            this(offsetSpecification);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/rabbitmq/stream/impl/ConsumersCoordinator$MessageHandlerContext.class */
    public static final class MessageHandlerContext implements MessageHandler.Context {
        private final long offset;
        private final long timestamp;
        private final long committedOffset;
        private final StreamConsumer consumer;

        private MessageHandlerContext(long j, long j2, long j3, StreamConsumer streamConsumer) {
            this.offset = j;
            this.timestamp = j2;
            this.committedOffset = j3;
            this.consumer = streamConsumer;
        }

        @Override // com.rabbitmq.stream.MessageHandler.Context
        public long offset() {
            return this.offset;
        }

        @Override // com.rabbitmq.stream.MessageHandler.Context
        public void storeOffset() {
            this.consumer.store(this.offset);
        }

        @Override // com.rabbitmq.stream.MessageHandler.Context
        public long timestamp() {
            return this.timestamp;
        }

        @Override // com.rabbitmq.stream.MessageHandler.Context
        public long committedChunkId() {
            return this.committedOffset;
        }

        @Override // com.rabbitmq.stream.MessageHandler.Context
        public String stream() {
            return this.consumer.stream();
        }

        @Override // com.rabbitmq.stream.MessageHandler.Context
        public Consumer consumer() {
            return this.consumer;
        }

        /* synthetic */ MessageHandlerContext(long j, long j2, long j3, StreamConsumer streamConsumer, AnonymousClass1 anonymousClass1) {
            this(j, j2, j3, streamConsumer);
        }
    }

    /* loaded from: input_file:com/rabbitmq/stream/impl/ConsumersCoordinator$SubscriptionState.class */
    public enum SubscriptionState {
        OPENING,
        ACTIVE,
        RECOVERING,
        CLOSED
    }

    /* loaded from: input_file:com/rabbitmq/stream/impl/ConsumersCoordinator$SubscriptionTracker.class */
    public static class SubscriptionTracker {
        private final long id;
        private final String stream;
        private final OffsetSpecification initialOffsetSpecification;
        private final String offsetTrackingReference;
        private final MessageHandler messageHandler;
        private final StreamConsumer consumer;
        private final SubscriptionListener subscriptionListener;
        private final Runnable trackingClosingCallback;
        private final Map<String, String> subscriptionProperties;
        private volatile long offset;
        private volatile boolean hasReceivedSomething;
        private volatile byte subscriptionIdInClient;
        private volatile ClientSubscriptionsManager manager;
        private volatile AtomicReference<SubscriptionState> state;

        private SubscriptionTracker(long j, StreamConsumer streamConsumer, String str, OffsetSpecification offsetSpecification, String str2, SubscriptionListener subscriptionListener, Runnable runnable, MessageHandler messageHandler, Map<String, String> map) {
            this.hasReceivedSomething = false;
            this.state = new AtomicReference<>(SubscriptionState.OPENING);
            this.id = j;
            this.consumer = streamConsumer;
            this.stream = str;
            this.initialOffsetSpecification = offsetSpecification;
            this.offsetTrackingReference = str2;
            this.subscriptionListener = subscriptionListener;
            this.trackingClosingCallback = runnable;
            this.messageHandler = messageHandler;
            if (this.offsetTrackingReference == null) {
                this.subscriptionProperties = map;
                return;
            }
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(map.size() + 1);
            concurrentHashMap.putAll(map);
            concurrentHashMap.put("name", this.offsetTrackingReference);
            this.subscriptionProperties = Collections.unmodifiableMap(concurrentHashMap);
        }

        synchronized void cancel() {
            ConsumersCoordinator.LOGGER.debug("Calling tracking consumer closing callback (may be no-op)");
            this.trackingClosingCallback.run();
            if (this.manager != null) {
                ConsumersCoordinator.LOGGER.debug("Removing consumer from manager " + this.consumer);
                this.manager.remove(this);
            } else {
                ConsumersCoordinator.LOGGER.debug("No manager to remove consumer from");
            }
            state(SubscriptionState.CLOSED);
        }

        synchronized void assign(byte b, ClientSubscriptionsManager clientSubscriptionsManager) {
            this.subscriptionIdInClient = b;
            this.manager = clientSubscriptionsManager;
            if (this.manager != null) {
                this.consumer.setSubscriptionClient(this.manager.client);
            } else if (this.consumer != null) {
                this.consumer.setSubscriptionClient(null);
            }
        }

        synchronized void detachFromManager() {
            this.manager = null;
            this.consumer.setSubscriptionClient(null);
        }

        void state(SubscriptionState subscriptionState) {
            this.state.set(subscriptionState);
        }

        boolean compareAndSet(SubscriptionState subscriptionState, SubscriptionState subscriptionState2) {
            return this.state.compareAndSet(subscriptionState, subscriptionState2);
        }

        SubscriptionState state() {
            return this.state.get();
        }

        /* synthetic */ SubscriptionTracker(long j, StreamConsumer streamConsumer, String str, OffsetSpecification offsetSpecification, String str2, SubscriptionListener subscriptionListener, Runnable runnable, MessageHandler messageHandler, Map map, AnonymousClass1 anonymousClass1) {
            this(j, streamConsumer, str, offsetSpecification, str2, subscriptionListener, runnable, messageHandler, map);
        }

        /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: com.rabbitmq.stream.impl.ConsumersCoordinator.SubscriptionTracker.access$2002(com.rabbitmq.stream.impl.ConsumersCoordinator$SubscriptionTracker, long):long
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        static /* synthetic */ long access$2002(com.rabbitmq.stream.impl.ConsumersCoordinator.SubscriptionTracker r6, long r7) {
            /*
                r0 = r6
                r1 = r7
                // decode failed: arraycopy: source index -1 out of bounds for object array[6]
                r0.offset = r1
                return r-1
            */
            throw new UnsupportedOperationException("Method not decompiled: com.rabbitmq.stream.impl.ConsumersCoordinator.SubscriptionTracker.access$2002(com.rabbitmq.stream.impl.ConsumersCoordinator$SubscriptionTracker, long):long");
        }
    }

    public ConsumersCoordinator(StreamEnvironment streamEnvironment, int i, Function<Utils.ClientConnectionType, String> function, Utils.ClientFactory clientFactory) {
        this.environment = streamEnvironment;
        this.clientFactory = clientFactory;
        this.maxConsumersByConnection = i;
        this.connectionNamingStrategy = function;
    }

    public static String keyForClientSubscription(Client.Broker broker) {
        return broker.getHost() + ":" + broker.getPort();
    }

    public BackOffDelayPolicy recoveryBackOffDelayPolicy() {
        return this.environment.recoveryBackOffDelayPolicy();
    }

    public BackOffDelayPolicy metadataUpdateBackOffDelayPolicy() {
        return this.environment.topologyUpdateBackOffDelayPolicy();
    }

    public Runnable subscribe(StreamConsumer streamConsumer, String str, OffsetSpecification offsetSpecification, String str2, SubscriptionListener subscriptionListener, Runnable runnable, MessageHandler messageHandler, Map<String, String> map) {
        Client.Broker pickBroker = pickBroker(findBrokersForStream(str));
        if (pickBroker == null) {
            throw new IllegalStateException("No available node to subscribe to");
        }
        SubscriptionTracker subscriptionTracker = new SubscriptionTracker(this.trackerIdSequence.getAndIncrement(), streamConsumer, str, offsetSpecification, str2, subscriptionListener, runnable, messageHandler, map);
        try {
            addToManager(pickBroker, subscriptionTracker, offsetSpecification, true);
            this.trackers.add(subscriptionTracker);
            return () -> {
                try {
                    this.trackers.remove(subscriptionTracker);
                } catch (Exception e) {
                    LOGGER.debug("Error while removing subscription tracker from list");
                }
                subscriptionTracker.cancel();
            };
        } catch (ConnectionStreamException e) {
            throw new StreamException(e.getMessage());
        }
    }

    public void addToManager(Client.Broker broker, SubscriptionTracker subscriptionTracker, OffsetSpecification offsetSpecification, boolean z) {
        Client.ClientParameters port = this.environment.clientParametersCopy().host(broker.getHost()).port(broker.getPort());
        ClientSubscriptionsManager clientSubscriptionsManager = null;
        while (clientSubscriptionsManager == null) {
            Iterator<ClientSubscriptionsManager> it = this.managers.iterator();
            while (it.hasNext()) {
                clientSubscriptionsManager = it.next();
                if (!clientSubscriptionsManager.isClosed()) {
                    if (broker.equals(clientSubscriptionsManager.node) && !clientSubscriptionsManager.isFull()) {
                        break;
                    } else {
                        clientSubscriptionsManager = null;
                    }
                } else {
                    it.remove();
                    clientSubscriptionsManager = null;
                }
            }
            if (clientSubscriptionsManager == null) {
                String keyForClientSubscription = keyForClientSubscription(broker);
                LOGGER.debug("Creating subscription manager on {}", keyForClientSubscription);
                clientSubscriptionsManager = new ClientSubscriptionsManager(broker, port);
                LOGGER.debug("Created subscription manager on {}, id {}", keyForClientSubscription, Long.valueOf(clientSubscriptionsManager.id));
            }
            try {
                clientSubscriptionsManager.add(subscriptionTracker, offsetSpecification, z);
                LOGGER.debug("Assigned tracker {} (stream '{}') to manager {} (node {}), subscription ID {}", new Object[]{Long.valueOf(subscriptionTracker.id), subscriptionTracker.stream, Long.valueOf(clientSubscriptionsManager.id), clientSubscriptionsManager.name, Byte.valueOf(subscriptionTracker.subscriptionIdInClient)});
                this.managers.add(clientSubscriptionsManager);
            } catch (StreamNotAvailableException | ConnectionStreamException | ClientClosedException e) {
                if (clientSubscriptionsManager.isEmpty()) {
                    ClientSubscriptionsManager clientSubscriptionsManager2 = clientSubscriptionsManager;
                    this.environment.execute(() -> {
                        clientSubscriptionsManager2.closeIfEmpty();
                    }, "Consumer manager closing after timeout, consumer %d on stream '%s'", Long.valueOf(subscriptionTracker.consumer.id()), subscriptionTracker.stream);
                }
                throw e;
            } catch (IllegalStateException e2) {
                clientSubscriptionsManager = null;
            } catch (RuntimeException e3) {
                if (clientSubscriptionsManager != null) {
                    clientSubscriptionsManager.closeIfEmpty();
                }
                throw e3;
            }
        }
    }

    public int managerCount() {
        return this.managers.size();
    }

    List<Client.Broker> findBrokersForStream(String str) {
        List<Client.Broker> singletonList;
        Map map = (Map) this.environment.locatorOperation(Utils.namedFunction(client -> {
            return client.metadata(str);
        }, "Candidate lookup to consume from '%s'", str));
        if (map.size() == 0 || map.get(str) == null) {
            throw new StreamDoesNotExistException(str);
        }
        Client.StreamMetadata streamMetadata = (Client.StreamMetadata) map.get(str);
        if (!streamMetadata.isResponseOk()) {
            if (streamMetadata.getResponseCode() == 2) {
                throw new StreamDoesNotExistException(str);
            }
            throw new IllegalStateException("Could not get stream metadata, response code: " + Utils.formatConstant(streamMetadata.getResponseCode()));
        }
        List<Client.Broker> replicas = streamMetadata.getReplicas();
        if ((replicas == null || replicas.isEmpty()) && streamMetadata.getLeader() == null) {
            throw new IllegalStateException("No node available to consume from stream " + str);
        }
        if (replicas == null || replicas.isEmpty()) {
            singletonList = Collections.singletonList(streamMetadata.getLeader());
            LOGGER.debug("Only leader node {} for consuming from {}", streamMetadata.getLeader(), str);
        } else {
            LOGGER.debug("Replicas for consuming from {}: {}", str, replicas);
            singletonList = new ArrayList(replicas);
        }
        LOGGER.debug("Candidates to consume from {}: {}", str, singletonList);
        return singletonList;
    }

    public Client.Broker pickBroker(List<Client.Broker> list) {
        if (list.isEmpty()) {
            return null;
        }
        return list.size() == 1 ? list.get(0) : list.get(this.random.nextInt(list.size()));
    }

    public void close() {
        Iterator<ClientSubscriptionsManager> it = this.managers.iterator();
        while (it.hasNext()) {
            ClientSubscriptionsManager next = it.next();
            try {
                it.remove();
                next.close();
            } catch (Exception e) {
                LOGGER.info("Error while closing manager {} connected to node {}: {}", new Object[]{Long.valueOf(next.id), next.name, e.getMessage()});
            }
        }
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("{");
        sb.append(Utils.jsonField("client_count", Integer.valueOf(this.managers.size()))).append(", ");
        sb.append(Utils.quote("clients")).append(" : [");
        sb.append((String) this.managers.stream().map(clientSubscriptionsManager -> {
            StringBuilder sb2 = new StringBuilder("{");
            sb2.append(Utils.jsonField("id", Long.valueOf(clientSubscriptionsManager.id))).append(",").append(Utils.jsonField("node", clientSubscriptionsManager.name)).append(",").append(Utils.jsonField("consumer_count", Integer.valueOf(clientSubscriptionsManager.trackerCount))).append(",");
            sb2.append("\"subscriptions\" : [");
            sb2.append((String) clientSubscriptionsManager.subscriptionTrackers.stream().filter((v0) -> {
                return Objects.nonNull(v0);
            }).map(subscriptionTracker -> {
                StringBuilder sb3 = new StringBuilder("{");
                sb3.append(Utils.jsonField("stream", subscriptionTracker.stream)).append(",");
                sb3.append(Utils.jsonField("subscription_id", Byte.valueOf(subscriptionTracker.subscriptionIdInClient)));
                return sb3.append("}").toString();
            }).collect(Collectors.joining(",")));
            sb2.append("]");
            return sb2.append("}").toString();
        }).collect(Collectors.joining(",")));
        sb.append("]");
        sb.append(",");
        sb.append("\"subscription_count\" : ").append(this.trackers.size()).append(",");
        sb.append("\"subscriptions\" : [");
        sb.append((String) this.trackers.stream().map(subscriptionTracker -> {
            StringBuilder sb2 = new StringBuilder("{");
            sb2.append(Utils.quote("stream")).append(":").append(Utils.quote(subscriptionTracker.stream)).append(",");
            sb2.append(Utils.quote("node")).append(":");
            Client client = null;
            ClientSubscriptionsManager clientSubscriptionsManager2 = subscriptionTracker.manager;
            if (clientSubscriptionsManager2 != null) {
                client = clientSubscriptionsManager2.client;
            }
            if (client == null) {
                sb2.append("null");
            } else {
                sb2.append(Utils.quote(client.getHost() + ":" + client.getPort()));
            }
            return sb2.append("}").toString();
        }).collect(Collectors.joining(",")));
        sb.append("]");
        sb.append("}");
        return sb.toString();
    }

    public static void maybeExchangeCommandVersions(Client client) {
        try {
            if (Utils.is3_11_OrMore(client.brokerVersion())) {
                client.exchangeCommandVersions();
            }
        } catch (Exception e) {
            LOGGER.info("Error while exchanging command versions: {}", e.getMessage());
        }
    }

    static {
    }
}
