package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import java.time.Duration;
import java.util.HashMap;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:com/azure/core/amqp/implementation/MessageFlux.class */
public final class MessageFlux extends FluxOperator<AmqpReceiveLink, Message> {
    /** Key under which a per-instance random identifier is stored in the logger context. */
    private static final String MESSAGE_FLUX_KEY = "messageFlux";
    /** Logger created in the constructor with the {@link #MESSAGE_FLUX_KEY} context entry; also used by the nested subscriber. */
    private final ClientLogger logger;
    /** Prefetch value passed on to every mediator; the constructor rejects negative values. */
    private final int prefetch;
    /** Selects which credit accounting strategy a mediator creates when it is subscribed. */
    private final CreditFlowMode creditFlowMode;
    /** Consulted for the back-off delay after a link error; a {@code null} delay is treated as "do not retry". */
    private final AmqpRetryPolicy retryPolicy;
    /** Handler used by {@code updateDisposition}; swapped through {@code onNextUpdateDispositionFunction}. */
    private volatile BiFunction<String, DeliveryState, Mono<Void>> updateDispositionFunc;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/amqp/implementation/MessageFlux$MediatorHolder.class */
    /**
     * Holds the mediator currently backing the message stream together with the pending
     * "request next mediator" task, and allows both to be released exactly once via {@link #freeze()}.
     *
     * <p>{@link #trySet(ReactorReceiverMediator)} and {@link #freeze()} are mutually exclusive through the
     * holder's monitor; once frozen, no further mediator can be installed.</p>
     */
    public static final class MediatorHolder {
        /** Guarded by {@code this}; set to {@code true} by the first {@link #freeze()} call. */
        private boolean isFrozen;
        /** The most recently installed mediator, or {@code null} when none has been set yet. */
        volatile ReactorReceiverMediator mediator;
        /** The scheduled task that will request the next mediator, or {@code null} when none is pending. */
        volatile Disposable nextMediatorRequestDisposable;

        private MediatorHolder() {
        }

        /**
         * Installs the given mediator unless the holder has been frozen.
         *
         * @param reactorReceiverMediator the mediator to install.
         * @return {@code true} if the mediator was installed, {@code false} if the holder is frozen.
         */
        boolean trySet(ReactorReceiverMediator reactorReceiverMediator) {
            synchronized (this) {
                if (isFrozen) {
                    return false;
                }
                mediator = reactorReceiverMediator;
                return true;
            }
        }

        /**
         * Freezes the holder and releases what it holds: disposes the pending next-mediator request and
         * cancels and closes the current mediator. Only the first call has any effect.
         */
        void freeze() {
            final Disposable pendingRequest;
            final ReactorReceiverMediator current;
            synchronized (this) {
                if (isFrozen) {
                    return;
                }
                pendingRequest = nextMediatorRequestDisposable;
                current = mediator;
                isFrozen = true;
            }
            // The cleanup runs outside the monitor: dispose(), cancel() (which runs discard hooks) and
            // closeAsync().subscribe() execute code this class does not control, and holding the lock
            // while calling them would needlessly block trySet() and invite lock-ordering problems.
            if (pendingRequest != null) {
                pendingRequest.dispose();
            }
            if (current != null) {
                current.cancel();
                current.closeAsync().subscribe();
            }
        }

        /**
         * @return the link name of the current mediator's receiver, or {@code null} when no mediator is set.
         */
        String getLinkName() {
            final ReactorReceiverMediator current = mediator;
            return current != null ? current.receiver.getLinkName() : null;
        }

