package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpMessageConstant;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/core/amqp/implementation/ReactorReceiver.class */
public class ReactorReceiver implements AmqpReceiveLink, AsyncCloseable, AutoCloseable {
    // Message-annotation key used by getSequenceNumber(); initialised in the static block
    // from AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME.
    private static final Symbol SEQUENCE_NUMBER_ANNOTATION;
    // Entity path supplied to the constructor; returned by getEntityPath() and added to the logging context.
    private final String entityPath;
    // Underlying Proton receiver that is flowed, closed and freed by this class.
    private final Receiver receiver;
    // Handler wrapper that supplies delivered messages, endpoint states, link name, hostname and connection id.
    private final ReceiveLinkHandlerWrapper handler;
    // Source of authorization results; closed in completeClose().
    private final TokenManager tokenManager;
    // Used to schedule work via invoke(...); a failed scheduling attempt surfaces as
    // IOException or RejectedExecutionException.
    private final ReactorDispatcher dispatcher;
    // Composite of the subscriptions created in the constructor; disposed in completeClose().
    private final Disposable subscriptions;
    // Message stream handed out by receive(); built in the constructor depending on isV2.
    private final Flux<Message> messagesProcessor;
    // Supplies the try-timeout used by close() and timeoutRemoteCloseAck().
    private final AmqpRetryOptions retryOptions;
    // Logger created with connection id, link name and entity path as context.
    private final ClientLogger logger;
    // Mirrors handler.isV2(); selects between the V1 and V2 message pipelines.
    private final boolean isV2;
    // Endpoint states mapped from the handler, with the latest value cached.
    private final Flux<AmqpEndpointState> endpointStates;
    // Metrics sink used to record added credits and the prefetched sequence number.
    private final AmqpMetricsProvider metricsProvider;
    // Handle returned by trackPrefetchSequenceNumber(...); closed in completeClose().
    private final AutoCloseable trackPrefetchSeqNoSubscription;
    // Assertion flag exposed by the decompiler; set in the static block and checked
    // by the methods that are only valid for one of the V1/V2 modes.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Set to true once closing has started or the endpoint states terminated.
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    // Completed by completeClose(); exposed through getIsClosedMono().
    private final Sinks.Empty<Void> isClosedMono = Sinks.empty();
    // Ensures the body of completeClose() runs at most once.
    private final AtomicBoolean isCompleteCloseCalled = new AtomicBoolean();
    // Completed by terminateEndpointState() to cut off the flux returned by getEndpointStates().
    private final Sinks.Empty<AmqpEndpointState> terminateEndpointStates = Sinks.empty();
    // Credit supplier registered through setEmptyCreditListener(); read by the V1 pipeline.
    private final AtomicReference<Supplier<Integer>> creditSupplier = new AtomicReference<>();
    // Last sequence number observed on a received message; reported to the metrics provider.
    private final AtomicLong lastSequenceNumber = new AtomicLong();

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a receive link wrapper and wires up its message pipeline, endpoint-state tracking,
     * token-renewal handling and connection-shutdown handling.
     *
     * @param amqpConnection connection whose shutdown signals close this link.
     * @param str entity path this link receives from.
     * @param receiver underlying Proton receiver.
     * @param receiveLinkHandlerWrapper handler supplying deliveries and endpoint states.
     * @param tokenManager source of authorization results for the link.
     * @param reactorDispatcher dispatcher used to schedule work for the receiver.
     * @param amqpRetryOptions options supplying the try-timeout used while closing.
     * @param amqpMetricsProvider metrics sink for credits and prefetched sequence numbers.
     */
    public ReactorReceiver(AmqpConnection amqpConnection, String str, Receiver receiver, ReceiveLinkHandlerWrapper receiveLinkHandlerWrapper, TokenManager tokenManager, ReactorDispatcher reactorDispatcher, AmqpRetryOptions amqpRetryOptions, AmqpMetricsProvider amqpMetricsProvider) {
        this.entityPath = str;
        this.receiver = receiver;
        this.handler = receiveLinkHandlerWrapper;
        this.tokenManager = tokenManager;
        this.dispatcher = reactorDispatcher;
        this.metricsProvider = amqpMetricsProvider;
        this.trackPrefetchSeqNoSubscription = amqpMetricsProvider.trackPrefetchSequenceNumber(this.lastSequenceNumber::get);

        // Every log line from this link carries connection id, link name and entity path.
        Map<String, Object> loggingContext = AmqpLoggingUtils.createContextWithConnectionId(receiveLinkHandlerWrapper.getConnectionId());
        loggingContext.put(ClientConstants.LINK_NAME_KEY, this.handler.getLinkName());
        loggingContext.put(ClientConstants.ENTITY_PATH_KEY, str);
        this.logger = new ClientLogger(ReactorReceiver.class, loggingContext);
        receiveLinkHandlerWrapper.setLogger(this.logger);
        this.isV2 = receiveLinkHandlerWrapper.isV2();

        if (!this.isV2) {
            // V1: each delivery is decoded through the dispatcher, one at a time (flatMap concurrency 1).
            this.messagesProcessor = this.handler.getDeliveredMessagesV1().flatMap(delivery -> {
                return Mono.create(monoSink -> {
                    try {
                        this.dispatcher.invoke(() -> {
                            if (isDisposed()) {
                                monoSink.error(new IllegalStateException("Cannot decode delivery when ReactorReceiver instance is closed."));
                                return;
                            }
                            Message decodedMessage = decodeDelivery(delivery);
                            if (amqpMetricsProvider.isPrefetchedSequenceNumberEnabled()) {
                                Long sequenceNumber = getSequenceNumber(decodedMessage);
                                if (sequenceNumber != null) {
                                    this.lastSequenceNumber.set(sequenceNumber.longValue());
                                }
                            }
                            if (receiver.getRemoteCredit() > 0) {
                                monoSink.success(decodedMessage);
                                return;
                            }
                            // The link is out of credit: ask the registered listener how many to add.
                            // The listener may not have been registered yet; treat that as "no credits"
                            // instead of throwing inside the dispatcher and never signalling the sink.
                            Supplier<Integer> supplier = this.creditSupplier.get();
                            Integer credits = supplier != null ? supplier.get() : null;
                            if (credits != null && credits.intValue() > 0) {
                                this.logger.atVerbose().addKeyValue("credits", credits).log("Adding credits.");
                                receiver.flow(credits.intValue());
                            }
                            amqpMetricsProvider.recordAddCredits(credits == null ? 0 : credits.intValue());
                            monoSink.success(decodedMessage);
                        });
                    } catch (IOException | RejectedExecutionException e) {
                        // The work could not be scheduled; surface the failure to the subscriber.
                        monoSink.error(e);
                    }
                });
            }, 1);
        } else if (amqpMetricsProvider.isPrefetchedSequenceNumberEnabled()) {
            // V2 with sequence-number tracking: messages arrive decoded, only record the sequence number.
            this.messagesProcessor = this.handler.getDeliveredMessagesV2().map(message -> {
                Long sequenceNumber = getSequenceNumber(message);
                if (sequenceNumber != null) {
                    this.lastSequenceNumber.set(sequenceNumber.longValue());
                }
                return message;
            });
        } else {
            this.messagesProcessor = this.handler.getDeliveredMessagesV2();
        }
        this.retryOptions = amqpRetryOptions;

        // Terminal signals from the handler mark this link disposed and release its resources.
        this.endpointStates = this.handler.getEndpointStates().map(endpointState -> {
            this.logger.atVerbose().log("State {}", new Object[]{endpointState});
            return AmqpEndpointStateUtil.getConnectionState(endpointState);
        }).doOnError(th -> {
            this.logger.atInfo().log(this.isDisposed.getAndSet(true) ? "This was already disposed. Dropping error." : "Freeing resources due to error.");
            completeClose();
        }).doOnComplete(() -> {
            this.logger.atVerbose().log(this.isDisposed.getAndSet(true) ? "This was already disposed." : "Freeing resources.");
            completeClose();
        }).cache(1);

        this.subscriptions = Disposables.composite(new Disposable[]{this.endpointStates.subscribe((Consumer<AmqpEndpointState>) null, th2 -> {
            this.logger.warning("Receive link endpoint state signaled error.", new Object[]{th2});
        }), this.tokenManager.getAuthorizationResults().onErrorResume(th3 -> {
            return closeAsync("Token renewal failure. Disposing receive link.", new ErrorCondition(Symbol.getSymbol(AmqpErrorCondition.NOT_ALLOWED.getErrorCondition()), th3.getMessage())).then(Mono.empty());
        }).subscribe(amqpResponseCode -> {
            this.logger.atVerbose().addKeyValue("response", amqpResponseCode).log("Token refreshed.");
        }, th4 -> {
            // Intentionally empty: authorization errors are already handled by onErrorResume above.
        }, () -> {
            this.logger.atVerbose().log("Authorization completed.");
            closeAsync("Authorization completed. Disposing.", null).subscribe();
        }), amqpConnection.getShutdownSignals().flatMap(amqpShutdownSignal -> {
            this.logger.verbose("Shutdown signal received.");
            return closeAsync("Connection shutdown.", null);
        }).subscribe()});
    }

