package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.implementation.CreditFlowMode;
import com.azure.core.amqp.implementation.MessageFlux;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.handler.DeliveryNotOnLinkException;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusSessionAcquirer;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.instrumentation.ReceiverKind;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.AbandonOptions;
import com.azure.messaging.servicebus.models.CompleteOptions;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.DeferOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/messaging/servicebus/SessionsMessagePump.class */
final class SessionsMessagePump {
    private static final AtomicLong COUNTER = new AtomicLong();
    private static final ArrayList<RollingSessionReceiver> EMPTY = new ArrayList<>(0);
    private static final ArrayList<RollingSessionReceiver> TERMINATED = new ArrayList<>(0);
    private static final Duration CONNECTION_STATE_POLL_INTERVAL = Duration.ofSeconds(20);
    private final ClientLogger logger;
    private final String identifier;
    private final String fullyQualifiedNamespace;
    private final String entityPath;
    private final ServiceBusReceiverInstrumentation instrumentation;
    private final ServiceBusSessionAcquirer sessionAcquirer;
    private final Duration maxSessionLockRenew;
    private final Duration sessionIdleTimeout;
    private final int maxConcurrentSessions;
    private final int concurrencyPerSession;
    private final int prefetch;
    private final boolean enableAutoDisposition;
    private final MessageSerializer serializer;
    private final AmqpRetryPolicy retryPolicy;
    private final Consumer<ServiceBusReceivedMessageContext> processMessage;
    private final Consumer<ServiceBusErrorContext> processError;
    private final Runnable onTerminate;
    private final SessionReceiversTracker receiversTracker;
    private final Mono<ServiceBusSessionAcquirer.Session> nextSession;
    private final AtomicReference<List<RollingSessionReceiver>> rollingReceiversRef = new AtomicReference<>(EMPTY);
    private final long pumpId = COUNTER.incrementAndGet();

    /* loaded from: input_file:com/azure/messaging/servicebus/SessionsMessagePump$NextSession.class */
    private static final class NextSession implements Supplier<Mono<ServiceBusSessionAcquirer.Session>> {
        private final AtomicReference<Boolean> isTerminated = new AtomicReference<>(false);
        private final long pumpId;
        private final String fullyQualifiedNamespace;
        private final String entityPath;
        private final ServiceBusSessionAcquirer sessionAcquirer;

        NextSession(long j, String str, String str2, ServiceBusSessionAcquirer serviceBusSessionAcquirer) {
            this.pumpId = j;
            this.fullyQualifiedNamespace = str;
            this.entityPath = str2;
            this.sessionAcquirer = serviceBusSessionAcquirer;
        }