        /**
         * Adds the connection id, link name and entity path of the current mediator's receiver to the log
         * event; the event is returned untouched when no mediator is set.
         *
         * @param loggingEventBuilder the log event to enrich.
         * @return the log event to continue building.
         */
        LoggingEventBuilder updateLogWithReceiverId(LoggingEventBuilder loggingEventBuilder) {
            final ReactorReceiverMediator current = mediator;
            if (current == null) {
                return loggingEventBuilder;
            }
            return loggingEventBuilder
                .addKeyValue(ClientConstants.CONNECTION_ID_KEY, current.receiver.getConnectionId())
                .addKeyValue(ClientConstants.LINK_NAME_KEY, current.receiver.getLinkName())
                .addKeyValue(ClientConstants.ENTITY_PATH_KEY, current.receiver.getEntityPath());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/amqp/implementation/MessageFlux$ReactorReceiverMediator.class */
    public static final class ReactorReceiverMediator implements AsyncCloseable, CoreSubscriber<Message>, Subscription {
        // The subscriber that owns this mediator; it is notified (drain / onMediatorReady) on every signal.
        private final RecoverableReactorReceiver parent;
        // The receive link this mediator consumes messages from.
        private final AmqpReceiveLink receiver;
        // Link name and entity path captured from the receiver at construction time.
        private final String receiverName;
        private final String receiverEntityPath;
        // Prefetch and credit mode handed to the credit accounting strategy created in onSubscribe.
        private final int prefetch;
        private final CreditFlowMode creditFlowMode;
        private final ClientLogger logger;
        // Created in onSubscribe according to creditFlowMode; used by update(long, long).
        private CreditAccountingStrategy creditAccounting;
        // Set to true the first time the receiver reports the ACTIVE endpoint state.
        private volatile boolean ready;
        // Subscription to receiver.receive(); set once through S and terminated in cancel().
        private volatile Subscription s;
        // First link error, recorded through ERROR in onLinkError.
        volatile Throwable error;
        // Set when the link terminated (error or completion) or the queue rejected a message.
        volatile boolean done;
        // NOTE(review): not read or written by any code visible in this class other than this declaration;
        // presumably maintained by the parent's drain loop - confirm against the original sources.
        volatile boolean isRetryInitiated;
        // Marker instance that Operators uses for a cancelled subscription.
        private static final Subscription CANCELLED_SUBSCRIPTION = Operators.cancelledSubscription();
        private static final AtomicReferenceFieldUpdater<ReactorReceiverMediator, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(ReactorReceiverMediator.class, Subscription.class, "s");
        static final AtomicReferenceFieldUpdater<ReactorReceiverMediator, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(ReactorReceiverMediator.class, Throwable.class, "error");
        // Holds the endpoint-state subscriptions made in onParentReady; disposed in closeAsync.
        private final Disposable.Composite endpointStateDisposables = Disposables.composite();
        // Buffer between the link and the parent's drain loop.
        // NOTE(review): Queues.get(Integer.MAX_VALUE) presumably yields an unbounded queue - confirm.
        final Queue<Message> queue = (Queue) Queues.get(Integer.MAX_VALUE).get();

        /**
         * Creates a mediator for a single receive link.
         *
         * @param recoverableReactorReceiver the owning subscriber that is drained on link signals.
         * @param amqpReceiveLink the link to consume; its link name and entity path are captured here.
         * @param i the prefetch value forwarded to the credit accounting strategy.
         * @param creditFlowMode the credit accounting mode to use once subscribed.
         * @param clientLogger the logger used for all mediator log events.
         */
        ReactorReceiverMediator(RecoverableReactorReceiver recoverableReactorReceiver, AmqpReceiveLink amqpReceiveLink, int i, CreditFlowMode creditFlowMode, ClientLogger clientLogger) {
            parent = recoverableReactorReceiver;
            logger = clientLogger;
            prefetch = i;
            this.creditFlowMode = creditFlowMode;
            receiver = amqpReceiveLink;
            receiverName = receiver.getLinkName();
            receiverEntityPath = receiver.getEntityPath();
        }

        /**
         * Starts consuming from the link: subscribes this mediator to the receiver's messages, then
         * subscribes twice to the receiver's endpoint states - once to translate the terminal signal into
         * {@code onLinkError}/{@code onLinkComplete}, and once (on the bounded-elastic scheduler) to notify
         * the parent the first time the link becomes active.
         */
        void onParentReady() {
            updateLogWithReceiverId(logger.atWarning()).log("Setting next mediator and waiting for activation.");
            receiver.receive().subscribe(this);

            // Terminal-signal watcher: individual states are ignored, only error/completion matter.
            final Disposable terminalWatcher = receiver.getEndpointStates().ignoreElements().subscribe(ignoredState -> {
            }, linkError -> {
                updateLogWithReceiverId(logger.atWarning()).log("Receiver emitted terminal error.", linkError);
                onLinkError(linkError);
            }, () -> {
                updateLogWithReceiverId(logger.atWarning()).log("Receiver emitted terminal completion.");
                onLinkComplete();
            });
            endpointStateDisposables.add(terminalWatcher);

            // Activation watcher: reacts only to the first ACTIVE state.
            final Disposable activationWatcher = receiver.getEndpointStates().publishOn(Schedulers.boundedElastic()).subscribe(state -> {
                if (state == AmqpEndpointState.ACTIVE && !ready) {
                    updateLogWithReceiverId(logger.atWarning()).log("The mediator is active.");
                    ready = true;
                    parent.onMediatorReady(this::updateDisposition);
                }
            });
            endpointStateDisposables.add(activationWatcher);
        }

        /**
         * Stores the subscription to the link's message stream (only the first one is accepted) and creates
         * the credit accounting strategy that matches the configured credit flow mode.
         *
         * @param subscription the subscription to the receiver's messages.
         * @throws IllegalArgumentException if the credit flow mode is not one of the known modes.
         */
        public void onSubscribe(Subscription subscription) {
            if (!Operators.setOnce(S, this, subscription)) {
                return;
            }
            switch (creditFlowMode) {
                case RequestDriven:
                    creditAccounting = new RequestDrivenCreditAccountingStrategy(receiver, subscription, prefetch, logger);
                    break;

                case EmissionDriven:
                    creditAccounting = new EmissionDrivenCreditAccountingStrategy(receiver, subscription, prefetch, logger);
                    break;

                default:
                    throw new IllegalArgumentException("Unknown CreditFlowMode " + creditFlowMode);
            }
        }

        /**
         * Forwards the two accounting values to the credit accounting strategy, but only while the
         * mediator is ready and not yet done.
         *
         * @param j first value passed to the strategy's {@code update}.
         * @param j2 second value passed to the strategy's {@code update}.
         */
        void update(long j, long j2) {
            if (ready && !done) {
                creditAccounting.update(j, j2);
            }
        }

        /**
         * Buffers a message received from the link and asks the parent to drain.
         *
         * <p>A message arriving after the mediator is done is reported as dropped; one arriving after
         * cancellation is discarded. If the queue rejects the message, the overflow is reported through the
         * operator-error hook, the message is discarded and the mediator is marked done.</p>
         *
         * <p>All hooks are resolved against the downstream subscriber's context. The parent does not
         * override {@code currentContext()}, so {@code parent.currentContext()} would resolve to the
         * {@code CoreSubscriber} default and bypass hooks registered downstream.</p>
         *
         * @param message the message delivered by the link.
         */
        public void onNext(Message message) {
            if (done) {
                Operators.onNextDropped(message, parent.messageSubscriber.currentContext());
                return;
            }
            if (s == CANCELLED_SUBSCRIPTION) {
                Operators.onDiscard(message, parent.messageSubscriber.currentContext());
                return;
            }
            if (queue.offer(message)) {
                parent.drain(message);
                return;
            }
            Operators.onOperatorError(this, Exceptions.failWithOverflow("Queue is full: Reactive Streams source doesn't respect backpressure"), parent.messageSubscriber.currentContext());
            Operators.onDiscard(message, parent.messageSubscriber.currentContext());
            done = true;
            parent.drain(message);
        }

        /**
         * No-op: an error signalled on the message stream is ignored here. Link failure reaches this
         * mediator through the endpoint-state subscription made in {@code onParentReady}, which calls
         * {@code onLinkError}.
         *
         * @param th the error from the message stream (unused).
         */
        public void onError(Throwable th) {
        }

        /**
         * Handles a terminal error of the link. The first error is recorded, the mediator is marked done
         * and the parent is drained; an error arriving when the mediator is already done, or when another
         * error was recorded first, is reported as dropped.
         *
         * @param th the terminal error emitted by the link's endpoint states.
         */
        private void onLinkError(Throwable th) {
            if (done) {
                Operators.onErrorDropped(th, parent.messageSubscriber.currentContext());
                return;
            }
            final boolean recorded = ERROR.compareAndSet(this, null, th);
            done = true;
            if (recorded) {
                parent.drain(null);
            } else {
                Operators.onErrorDropped(th, parent.messageSubscriber.currentContext());
            }
        }

        /**
         * No-op: completion of the message stream is ignored here. Link completion reaches this mediator
         * through the endpoint-state subscription made in {@code onParentReady}, which calls
         * {@code onLinkComplete}.
         */
        public void onComplete() {
        }

        /**
         * Handles terminal completion of the link: marks the mediator done and drains the parent, unless
         * the mediator is already done.
         */
        private void onLinkComplete() {
            if (!done) {
                done = true;
                parent.drain(null);
            }
        }

        /**
         * Not supported; demand is communicated through {@code update(long, long)} instead.
         *
         * @param j ignored.
         * @throws IllegalStateException always.
         */
        public void request(long j) {
            final String reason = "The request accounting must be through update(,).";
            throw new IllegalStateException(reason);
        }

        /**
         * Cancels the subscription to the link's message stream and discards every message still buffered.
         *
         * <p>The discard hook is resolved against the downstream subscriber's context. The parent does not
         * override {@code currentContext()}, so {@code parent.currentContext()} would resolve to the
         * {@code CoreSubscriber} default and bypass a discard hook registered downstream.</p>
         */
        public void cancel() {
            Operators.terminate(S, this);
            Operators.onDiscardQueueWithClear(queue, parent.messageSubscriber.currentContext(), null);
        }

        /**
         * Disposes the endpoint-state subscriptions and closes the underlying receive link.
         *
         * @return the receiver's close operation.
         */
        public Mono<Void> closeAsync() {
            endpointStateDisposables.dispose();
            final Mono<Void> linkClose = receiver.closeAsync();
            return linkClose;
        }

        /**
         * Updates the disposition of a message delivered on this mediator's link.
         *
         * <p>While the mediator is neither done nor cancelled the request is delegated to the receiver.
         * Otherwise a logged {@link IllegalStateException} is returned as an error, carrying the parent's
         * error (or, if absent, this mediator's error) as its cause.</p>
         *
         * @param str the delivery tag identifying the message.
         * @param deliveryState the state to set.
         * @return the receiver's disposition operation, or an error when the link can no longer be used.
         */
        private Mono<Void> updateDisposition(String str, DeliveryState deliveryState) {
            final boolean linkUnusable = done || s == CANCELLED_SUBSCRIPTION;
            if (!linkUnusable) {
                return receiver.updateDisposition(str, deliveryState);
            }

            final String description = String.format("The disposition request to set the state as %s for the message with id %s cannot be processed as the link that delivered the message is disconnected. Any new link to continue the receive operation can disposition only the message that arrives on that link [State- link.done:%b link.cancelled:%b parent.done:%b parent.cancelled:%b]",
                deliveryState, str, done, s == CANCELLED_SUBSCRIPTION, parent.done, parent.cancelled);

            // Prefer the parent's error as the cause; fall back to this link's own error.
            Throwable cause = parent.error;
            if (cause == null) {
                cause = error;
            }
            return FluxUtil.monoError(logger.atError().addKeyValue(ClientConstants.DELIVERY_TAG_KEY, str).addKeyValue(ClientConstants.DELIVERY_STATE_KEY, deliveryState), new IllegalStateException(description, cause));
        }

        /**
         * Adds the receiver's connection id, link name and entity path to the given log event.
         *
         * @param loggingEventBuilder the log event to enrich.
         * @return the log event to continue building.
         */
        private LoggingEventBuilder updateLogWithReceiverId(LoggingEventBuilder loggingEventBuilder) {
            return loggingEventBuilder
                .addKeyValue(ClientConstants.CONNECTION_ID_KEY, receiver.getConnectionId())
                .addKeyValue(ClientConstants.LINK_NAME_KEY, receiver.getLinkName())
                .addKeyValue(ClientConstants.ENTITY_PATH_KEY, receiver.getEntityPath());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/amqp/implementation/MessageFlux$RecoverableReactorReceiver.class */
    public static final class RecoverableReactorReceiver implements CoreSubscriber<AmqpReceiveLink>, Subscription {
        // The operator that created this subscriber; receives the disposition function of each ready mediator.
        private final MessageFlux parent;
        // Prefetch and credit mode forwarded to every mediator created in onNext.
        private final int prefetch;
        private final CreditFlowMode creditFlowMode;
        // Supplies the delay before the next link is requested after a link error.
        private final AmqpRetryPolicy retryPolicy;
        private final ClientLogger logger;
        // The downstream subscriber that receives terminal signals; its context is used for Reactor hooks.
        private final CoreSubscriber<? super Message> messageSubscriber;
        // Subscription to the upstream of links; each request(1) asks for one more link.
        private Subscription upstream;
        // Outstanding downstream demand, accumulated through REQUESTED in request(long).
        private volatile long requested;
        // Work-in-progress counter: drainLoop() is entered only by the caller that moves it from zero.
        private volatile int wip;
        // Set once the upstream completed or a terminal error was recorded.
        private volatile boolean done;
        // Set once the downstream cancelled.
        private volatile boolean cancelled;
        // Terminal error(s) accumulated through ERROR in onError.
        volatile Throwable error;
        private static final AtomicLongFieldUpdater<RecoverableReactorReceiver> REQUESTED = AtomicLongFieldUpdater.newUpdater(RecoverableReactorReceiver.class, "requested");
        private static final AtomicIntegerFieldUpdater<RecoverableReactorReceiver> WIP = AtomicIntegerFieldUpdater.newUpdater(RecoverableReactorReceiver.class, "wip");
        static final AtomicReferenceFieldUpdater<RecoverableReactorReceiver, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(RecoverableReactorReceiver.class, Throwable.class, "error");
        // Holds the current mediator and the pending next-mediator request; frozen on termination.
        private final MediatorHolder mediatorHolder = new MediatorHolder();
        // Number of consecutive link-error retries; reset to zero whenever a mediator becomes ready.
        private final AtomicInteger retryAttempts = new AtomicInteger();

        /**
         * Creates the subscriber that sits between the upstream of links and the downstream of messages.
         *
         * @param messageFlux the owning operator; its logger is reused.
         * @param coreSubscriber the downstream message subscriber.
         * @param i the prefetch value forwarded to mediators.
         * @param creditFlowMode the credit flow mode forwarded to mediators.
         * @param amqpRetryPolicy the policy used to compute the retry delay after a link error.
         */
        RecoverableReactorReceiver(MessageFlux messageFlux, CoreSubscriber<? super Message> coreSubscriber, int i, CreditFlowMode creditFlowMode, AmqpRetryPolicy amqpRetryPolicy) {
            parent = messageFlux;
            logger = messageFlux.logger;
            messageSubscriber = coreSubscriber;
            retryPolicy = amqpRetryPolicy;
            prefetch = i;
            this.creditFlowMode = creditFlowMode;
        }

        /**
         * Accepts the subscription to the upstream of links, hands this object to the downstream as its
         * subscription, and requests the first link.
         *
         * @param subscription the upstream subscription.
         */
        public void onSubscribe(Subscription subscription) {
            if (!Operators.validate(upstream, subscription)) {
                return;
            }
            upstream = subscription;
            messageSubscriber.onSubscribe(this);
            subscription.request(1L);
        }

        /**
         * Receives the next link from upstream, wraps it in a mediator and starts it. A link that arrives
         * after termination - either this subscriber is done or the mediator holder is frozen - is closed
         * and reported as dropped/discarded.
         *
         * @param amqpReceiveLink the new receive link.
         */
        public void onNext(AmqpReceiveLink amqpReceiveLink) {
            if (done) {
                amqpReceiveLink.closeAsync().subscribe();
                Operators.onNextDropped(amqpReceiveLink, messageSubscriber.currentContext());
                return;
            }

            final ReactorReceiverMediator nextMediator = new ReactorReceiverMediator(this, amqpReceiveLink, prefetch, creditFlowMode, logger);
            if (!mediatorHolder.trySet(nextMediator)) {
                logger.atWarning()
                    .addKeyValue("oldLinkName", mediatorHolder.getLinkName())
                    .addKeyValue(ClientConstants.LINK_NAME_KEY, amqpReceiveLink.getLinkName())
                    .addKeyValue(ClientConstants.ENTITY_PATH_KEY, amqpReceiveLink.getEntityPath())
                    .log("Got a AmqpReceiveLink when the MessageFlux is already terminated.");
                amqpReceiveLink.closeAsync().subscribe();
                Operators.onDiscard(amqpReceiveLink, messageSubscriber.currentContext());
                return;
            }
            nextMediator.onParentReady();
        }

        /**
         * Records a terminal error (from upstream or from the retry logic), marks this subscriber done and
         * drains. An error that arrives when already done, or that cannot be recorded, is reported as dropped.
         *
         * @param th the terminal error.
         */
        public void onError(Throwable th) {
            if (done || !Exceptions.addThrowable(ERROR, this, th)) {
                Operators.onErrorDropped(th, messageSubscriber.currentContext());
                return;
            }
            done = true;
            mediatorHolder.updateLogWithReceiverId(logger.atWarning()).log("Terminal error signal from upstream Or from retry loop (non_retriable or retry_exhausted) arrived at MessageFlux.", th);
            drain(null);
        }

        /**
         * Handles completion of the upstream of links: marks this subscriber done and drains, unless it is
         * already done.
         */
        public void onComplete() {
            if (!done) {
                done = true;
                mediatorHolder.updateLogWithReceiverId(logger.atWarning()).log("Terminal completion signal from upstream arrived at MessageFlux.");
                drain(null);
            }
        }

        /**
         * Adds downstream demand (capped addition) and drains. A request rejected by
         * {@code Operators.validate} is ignored here.
         *
         * @param j the number of additional messages requested.
         */
        public void request(long j) {
            if (!Operators.validate(j)) {
                return;
            }
            Operators.addCap(REQUESTED, this, j);
            drain(null);
        }

        /**
         * Handles downstream cancellation. Marks the subscriber cancelled and, if no drain is in progress,
         * cancels the upstream and freezes the mediator holder right away; otherwise the work-in-progress
         * counter has been incremented and the clean-up is left to the drain that is running.
         */
        public void cancel() {
            if (cancelled) {
                return;
            }
            cancelled = true;
            mediatorHolder.updateLogWithReceiverId(logger.atWarning()).log("Downstream cancellation signal arrived at MessageFlux.");

            final boolean noDrainInProgress = WIP.getAndIncrement(this) == 0;
            if (noDrainInProgress) {
                upstream.cancel();
                mediatorHolder.freeze();
            }
        }

        /**
         * Called by a mediator once its link is active: resets the retry counter, publishes the mediator's
         * disposition function to the owning operator and drains.
         *
         * @param biFunction the disposition function of the now-active mediator.
         */
        void onMediatorReady(BiFunction<String, DeliveryState, Mono<Void>> biFunction) {
            retryAttempts.set(0);
            parent.onNextUpdateDispositionFunction(biFunction);
            drain(null);
        }

        /**
         * Entry point to the drain loop. The caller that moves the work-in-progress counter from zero runs
         * the loop; any other caller only leaves its increment behind and, if a message was supplied and
         * the downstream has cancelled, discards that message.
         *
         * @param message the message that triggered the drain, or {@code null}.
         */
        void drain(Message message) {
            if (WIP.getAndIncrement(this) == 0) {
                drainLoop();
                return;
            }
            if (message != null && cancelled) {
                Operators.onDiscard(message, messageSubscriber.currentContext());
            }
        }

        /* JADX WARN: Code restructure failed: missing block: B:30:0x00c3, code lost:
        
            if (r14 != r12) goto L46;
         */
        /* JADX WARN: Code restructure failed: missing block: B:32:0x00d0, code lost:
        
            if (r0.queue.isEmpty() == false) goto L46;
         */
        /* JADX WARN: Code restructure failed: missing block: B:34:0x00d8, code lost:
        
            if (r0.done == false) goto L46;
         */
        /* JADX WARN: Code restructure failed: missing block: B:35:0x00db, code lost:
        
            r16 = true;
         */
        /* JADX WARN: Code restructure failed: missing block: B:37:0x00e2, code lost:
        
            if (r14 == 0) goto L51;
         */
        /* JADX WARN: Code restructure failed: missing block: B:39:0x00eb, code lost:
        
            if (r12 == Long.MAX_VALUE) goto L51;
         */
        /* JADX WARN: Code restructure failed: missing block: B:40:0x00ee, code lost:
        
            r12 = com.azure.core.amqp.implementation.MessageFlux.RecoverableReactorReceiver.REQUESTED.addAndGet(r6, -r14);
         */
        /* JADX WARN: Code restructure failed: missing block: B:41:0x00fa, code lost:
        
            r0.update(r12, r14);
         */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        /**
         * The drain loop, entered from {@code drain(Message)} by the caller that moved the work-in-progress
         * counter from zero.
         *
         * <p>NOTE(review): the body below is a decompiler placeholder, not the real implementation. As
         * written, every call throws {@link UnsupportedOperationException}, so the operator cannot deliver
         * messages or terminal signals. The real body must be restored from the original sources.</p>
         *
         * <p>The JADX fragments above are the only visible trace of the original logic. They show a
         * comparison of two long values, a check that a queue is empty together with a {@code done} flag,
         * a flag being set, a decrement of {@code REQUESTED} that is skipped when the subtracted value is
         * zero or the other value is {@code Long.MAX_VALUE}, and a call to {@code update} with both values.
         * Presumably these are the emitted count and the requested demand, and the receiver of
         * {@code update} is the current mediator - confirm against the original sources.</p>
         *
         * @throws UnsupportedOperationException always, in this decompiled form.
         */
        private void drainLoop() {
            /*
                Method dump skipped, instructions count: 382
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: com.azure.core.amqp.implementation.MessageFlux.RecoverableReactorReceiver.drainLoop():void");
        }

        /**
         * Performs the cancellation clean-up if the downstream has cancelled: discards the given message,
         * cancels the upstream and freezes the mediator holder.
         *
         * @param coreSubscriber the downstream subscriber whose context is used for the discard hook.
         * @param message the message in hand, passed to the discard hook as-is (may be {@code null}).
         * @return {@code true} if the downstream had cancelled and the clean-up ran, {@code false} otherwise.
         */
        private boolean terminateIfCancelled(CoreSubscriber<? super Message> coreSubscriber, Message message) {
            final boolean downstreamCancelled = cancelled;
            if (downstreamCancelled) {
                Operators.onDiscard(message, coreSubscriber.currentContext());
                upstream.cancel();
                mediatorHolder.freeze();
            }
            return downstreamCancelled;
        }

        /**
         * If the given flag says the subscriber is done, releases resources and signals the terminal state
         * downstream: completion when no error is recorded (or the error slot is already terminated),
         * otherwise the recorded error.
         *
         * @param z whether the subscriber has reached a terminal state.
         * @param coreSubscriber the downstream subscriber to signal.
         * @param message the message in hand, passed to the discard hook as-is (may be {@code null}).
         * @return {@code true} if a terminal signal was sent downstream, {@code false} if {@code z} was false.
         */
        private boolean terminateIfErroredOrUpstreamCompleted(boolean z, CoreSubscriber<? super Message> coreSubscriber, Message message) {
            if (!z) {
                return false;
            }
            final LoggingEventBuilder logEvent = mediatorHolder.updateLogWithReceiverId(logger.atWarning());
            final Throwable recorded = error;
            final boolean completesNormally = recorded == null || recorded == Exceptions.TERMINATED;
            // On the error path the error slot is swapped to its terminated marker before any clean-up.
            final Throwable failure = completesNormally ? null : Exceptions.terminate(ERROR, this);

            Operators.onDiscard(message, coreSubscriber.currentContext());
            upstream.cancel();
            mediatorHolder.freeze();

            if (completesNormally) {
                logEvent.log("MessageFlux reached a terminal completion-state, signaling it downstream.");
                coreSubscriber.onComplete();
            } else {
                logEvent.log("MessageFlux reached a terminal error-state, signaling it downstream.", failure);
                coreSubscriber.onError(failure);
            }
            return true;
        }

        /**
         * Decides what happens after the current mediator terminated.
         *
         * <p>Nothing is done when the subscriber is already cancelled or done. A mediator that completed
         * without error is replaced after a fixed one-second delay. For a mediator that failed, the retry
         * policy is asked for the delay of the next attempt; a {@code null} delay turns the error into the
         * terminal error of this subscriber. If the scheduler rejects the retry task, the rejection becomes
         * the terminal error.</p>
         *
         * @param th the error that terminated the mediator, or {@code null} if it completed.
         * @param coreSubscriber the downstream subscriber whose context is used for the rejection hook.
         * @param mediatorHolder the holder that receives the scheduled request.
         */
        private void setTerminationSignalOrScheduleNextMediatorRequest(Throwable th, CoreSubscriber<? super Message> coreSubscriber, MediatorHolder mediatorHolder) {
            final LoggingEventBuilder logEvent = mediatorHolder.updateLogWithReceiverId(logger.atWarning());
            if (cancelled || done) {
                logEvent.log("MessageFlux reached terminal-state [done:{}, cancelled:{}].", done, cancelled);
                return;
            }

            final Duration backoff;
            if (th != null) {
                final int attempt = retryAttempts.incrementAndGet();
                backoff = retryPolicy.calculateRetryDelay(th, attempt);
                if (backoff == null) {
                    logEvent.addKeyValue("attempt", attempt).log("Current mediator reached terminal error-state (retriable:false) Or MessageFlux retries exhausted.", th);
                    onError(th);
                    return;
                }
                logEvent.addKeyValue("attempt", attempt).addKeyValue("retryAfter", backoff.toMillis()).log("Current mediator reached terminal error-state (retriable:true).", th);
            } else {
                backoff = Duration.ofSeconds(1L);
                logEvent.addKeyValue("retryAfter", backoff.toMillis()).log("Current mediator reached terminal completion-state (retriable:true).");
            }

            try {
                scheduleNextMediatorRequest(backoff, mediatorHolder);
            } catch (RejectedExecutionException rejection) {
                final RuntimeException schedulingError = Operators.onRejectedExecution(rejection, coreSubscriber.currentContext());
                mediatorHolder.updateLogWithReceiverId(logger.atWarning()).log("Unable to schedule a request for a new mediator (retriable:false).", schedulingError);
                onError(schedulingError);
            }
        }

        /**
         * Schedules, on the parallel scheduler, a task that requests one more link from upstream after the
         * given delay, and stores the task's handle in the holder. The task does nothing but log if the
         * subscriber got cancelled or done in the meantime.
         *
         * @param duration the delay before the request is made.
         * @param mediatorHolder the holder that keeps the handle of the scheduled task.
         */
        private void scheduleNextMediatorRequest(Duration duration, MediatorHolder mediatorHolder) {
            final Runnable requestNextMediator = () -> {
                final LoggingEventBuilder logEvent = mediatorHolder.updateLogWithReceiverId(logger.atWarning());
                if (cancelled || done) {
                    logEvent.log("During the backoff, MessageFlux reached terminal-state [done:{}, cancelled:{}].", done, cancelled);
                    return;
                }
                logEvent.log("Requesting a new mediator.");
                upstream.request(1L);
            };
            mediatorHolder.nextMediatorRequestDisposable = Schedulers.parallel().schedule(requestNextMediator, duration.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    public MessageFlux(Flux<? extends AmqpReceiveLink> flux, int i, CreditFlowMode creditFlowMode, AmqpRetryPolicy amqpRetryPolicy) {
        super(flux);
        HashMap hashMap = new HashMap(1);
        hashMap.put(MESSAGE_FLUX_KEY, StringUtil.getRandomString("mf"));
        this.logger = new ClientLogger(MessageFlux.class, hashMap);
        if (i < 0) {
            throw new IllegalArgumentException("prefetch >= 0 required but it was " + i);
        }
        this.prefetch = i;
        this.creditFlowMode = creditFlowMode;
        this.retryPolicy = (AmqpRetryPolicy) Objects.requireNonNull(amqpRetryPolicy, "'retryPolicy' cannot be null.");
        this.updateDispositionFunc = (str, deliveryState) -> {
            return Mono.error(new IllegalStateException("Cannot update disposition as no receive-link is established."));
        };
    }

    /**
     * Subscribes the downstream by placing a new {@code RecoverableReactorReceiver} between the source of
     * links and the given message subscriber.
     *
     * @param coreSubscriber the downstream message subscriber.
     */
    public void subscribe(CoreSubscriber<? super Message> coreSubscriber) {
        final RecoverableReactorReceiver linkSubscriber = new RecoverableReactorReceiver(this, coreSubscriber, prefetch, creditFlowMode, retryPolicy);
        source.subscribe(linkSubscriber);
    }

    /**
     * Updates the disposition of a message by delegating to the currently installed disposition function.
     *
     * @param str the delivery tag identifying the message.
     * @param deliveryState the state to set.
     * @return the result of the currently installed disposition function.
     */
    public Mono<Void> updateDisposition(String str, DeliveryState deliveryState) {
        final BiFunction<String, DeliveryState, Mono<Void>> currentHandler = updateDispositionFunc;
        return currentHandler.apply(str, deliveryState);
    }

    /**
     * Installs the disposition function used by subsequent {@code updateDisposition} calls.
     *
     * @param biFunction the disposition function to install.
     */
    void onNextUpdateDispositionFunction(BiFunction<String, DeliveryState, Mono<Void>> biFunction) {
        updateDispositionFunc = biFunction;
    }
}