    /**
     * Gets the endpoint states of this link with consecutive duplicates removed.
     *
     * @return a flux of endpoint states that stops once the internal termination sink completes.
     */
    @Override
    public Flux<AmqpEndpointState> getEndpointStates() {
        final Flux<AmqpEndpointState> distinctStates = this.endpointStates.distinctUntilChanged();
        final Mono<AmqpEndpointState> terminationSignal = this.terminateEndpointStates.asMono();
        return distinctStates.takeUntilOther(terminationSignal);
    }

    /**
     * Gets the connection id reported by the link handler.
     *
     * @return the connection id.
     */
    @Override
    public String getConnectionId() {
        final String connectionId = this.handler.getConnectionId();
        return connectionId;
    }

    /**
     * Gets the message stream that was assembled in the constructor.
     *
     * @return the flux of received messages.
     */
    @Override
    public Flux<Message> receive() {
        final Flux<Message> messages = this.messagesProcessor;
        return messages;
    }

    /**
     * Delegates a disposition update to the link handler.
     *
     * @param str value forwarded as the first argument of the handler's {@code sendDisposition}.
     * @param deliveryState the delivery state to send.
     * @return the mono returned by the handler.
     */
    @Override
    public Mono<Void> updateDisposition(String str, DeliveryState deliveryState) {
        final Mono<Void> dispositionResult = this.handler.sendDisposition(str, deliveryState);
        return dispositionResult;
    }