        Mono<ServiceBusSessionAcquirer.Session> mono() {
            return Mono.defer(this);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.function.Supplier
        public Mono<ServiceBusSessionAcquirer.Session> get() {
            return this.isTerminated.get().booleanValue() ? Mono.error(new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "session#acquire")) : this.sessionAcquirer.acquire().onErrorMap(th -> {
                this.isTerminated.set(true);
                return new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "session#acquire", th);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/messaging/servicebus/SessionsMessagePump$RollingSessionReceiver.class */
    public static final class RollingSessionReceiver extends AtomicReference<State<ServiceBusSessionReactorReceiver>> {
        private static final String ROLLER_ID_KEY = "roller-id";
        private static final State<ServiceBusSessionReactorReceiver> INIT = State.init();
        private static final State<ServiceBusSessionReactorReceiver> TERMINATED = State.terminated();
        private final ClientLogger logger;
        private final long pumpId;
        private final int rollerId;
        private final String fullyQualifiedNamespace;
        private final String entityPath;
        private final int concurrency;
        private final Consumer<ServiceBusReceivedMessageContext> processMessage;
        private final Consumer<ServiceBusErrorContext> processError;
        private final boolean enableAutoDisposition;
        private final Duration maxSessionLockRenew;
        private final Duration sessionIdleTimeout;
        private final MessageSerializer serializer;
        private final ServiceBusReceiverInstrumentation instrumentation;
        private final ServiceBusTracer tracer;
        private final SessionReceiversTracker receiversTracker;
        private final NextSessionStream nextSessionStream;
        private final MessageFlux messageFlux;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/azure/messaging/servicebus/SessionsMessagePump$RollingSessionReceiver$NextSessionStream.class */
        public static final class NextSessionStream extends AtomicBoolean {
            private final long pumpId;
            private final int rollerId;
            private final String fullyQualifiedNamespace;
            private final String entityPath;
            private final Mono<ServiceBusSessionAcquirer.Session> newSession;

            NextSessionStream(long j, int i, String str, String str2, Mono<ServiceBusSessionAcquirer.Session> mono) {
                super(false);
                this.pumpId = j;
                this.rollerId = i;
                this.fullyQualifiedNamespace = str;
                this.entityPath = str2;
                this.newSession = Mono.defer(() -> {
                    return super.get() ? Mono.error(new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "session#next-link roller_" + this.rollerId)) : mono;
                }).map(session -> {
                    if (!super.get()) {
                        return session;
                    }
                    MessageUtils.subscribe(session.getLink().closeAsync());
                    throw new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "session#next-link roller_" + this.rollerId);
                });
            }

            Flux<ServiceBusSessionAcquirer.Session> flux() {
                return nonEagerRepeat(this.newSession);
            }

            void close() {
                super.set(true);
            }

            private static Flux<ServiceBusSessionAcquirer.Session> nonEagerRepeat(Mono<ServiceBusSessionAcquirer.Session> mono) {
                return mono.cacheInvalidateIf(session -> {
                    return session.getLink().isDisposed();
                }).repeat().filter(session2 -> {
                    return !session2.getLink().isDisposed();
                });
            }
        }

        /* loaded from: input_file:com/azure/messaging/servicebus/SessionsMessagePump$RollingSessionReceiver$RunOnWorker.class */
        private static final class RunOnWorker implements Function<Message, Publisher<Void>> {
            private final Consumer<Message> handleMessage;
            private final Scheduler workerScheduler;

            RunOnWorker(Consumer<Message> consumer, Scheduler scheduler) {
                this.handleMessage = consumer;
                this.workerScheduler = scheduler;
            }

            @Override // java.util.function.Function
            public Mono<Void> apply(Message message) {
                return Mono.fromRunnable(() -> {
                    this.handleMessage.accept(message);
                }).subscribeOn(this.workerScheduler);
            }
        }

        RollingSessionReceiver(long j, int i, ServiceBusReceiverInstrumentation serviceBusReceiverInstrumentation, String str, String str2, Mono<ServiceBusSessionAcquirer.Session> mono, Duration duration, Duration duration2, int i2, int i3, boolean z, MessageSerializer messageSerializer, AmqpRetryPolicy amqpRetryPolicy, Consumer<ServiceBusReceivedMessageContext> consumer, Consumer<ServiceBusErrorContext> consumer2, SessionReceiversTracker sessionReceiversTracker) {
            super(INIT);
            this.pumpId = j;
            HashMap hashMap = new HashMap(3);
            hashMap.put(ROLLER_ID_KEY, Integer.valueOf(i));
            hashMap.put("namespace", str);
            hashMap.put("entityPath", str2);
            this.logger = new ClientLogger(RollingSessionReceiver.class, hashMap);
            this.rollerId = i;
            this.fullyQualifiedNamespace = str;
            this.entityPath = str2;
            this.concurrency = i2;
            this.processError = consumer2;
            this.processMessage = consumer;
            this.enableAutoDisposition = z;
            this.maxSessionLockRenew = duration;
            this.sessionIdleTimeout = duration2;
            this.serializer = messageSerializer;
            this.instrumentation = serviceBusReceiverInstrumentation;
            this.tracer = serviceBusReceiverInstrumentation.getTracer();
            this.receiversTracker = sessionReceiversTracker;
            this.nextSessionStream = new NextSessionStream(j, i, str, str2, mono);
            this.messageFlux = new MessageFlux(this.nextSessionStream.flux().map(this::nextSessionReceiver), i3, CreditFlowMode.RequestDriven, amqpRetryPolicy);
        }

        Mono<Void> begin() {
            return Mono.usingWhen(Mono.fromSupplier(() -> {
                return this.concurrency > 1 ? Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "rolling-session-receiver-" + this.rollerId) : Schedulers.immediate();
            }), scheduler -> {
                return this.messageFlux.flatMap(new RunOnWorker(this::handleMessage, scheduler), this.concurrency, 1).then();
            }, scheduler2 -> {
                return terminate(TerminalSignalType.COMPLETED, scheduler2);
            }, (scheduler3, th) -> {
                return terminate(TerminalSignalType.ERRORED, scheduler3);
            }, scheduler4 -> {
                return terminate(TerminalSignalType.CANCELED, scheduler4);
            });
        }