    /**
     * Adds the given number of credits to the link by scheduling a flow on the dispatcher.
     *
     * @param i number of credits to add.
     * @return a mono that completes once the flow has run, or errors with
     *     {@link IllegalStateException} when the link is already disposed,
     *     {@link UncheckedIOException} or {@link RejectedExecutionException} when the work cannot be scheduled.
     */
    @Override
    public Mono<Void> addCredits(int i) {
        if (isDisposed()) {
            return FluxUtil.monoError(this.logger.atWarning(),
                new IllegalStateException("Cannot add credits to closed link: " + getLinkName()));
        }

        return Mono.create(sink -> {
            try {
                this.dispatcher.invoke(() -> {
                    this.receiver.flow(i);
                    this.metricsProvider.recordAddCredits(i);
                    sink.success();
                });
            } catch (IOException ioError) {
                final String message = String.format(
                    "connectionId[%s] linkName[%s] Unable to schedule work to add more credits.",
                    this.handler.getConnectionId(), getLinkName());
                sink.error(new UncheckedIOException(message, ioError));
            } catch (RejectedExecutionException rejected) {
                sink.error(rejected);
            }
        });
    }

    /**
     * Schedules a credit flow whose amount is obtained from the supplier when the scheduled work runs.
     * Guarded by an assertion that this link is in V2 mode.
     *
     * @param supplier provides the number of credits; its value is narrowed to {@code int}.
     * @throws RejectedExecutionException if the link is already disposed.
     * @throws UncheckedIOException if the dispatcher fails to schedule the work.
     */
    @Override
    public void addCredit(Supplier<Long> supplier) {
        if (!$assertionsDisabled && !this.isV2) {
            throw new AssertionError();
        }
        if (isDisposed()) {
            throw new RejectedExecutionException("Cannot schedule credit flow when the link is disposed.");
        }

        final Runnable flowCredits = () -> {
            final long requested = supplier.get().longValue();
            final int credits = (int) requested;
            this.receiver.flow(credits);
            this.metricsProvider.recordAddCredits(credits);
        };

        try {
            this.dispatcher.invoke(flowCredits);
        } catch (IOException ioError) {
            throw new UncheckedIOException("Unable to schedule credit flow.", ioError);
        }
    }