        private Mono<Void> terminate(TerminalSignalType terminalSignalType, Scheduler scheduler) {
            if (((State) super.getAndSet(TERMINATED)) == TERMINATED) {
                return Mono.empty();
            }
            this.logger.atInfo().log("Roller terminated. rollerId:" + this.rollerId + " signal:" + terminalSignalType);
            this.nextSessionStream.close();
            scheduler.dispose();
            return Mono.empty();
        }

        private ServiceBusSessionReactorReceiver nextSessionReceiver(ServiceBusSessionAcquirer.Session session) {
            State<ServiceBusSessionReactorReceiver> state = (State) super.get();
            if (state == TERMINATED) {
                MessageUtils.subscribe(session.getLink().closeAsync());
                throw new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "session#next-receiver roller_" + this.rollerId);
            }
            ServiceBusSessionReactorReceiver serviceBusSessionReactorReceiver = new ServiceBusSessionReactorReceiver(this.logger, this.tracer, session, this.sessionIdleTimeout, this.maxSessionLockRenew);
            if (!super.compareAndSet(state, new State(serviceBusSessionReactorReceiver))) {
                MessageUtils.subscribe(serviceBusSessionReactorReceiver.closeAsync());
                throw new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "session#next-receiver roller_" + this.rollerId);
            }
            if (state != INIT) {
                this.receiversTracker.untrack(state.receiver);
            }
            this.receiversTracker.track(serviceBusSessionReactorReceiver);
            return serviceBusSessionReactorReceiver;
        }

        private void handleMessage(Message message) {
            ServiceBusReceivedMessage serviceBusReceivedMessage = (ServiceBusReceivedMessage) this.serializer.deserialize(message, ServiceBusReceivedMessage.class);
            this.instrumentation.instrumentProcess(serviceBusReceivedMessage, ReceiverKind.PROCESSOR, serviceBusReceivedMessage2 -> {
                this.logger.atVerbose().addKeyValue(ServiceBusConstants.SESSION_ID_KEY, serviceBusReceivedMessage.getSessionId()).addKeyValue(ServiceBusConstants.MESSAGE_ID_LOGGING_KEY, serviceBusReceivedMessage.getMessageId()).log("Received message.");
                Throwable notifyMessage = notifyMessage(serviceBusReceivedMessage2);
                if (this.enableAutoDisposition) {
                    if (notifyMessage == null) {
                        complete(serviceBusReceivedMessage2);
                    } else {
                        abandon(serviceBusReceivedMessage2);
                    }
                }
                return notifyMessage;
            });
        }

        private Throwable notifyMessage(ServiceBusReceivedMessage serviceBusReceivedMessage) {
            try {
                this.processMessage.accept(new ServiceBusReceivedMessageContext(this.receiversTracker, new ServiceBusMessageContext(serviceBusReceivedMessage)));
                return null;
            } catch (Exception e) {
                notifyError(new ServiceBusException(e, ServiceBusErrorSource.USER_CALLBACK));
                return e;
            }
        }

        private void notifyError(Throwable th) {
            try {
                this.processError.accept(new ServiceBusErrorContext(th, this.fullyQualifiedNamespace, this.entityPath));
            } catch (Exception e) {
                this.logger.atVerbose().log("Ignoring error from user processError handler.", new Object[]{e});
            }
        }

        private void complete(ServiceBusReceivedMessage serviceBusReceivedMessage) {
            try {
                this.receiversTracker.complete(serviceBusReceivedMessage).block();
            } catch (Exception e) {
                this.logger.atVerbose().log("Failed to complete message", new Object[]{e});
            }
        }

        private void abandon(ServiceBusReceivedMessage serviceBusReceivedMessage) {
            try {
                this.receiversTracker.abandon(serviceBusReceivedMessage).block();
            } catch (Exception e) {
                this.logger.atVerbose().log("Failed to abandon message", new Object[]{e});
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/azure/messaging/servicebus/SessionsMessagePump$SessionReceiversTracker.class */
    public static final class SessionReceiversTracker {
        private final ClientLogger logger;
        private final String fullyQualifiedNamespace;
        private final String entityPath;
        private final ServiceBusReceiveMode receiveMode;
        private final ConcurrentHashMap<String, ServiceBusSessionReactorReceiver> receivers;
        private final ServiceBusReceiverInstrumentation instrumentation;

        private SessionReceiversTracker(ClientLogger clientLogger, int i, String str, String str2, ServiceBusReceiveMode serviceBusReceiveMode, ServiceBusReceiverInstrumentation serviceBusReceiverInstrumentation) {
            this.logger = clientLogger;
            this.fullyQualifiedNamespace = str;
            this.entityPath = str2;
            this.receiveMode = serviceBusReceiveMode;
            this.receivers = new ConcurrentHashMap<>(i);
            this.instrumentation = serviceBusReceiverInstrumentation;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void track(ServiceBusSessionReactorReceiver serviceBusSessionReactorReceiver) {
            this.receivers.put(serviceBusSessionReactorReceiver.getSessionId(), serviceBusSessionReactorReceiver);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void untrack(ServiceBusSessionReactorReceiver serviceBusSessionReactorReceiver) {
            this.receivers.remove(serviceBusSessionReactorReceiver.getSessionId(), serviceBusSessionReactorReceiver);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void clear() {
            this.receivers.clear();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public String getFullyQualifiedNamespace() {
            return this.fullyQualifiedNamespace;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public String getEntityPath() {
            return this.entityPath;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Mono<Void> abandon(ServiceBusReceivedMessage serviceBusReceivedMessage) {
            return updateDisposition(serviceBusReceivedMessage, DispositionStatus.ABANDONED, null, null, null, null);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Mono<Void> complete(ServiceBusReceivedMessage serviceBusReceivedMessage) {
            return updateDisposition(serviceBusReceivedMessage, DispositionStatus.COMPLETED, null, null, null, null);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Mono<Void> deadLetter(ServiceBusReceivedMessage serviceBusReceivedMessage) {
            return updateDisposition(serviceBusReceivedMessage, DispositionStatus.SUSPENDED, null, null, null, null);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Mono<Void> defer(ServiceBusReceivedMessage serviceBusReceivedMessage) {
            return updateDisposition(serviceBusReceivedMessage, DispositionStatus.DEFERRED, null, null, null, null);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Mono<Void> abandon(ServiceBusReceivedMessage serviceBusReceivedMessage, AbandonOptions abandonOptions) {
            Mono<Void> checkNull = checkNull(abandonOptions, abandonOptions != null ? abandonOptions.getTransactionContext() : null);
            return checkNull != null ? checkNull : updateDisposition(serviceBusReceivedMessage, DispositionStatus.ABANDONED, abandonOptions.getPropertiesToModify(), null, null, abandonOptions.getTransactionContext());
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Mono<Void> complete(ServiceBusReceivedMessage serviceBusReceivedMessage, CompleteOptions completeOptions) {
            Mono<Void> checkNull = checkNull(completeOptions, completeOptions != null ? completeOptions.getTransactionContext() : null);
            return checkNull != null ? checkNull : updateDisposition(serviceBusReceivedMessage, DispositionStatus.COMPLETED, null, null, null, completeOptions.getTransactionContext());
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Mono<Void> deadLetter(ServiceBusReceivedMessage serviceBusReceivedMessage, DeadLetterOptions deadLetterOptions) {
            Mono<Void> checkNull = checkNull(deadLetterOptions, deadLetterOptions != null ? deadLetterOptions.getTransactionContext() : null);
            return checkNull != null ? checkNull : updateDisposition(serviceBusReceivedMessage, DispositionStatus.SUSPENDED, deadLetterOptions.getPropertiesToModify(), deadLetterOptions.getDeadLetterReason(), deadLetterOptions.getDeadLetterErrorDescription(), deadLetterOptions.getTransactionContext());
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Mono<Void> defer(ServiceBusReceivedMessage serviceBusReceivedMessage, DeferOptions deferOptions) {
            Mono<Void> checkNull = checkNull(deferOptions, deferOptions != null ? deferOptions.getTransactionContext() : null);
            return checkNull != null ? checkNull : updateDisposition(serviceBusReceivedMessage, DispositionStatus.DEFERRED, deferOptions.getPropertiesToModify(), null, null, deferOptions.getTransactionContext());
        }

        private Mono<Void> updateDisposition(ServiceBusReceivedMessage serviceBusReceivedMessage, DispositionStatus dispositionStatus, Map<String, Object> map, String str, String str2, ServiceBusTransactionContext serviceBusTransactionContext) {
            if (this.receiveMode != ServiceBusReceiveMode.PEEK_LOCK) {
                return Mono.error(new UnsupportedOperationException(String.format("'%s' is not supported on a receiver opened in ReceiveMode.RECEIVE_AND_DELETE.", dispositionStatus)));
            }
            if (serviceBusReceivedMessage.isSettled()) {
                return Mono.error(new IllegalArgumentException("The message has either been deleted or already settled."));
            }
            if (serviceBusReceivedMessage.getLockToken() == null) {
                return Mono.error(new UnsupportedOperationException("This operation is not supported for peeked messages. Only messages received using receiveMessages() in PEEK_LOCK mode can be settled."));
            }
            ServiceBusSessionReactorReceiver serviceBusSessionReactorReceiver = this.receivers.get(serviceBusReceivedMessage.getSessionId());
            DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, str, str2, map, serviceBusTransactionContext);
            return this.instrumentation.instrumentSettlement(serviceBusSessionReactorReceiver != null ? serviceBusSessionReactorReceiver.updateDisposition(serviceBusReceivedMessage.getLockToken(), deliveryState) : Mono.error(DeliveryNotOnLinkException.noMatchingDelivery(serviceBusReceivedMessage.getLockToken(), deliveryState)), serviceBusReceivedMessage, serviceBusReceivedMessage.getContext(), dispositionStatus);
        }

        private Mono<Void> checkNull(Object obj, ServiceBusTransactionContext serviceBusTransactionContext) {
            if (obj == null) {
                return FluxUtil.monoError(this.logger, new NullPointerException("'options' cannot be null."));
            }
            if (serviceBusTransactionContext == null || serviceBusTransactionContext.getTransactionId() != null) {
                return null;
            }
            return FluxUtil.monoError(this.logger, new NullPointerException("'options.transactionContext.transactionId' cannot be null."));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/messaging/servicebus/SessionsMessagePump$State.class */
    public static final class State<T extends AsyncCloseable> {
        final T receiver;

        State(T t) {
            this.receiver = (T) Objects.requireNonNull(t);
        }

        static <T extends AsyncCloseable> State<T> init() {
            return new State<>();
        }

        static <T extends AsyncCloseable> State<T> terminated() {
            return new State<>();
        }

        private State() {
            this.receiver = null;
        }
    }

    /* loaded from: input_file:com/azure/messaging/servicebus/SessionsMessagePump$TerminalSignalType.class */
    private enum TerminalSignalType {
        COMPLETED,
        ERRORED,
        CANCELED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SessionsMessagePump(String str, String str2, String str3, ServiceBusReceiveMode serviceBusReceiveMode, ServiceBusReceiverInstrumentation serviceBusReceiverInstrumentation, ServiceBusSessionAcquirer serviceBusSessionAcquirer, Duration duration, Duration duration2, int i, int i2, int i3, boolean z, MessageSerializer messageSerializer, AmqpRetryPolicy amqpRetryPolicy, Consumer<ServiceBusReceivedMessageContext> consumer, Consumer<ServiceBusErrorContext> consumer2, Runnable runnable) {
        HashMap hashMap = new HashMap(3);
        hashMap.put("pumpId", Long.valueOf(this.pumpId));
        hashMap.put("namespace", str2);
        hashMap.put("entityPath", str3);
        this.logger = new ClientLogger(SessionsMessagePump.class, hashMap);
        this.identifier = str;
        this.fullyQualifiedNamespace = (String) Objects.requireNonNull(str2, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = (String) Objects.requireNonNull(str3, "'entityPath' cannot be null.");
        Objects.requireNonNull(serviceBusReceiveMode, "'receiveMode' cannot be null.");
        this.instrumentation = (ServiceBusReceiverInstrumentation) Objects.requireNonNull(serviceBusReceiverInstrumentation, "'instrumentation' cannot be null");
        this.sessionAcquirer = (ServiceBusSessionAcquirer) Objects.requireNonNull(serviceBusSessionAcquirer, "'sessionAcquirer' cannot be null");
        this.maxSessionLockRenew = (Duration) Objects.requireNonNull(duration, "'maxSessionLockRenew' cannot be null.");
        this.sessionIdleTimeout = duration2 != null ? duration2 : amqpRetryPolicy.getRetryOptions().getTryTimeout();
        this.maxConcurrentSessions = i;
        this.concurrencyPerSession = i2;
        this.prefetch = i3;
        this.enableAutoDisposition = z;
        this.serializer = (MessageSerializer) Objects.requireNonNull(messageSerializer, "'serializer' cannot be null.");
        this.retryPolicy = (AmqpRetryPolicy) Objects.requireNonNull(amqpRetryPolicy, "'retryPolicy' cannot be null.");
        this.processMessage = (Consumer) Objects.requireNonNull(consumer, "'processMessage' cannot be null.");
        this.processError = (Consumer) Objects.requireNonNull(consumer2, "'processError' cannot be null.");
        this.onTerminate = (Runnable) Objects.requireNonNull(runnable, "'onTerminate' cannot be null.");
        this.receiversTracker = new SessionReceiversTracker(this.logger, i, str2, str3, serviceBusReceiveMode, serviceBusReceiverInstrumentation);
        this.nextSession = new NextSession(this.pumpId, str2, str3, serviceBusSessionAcquirer).mono();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getIdentifier() {
        return this.identifier;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> begin() {
        logCPUResourcesConcurrencyMismatch();
        return Mono.usingWhen(Mono.fromSupplier(() -> {
            throwIfTerminatedOrInitialized();
            List<RollingSessionReceiver> createRollingSessionReceivers = createRollingSessionReceivers();
            if (!this.rollingReceiversRef.compareAndSet(EMPTY, createRollingSessionReceivers)) {
                createRollingSessionReceivers.clear();
                throwIfTerminatedOrInitialized();
            }
            return createRollingSessionReceivers;
        }), list -> {
            ArrayList arrayList = new ArrayList(list.size());
            Iterator it = list.iterator();
            while (it.hasNext()) {
                arrayList.add(((RollingSessionReceiver) it.next()).begin());
            }
            return Mono.firstWithSignal(new Mono[]{pollConnectionState(), Mono.when(arrayList)});
        }, list2 -> {
            return terminate(TerminalSignalType.COMPLETED);
        }, (list3, th) -> {
            return terminate(TerminalSignalType.ERRORED);
        }, list4 -> {
            return terminate(TerminalSignalType.CANCELED);
        }).onErrorMap(th2 -> {
            return th2 instanceof MessagePumpTerminatedException ? th2 : new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "pumping#error-map", th2);
        }).then(Mono.error(() -> {
            return MessagePumpTerminatedException.forCompletion(this.pumpId, this.fullyQualifiedNamespace, this.entityPath);
        }));
    }

    private Mono<Void> pollConnectionState() {
        return Flux.interval(CONNECTION_STATE_POLL_INTERVAL).handle((l, synchronousSink) -> {
            if (this.sessionAcquirer.isConnectionClosed()) {
                synchronousSink.error(this.logger.atInfo().log(new MessagePumpTerminatedException(this.pumpId, this.fullyQualifiedNamespace, this.entityPath, "session#connection-state-poll")));
            }
        }).then();
    }

    private Mono<Void> terminate(TerminalSignalType terminalSignalType) {
        if (this.rollingReceiversRef.getAndSet(TERMINATED) == TERMINATED) {
            return Mono.empty();
        }
        this.logger.atInfo().log("Pump terminated. signal:" + terminalSignalType);
        this.receiversTracker.clear();
        this.onTerminate.run();
        return Mono.empty();
    }

    private List<RollingSessionReceiver> createRollingSessionReceivers() {
        ArrayList arrayList = new ArrayList(this.maxConcurrentSessions);
        for (int i = 1; i <= this.maxConcurrentSessions; i++) {
            arrayList.add(new RollingSessionReceiver(this.pumpId, i, this.instrumentation, this.fullyQualifiedNamespace, this.entityPath, this.nextSession, this.maxSessionLockRenew, this.sessionIdleTimeout, this.concurrencyPerSession, this.prefetch, this.enableAutoDisposition, this.serializer, this.retryPolicy, this.processMessage, this.processError, this.receiversTracker));
        }
        return arrayList;
    }

    private void throwIfTerminatedOrInitialized() {
        List<RollingSessionReceiver> list = this.rollingReceiversRef.get();
        if (list == TERMINATED) {
            throw this.logger.atVerbose().log(new IllegalStateException("Cannot invoke begin() once terminated."));
        }
        if (list != EMPTY) {
            throw this.logger.atVerbose().log(new IllegalStateException("Cannot invoke begin() more than once."));
        }
    }

    private void logCPUResourcesConcurrencyMismatch() {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        int i = Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE;
        int i2 = this.maxConcurrentSessions * this.concurrencyPerSession;
        if (this.concurrencyPerSession > i || i2 > 30 * availableProcessors) {
            this.logger.atWarning().log(ServiceBusConstants.CORES_VS_CONCURRENCY_MESSAGE, new Object[]{Integer.valueOf(i), Integer.valueOf(availableProcessors), i2 + " (ConcurrentSessions=" + this.maxConcurrentSessions + ", ConcurrencyPerSession=" + this.concurrencyPerSession + ")"});
        }
    }
}