    /**
     * Gets the remote credit currently reported by the underlying receiver.
     *
     * @return the receiver's remote credit.
     */
    @Override
    public int getCredits() {
        final int remoteCredit = this.receiver.getRemoteCredit();
        return remoteCredit;
    }

    /**
     * Registers the supplier consulted by the V1 pipeline when the link has no remaining credit.
     * Guarded by an assertion that this link is not in V2 mode.
     *
     * @param supplier provides the number of credits to add.
     * @throws NullPointerException if {@code supplier} is null.
     */
    @Override
    public void setEmptyCreditListener(Supplier<Integer> supplier) {
        if (!$assertionsDisabled && this.isV2) {
            throw new AssertionError();
        }
        final Supplier<Integer> listener = Objects.requireNonNull(supplier, "'creditSupplier' cannot be null.");
        this.creditSupplier.set(listener);
    }

    /**
     * Gets the link name reported by the link handler.
     *
     * @return the link name.
     */
    @Override
    public String getLinkName() {
        final String linkName = this.handler.getLinkName();
        return linkName;
    }

    /**
     * Gets the entity path this link was created with.
     *
     * @return the entity path.
     */
    @Override
    public String getEntityPath() {
        final String path = this.entityPath;
        return path;
    }

    /**
     * Gets the hostname reported by the link handler.
     *
     * @return the hostname.
     */
    @Override
    public String getHostname() {
        final String hostname = this.handler.getHostname();
        return hostname;
    }

    /**
     * Indicates whether closing of this link has started or its endpoint states have terminated.
     *
     * @return the current value of the disposed flag.
     */
    public boolean isDisposed() {
        final boolean disposed = this.isDisposed.get();
        return disposed;
    }

    /**
     * Disposes of this link; an alias for {@link #close()}.
     */
    public void dispose() {
        this.close();
    }

    /**
     * Closes this link, blocking for at most the try-timeout from the retry options.
     */
    @Override
    public void close() {
        final Mono<Void> closing = closeAsync();
        closing.block(this.retryOptions.getTryTimeout());
    }

    /**
     * Starts closing this link on behalf of the caller, without an error condition.
     *
     * @return the mono returned by {@link #closeAsync(String, ErrorCondition)}.
     */
    @Override
    public Mono<Void> closeAsync() {
        final ErrorCondition noCondition = null;
        return closeAsync("User invoked close operation.", noCondition);
    }

    /**
     * Reads the pending bytes of a delivery from the receiver, decodes them into a message and
     * settles the delivery. Guarded by an assertion that this link is not in V2 mode.
     *
     * @param delivery the delivery whose pending byte count sizes the read buffer.
     * @return the decoded message.
     */
    protected Message decodeDelivery(Delivery delivery) {
        if (!$assertionsDisabled && this.isV2) {
            throw new AssertionError();
        }

        final int pendingBytes = delivery.pending();
        final byte[] buffer = new byte[pendingBytes];
        final int bytesRead = this.receiver.recv(buffer, 0, pendingBytes);
        // Move the receiver past this delivery before decoding.
        this.receiver.advance();

        final Message decoded = Proton.message();
        decoded.decode(buffer, 0, bytesRead);
        delivery.settle();
        return decoded;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Marks this link disposed and begins closing it. If the link was already disposed, the
     * returned mono simply waits for the close to finish.
     *
     * @param str message logged alongside the close.
     * @param errorCondition condition handed to the local close; may be null.
     * @return a mono for the close, published on the bounded elastic scheduler.
     */
    public Mono<Void> closeAsync(String str, ErrorCondition errorCondition) {
        final boolean alreadyDisposed = this.isDisposed.getAndSet(true);
        if (alreadyDisposed) {
            return getIsClosedMono();
        }

        AmqpLoggingUtils.addErrorCondition(this.logger.atVerbose(), errorCondition)
            .log("Setting error condition and disposing. {}", new Object[]{str});

        final Mono<Void> closeOperation = beginClose(errorCondition).flatMap(localCloseScheduled -> {
            // Only wait for the remote acknowledgement when the local close was actually scheduled.
            if (localCloseScheduled.booleanValue()) {
                return timeoutRemoteCloseAck();
            }
            return Mono.<Void>empty();
        });
        return closeOperation.publishOn(Schedulers.boundedElastic());
    }

    /**
     * Gets a mono backed by the closed sink, which {@code completeClose()} completes.
     *
     * @return the closed signal, published on the bounded elastic scheduler.
     */
    protected Mono<Void> getIsClosedMono() {
        final Mono<Void> closedSignal = this.isClosedMono.asMono();
        return closedSignal.publishOn(Schedulers.boundedElastic());
    }

    /**
     * Hook invoked from {@code completeClose()} after the handler is closed when this link is not
     * in V2 mode. The base implementation only asserts that precondition.
     */
    protected void onHandlerClose() {
        if ($assertionsDisabled || !this.isV2) {
            return;
        }
        throw new AssertionError();
    }

    /**
     * Asks the handler to begin closing and then schedules the local close of the receiver.
     *
     * @param errorCondition condition set on the receiver if it has none; may be null.
     * @return a mono emitting {@code true} when the local close was scheduled on the dispatcher, or
     *     {@code false} when scheduling failed and the close was performed inline instead.
     */
    private Mono<Boolean> beginClose(ErrorCondition errorCondition) {
        final Runnable localClose = () -> {
            if (this.receiver.getLocalState() == EndpointState.CLOSED) {
                return;
            }
            this.receiver.close();
            if (this.receiver.getCondition() == null) {
                this.receiver.setCondition(errorCondition);
            }
        };

        final Mono<Boolean> scheduleLocalClose = Mono.create(sink -> {
            boolean scheduled = false;
            try {
                try {
                    try {
                        this.dispatcher.invoke(localClose);
                        scheduled = true;
                        sink.success(Boolean.valueOf(scheduled));
                    } catch (IOException ioError) {
                        this.logger.warning("IO sink was closed when scheduling work. Manually invoking and completing close.", new Object[]{ioError});
                        closeWithoutDispatcher(localClose);
                        sink.success(Boolean.valueOf(scheduled));
                    }
                } catch (RejectedExecutionException rejected) {
                    this.logger.info("RejectedExecutionException when scheduling on ReactorDispatcher. Manually invoking and completing close.");
                    closeWithoutDispatcher(localClose);
                    sink.success(Boolean.valueOf(scheduled));
                }
            } catch (Throwable unexpected) {
                // Always signal the sink before propagating so the subscriber is not left waiting.
                sink.success(Boolean.valueOf(scheduled));
                throw unexpected;
            }
        });

        return this.handler.beginClose().then(scheduleLocalClose);
    }

    /** Runs the local close inline, then terminates the endpoint states and completes the close. */
    private void closeWithoutDispatcher(Runnable localClose) {
        localClose.run();
        terminateEndpointState();
        completeClose();
    }

    /**
     * Waits for the closed signal for at most the try-timeout. On timeout the endpoint states are
     * terminated and the close is completed locally; any error is then swallowed.
     *
     * @return a mono that completes when the link is closed or the wait ends, subscribed on the
     *     bounded elastic scheduler.
     */
    private Mono<Void> timeoutRemoteCloseAck() {
        final Mono<Void> closedWithinTimeout = this.isClosedMono.asMono().timeout(this.retryOptions.getTryTimeout());

        final Mono<Void> tolerant = closedWithinTimeout.onErrorResume(error -> {
            final boolean timedOut = error instanceof TimeoutException;
            if (timedOut) {
                this.logger.info("Timeout waiting for RemoteClose. Manually terminating EndpointStates and completing close.");
                terminateEndpointState();
                completeClose();
            }
            return Mono.empty();
        });

        return tolerant.subscribeOn(Schedulers.boundedElastic());
    }

    /**
     * Completes the termination sink so that fluxes from {@code getEndpointStates()} stop.
     * An emission failure is logged at verbose level and not retried.
     */
    private void terminateEndpointState() {
        this.terminateEndpointStates.emitEmpty((signal, result) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atVerbose(), signal, result)
                .log("Could not emit EndpointStates termination.");
            // Returning false tells the sink not to retry the emission.
            return false;
        });
    }

    /**
     * Releases the resources held by this link. Only the first invocation does any work.
     * Order: emit the closed signal, dispose subscriptions, close the token manager and handler,
     * run the V1 close hook, free the receiver, then close the metrics subscription.
     */
    private void completeClose() {
        if (!this.isCompleteCloseCalled.compareAndSet(false, true)) {
            // A previous call already performed the teardown.
            return;
        }

        this.isClosedMono.emitEmpty((signal, result) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atWarning(), signal, result)
                .log("Unable to emit shutdown signal.");
            return false;
        });

        this.subscriptions.dispose();

        if (this.tokenManager != null) {
            this.tokenManager.close();
        }

        this.handler.close();
        if (!this.isV2) {
            onHandlerClose();
        }

        this.receiver.free();

        try {
            this.trackPrefetchSeqNoSubscription.close();
        } catch (Exception closeError) {
            this.logger.verbose("Error closing metrics subscription.", new Object[]{closeError});
        }
    }

    /**
     * Extracts the sequence number annotation from a message.
     *
     * @param message the message to inspect; may be null.
     * @return the annotation as a {@code Long} when it is an {@code Integer} or {@code Long};
     *     {@code null} when the message, its annotations or its body are missing, when the
     *     annotation is absent, or when it has any other type (logged at verbose level).
     */
    private Long getSequenceNumber(Message message) {
        if (message == null || message.getMessageAnnotations() == null || message.getBody() == null) {
            return null;
        }

        final Map<?, ?> annotations = message.getMessageAnnotations().getValue();
        if (annotations == null) {
            return null;
        }

        final Object rawSequenceNumber = annotations.get(SEQUENCE_NUMBER_ANNOTATION);
        if (rawSequenceNumber == null) {
            return null;
        }
        if (rawSequenceNumber instanceof Long) {
            return (Long) rawSequenceNumber;
        }
        if (rawSequenceNumber instanceof Integer) {
            return Long.valueOf(((Integer) rawSequenceNumber).longValue());
        }

        this.logger.verbose("Received message has unexpected `x-opt-sequence-number` annotation value - `{}`. Ignoring it.", new Object[]{rawSequenceNumber});
        return null;
    }

    /**
     * Describes this link by connection id, entity path and link name.
     *
     * @return a string of the form {@code connectionId: [..] entity path: [..] linkName: [..]}.
     */
    @Override
    public String toString() {
        // The first slot is labelled "connectionId", so it must carry the connection id;
        // the receiver's name is the link name, which is already printed in the last slot.
        return String.format("connectionId: [%s] entity path: [%s] linkName: [%s]", getConnectionId(), this.entityPath, getLinkName());
    }

    static {
        $assertionsDisabled = !ReactorReceiver.class.desiredAssertionStatus();
        SEQUENCE_NUMBER_ANNOTATION = Symbol.valueOf(AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME.getValue());
    }
}
