package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.AmqpMetricsProvider;
import com.azure.core.amqp.implementation.ReactorDispatcher;
import com.azure.core.amqp.implementation.ReactorReceiver;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.metrics.Meter;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.class */
/**
 * Service Bus receive link layered on top of {@link ReactorReceiver}.
 *
 * <p>The link operates in one of two modes, decided once at construction from
 * {@code receiveLinkHandlerWrapper.isV2()}:</p>
 * <ul>
 *   <li><b>V2</b> - disposition updates and closing are delegated to {@link ReactorReceiver}; no
 *   {@link ReceiverUnsettledDeliveries} is created, and {@link #decodeDelivery(Delivery)} and
 *   {@link #onHandlerClose()} throw {@link IllegalStateException} if they are invoked.</li>
 *   <li><b>non-V2</b> - this class decodes each delivery itself and tracks unsettled deliveries in a
 *   {@link ReceiverUnsettledDeliveries} instance, which is also used to send dispositions.</li>
 * </ul>
 */
public class ServiceBusReactorReceiver extends ReactorReceiver implements ServiceBusReceiveLink {
    /**
     * Sentinel returned by {@link #decodeDelivery(Delivery)} when a delivery turned out to be a disposition
     * acknowledgement rather than a new message. It is compared by identity and filtered out in {@link #receive()}.
     */
    private static final Message EMPTY_MESSAGE = Proton.message();

    private final ClientLogger logger;

    /** Mode flag read from the handler wrapper; {@code true} selects the V2 code paths. */
    private final boolean isV2;

    /** Tracker for unsettled deliveries; {@code null} when {@link #isV2} is {@code true}. */
    private final ReceiverUnsettledDeliveries receiverUnsettledDeliveries;

    /** Set by the first non-V2 {@link #closeAsync(String, ErrorCondition)} call. */
    private final AtomicBoolean isDisposed;

    private final Receiver receiver;

    /** {@code true} when the receiver's sender settle mode is {@link SenderSettleMode#SETTLED}. */
    private final boolean isSettled;

    private final Mono<String> sessionIdMono;
    private final Mono<OffsetDateTime> sessionLockedUntil;

    /**
     * Creates the receive link.
     *
     * @param amqpConnection Connection the link belongs to; its fully qualified namespace is passed to the
     *     metrics provider.
     * @param str Entity path of the link; used for the super class, the metrics provider, the logging context
     *     and the unsettled-deliveries tracker.
     * @param receiver Proton-j receiver backing this link.
     * @param receiveLinkHandlerWrapper Handler wrapper supplying the link name, hostname and the V2 flag. This
     *     constructor installs its logger on the wrapper.
     * @param tokenManager Token manager forwarded to {@link ReactorReceiver}.
     * @param reactorDispatcher Dispatcher forwarded to {@link ReactorReceiver} and, in non-V2 mode, to the
     *     unsettled-deliveries tracker.
     * @param amqpRetryOptions Retry options forwarded to {@link ReactorReceiver} and, in non-V2 mode, to the
     *     unsettled-deliveries tracker.
     */
    public ServiceBusReactorReceiver(AmqpConnection amqpConnection, String str, Receiver receiver, ReactorReceiver.ReceiveLinkHandlerWrapper receiveLinkHandlerWrapper, TokenManager tokenManager, ReactorDispatcher reactorDispatcher, AmqpRetryOptions amqpRetryOptions) {
        super(amqpConnection, str, receiver, receiveLinkHandlerWrapper, tokenManager, reactorDispatcher, amqpRetryOptions, new AmqpMetricsProvider((Meter) null, amqpConnection.getFullyQualifiedNamespace(), str));
        this.isDisposed = new AtomicBoolean();
        this.receiver = receiver;
        this.isSettled = receiver.getSenderSettleMode() == SenderSettleMode.SETTLED;

        // Every log statement from this link carries the link name and entity path.
        final HashMap<String, Object> loggingContext = new HashMap<>(2);
        loggingContext.put("linkName", receiveLinkHandlerWrapper.getLinkName());
        loggingContext.put("entityPath", str);
        this.logger = new ClientLogger(ServiceBusReactorReceiver.class, loggingContext);
        receiveLinkHandlerWrapper.setLogger(this.logger);

        this.isV2 = receiveLinkHandlerWrapper.isV2();
        if (this.isV2) {
            this.receiverUnsettledDeliveries = null;
        } else {
            this.receiverUnsettledDeliveries = new ReceiverUnsettledDeliveries(
                receiveLinkHandlerWrapper.getHostname(),
                str,
                receiveLinkHandlerWrapper.getLinkName(),
                reactorDispatcher,
                amqpRetryOptions,
                MessageUtils.ZERO_LOCK_TOKEN,
                this.logger);
        }

        // A resolved session id is cached for Long.MAX_VALUE milliseconds; an error or an empty completion is
        // cached for zero duration, so the next subscriber evaluates the pipeline again.
        this.sessionIdMono = getEndpointStates()
            .filter(state -> state == AmqpEndpointState.ACTIVE)
            .next()
            .flatMap(state -> {
                // Link.getRemoteSource() is declared to return the transport-level Source, which does not
                // expose filters; the messaging Source does, hence the cast.
                final org.apache.qpid.proton.amqp.messaging.Source remoteSource =
                    (org.apache.qpid.proton.amqp.messaging.Source) receiver.getRemoteSource();
                final Object sessionId = remoteSource.getFilter().get(ServiceBusReactorSession.SESSION_FILTER);
                if (sessionId != null) {
                    return Mono.just(String.valueOf(sessionId));
                }
                this.logger.info("There is no session id.");
                return Mono.empty();
            })
            .cache(value -> Duration.ofMillis(Long.MAX_VALUE), error -> Duration.ZERO, () -> Duration.ZERO);

        // Same caching policy as sessionIdMono.
        this.sessionLockedUntil = getEndpointStates()
            .filter(state -> state == AmqpEndpointState.ACTIVE)
            .next()
            .map(state -> {
                if (receiver.getRemoteProperties() != null && receiver.getRemoteProperties().containsKey(ServiceBusReactorSession.LOCKED_UNTIL_UTC)) {
                    final long ticks = ((Long) receiver.getRemoteProperties().get(ServiceBusReactorSession.LOCKED_UNTIL_UTC)).longValue();
                    return MessageUtils.convertDotNetTicksToOffsetDateTime(ticks);
                }
                this.logger.info("Locked until not set.");
                // Fall back to the epoch (UTC) when the remote properties carry no locked-until value.
                return Instant.EPOCH.atOffset(ZoneOffset.UTC);
            })
            .cache(value -> Duration.ofMillis(Long.MAX_VALUE), error -> Duration.ZERO, () -> Duration.ZERO);
    }

    /**
     * Updates the disposition of the delivery identified by {@code str}.
     *
     * <p>In V2 mode the call is delegated to {@link ReactorReceiver}. Otherwise the disposition is sent through
     * the unsettled-deliveries tracker, unless this link has already been disposed.</p>
     *
     * @param str Identifier of the delivery, passed through unchanged to the underlying implementation.
     * @param deliveryState Disposition state to apply.
     * @return A Mono for the disposition operation; in non-V2 mode it errors with
     *     {@link IllegalStateException} when the link has been disposed.
     */
    @Override // com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink
    public Mono<Void> updateDisposition(String str, DeliveryState deliveryState) {
        if (this.isV2) {
            return super.updateDisposition(str, deliveryState);
        }
        if (this.isDisposed.get()) {
            return FluxUtil.monoError(this.logger, new IllegalStateException("Cannot perform operations on a disposed receiver."));
        }
        return this.receiverUnsettledDeliveries.sendDisposition(str, deliveryState);
    }

    /**
     * Returns the stream of received messages, published on {@link Schedulers#boundedElastic()}.
     *
     * <p>In non-V2 mode the {@link #EMPTY_MESSAGE} sentinel produced by {@link #decodeDelivery(Delivery)} is
     * removed from the stream before publishing.</p>
     *
     * @return The message stream.
     */
    public Flux<Message> receive() {
        if (this.isV2) {
            return super.receive().publishOn(Schedulers.boundedElastic());
        }
        // Identity comparison is intentional: only the shared sentinel instance is dropped.
        return super.receive()
            .filter(message -> message != EMPTY_MESSAGE)
            .publishOn(Schedulers.boundedElastic());
    }

    /**
     * Returns the Mono built in the constructor that yields the session filter value of the remote source
     * once the link endpoint is active.
     *
     * @return A Mono that emits the session id, or completes empty when the remote source has no session filter.
     */
    @Override // com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink
    public Mono<String> getSessionId() {
        return this.sessionIdMono;
    }

    /**
     * Returns the Mono built in the constructor that yields the locked-until time from the link's remote
     * properties once the link endpoint is active.
     *
     * @return A Mono that emits the locked-until time, or the epoch at UTC when the property is absent.
     */
    @Override // com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink
    public Mono<OffsetDateTime> getSessionLockedUntil() {
        return this.sessionLockedUntil;
    }

    /**
     * Closes the link with the message {@code "User invoked close operation."} and no error condition.
     *
     * @return A Mono for the close operation.
     */
    public Mono<Void> closeAsync() {
        return closeAsync("User invoked close operation.", null);
    }

    /**
     * Closes the link.
     *
     * <p>In V2 mode the call is delegated to {@link ReactorReceiver}. In non-V2 mode the first call marks this
     * link as disposed, terminates the unsettled-deliveries tracker and waits for in-progress dispositions,
     * then closes through {@link ReactorReceiver}; any later call returns the super class's is-closed Mono.</p>
     *
     * @param str Message describing why the link is being closed.
     * @param errorCondition Error condition to close with; may be {@code null}.
     * @return A Mono for the close operation.
     */
    protected Mono<Void> closeAsync(String str, ErrorCondition errorCondition) {
        if (this.isV2) {
            return super.closeAsync(str, errorCondition);
        }
        if (this.isDisposed.getAndSet(true)) {
            return super.getIsClosedMono();
        }
        return this.receiverUnsettledDeliveries.terminateAndAwaitForDispositionsInProgressToComplete()
            .then(super.closeAsync(str, errorCondition));
    }

    /**
     * Decodes a delivery into a message (non-V2 mode only).
     *
     * <p>The delivery tag is converted to a lock token; a missing tag or one that is not 16 bytes long maps to
     * {@code MessageUtils.ZERO_LOCK_TOKEN}. If the tracker already contains that token, the delivery is handed
     * to the tracker as a disposition acknowledgement and {@link #EMPTY_MESSAGE} is returned. Otherwise the
     * pending bytes are read and decoded, and the delivery is either accepted and settled immediately (settled
     * mode) or registered with the tracker before the receiver is advanced.</p>
     *
     * @param delivery Delivery to decode.
     * @return {@link #EMPTY_MESSAGE} for a disposition acknowledgement, otherwise a {@code MessageWithLockToken}
     *     wrapping the decoded message and its lock token.
     * @throws IllegalStateException if invoked in V2 mode.
     */
    protected Message decodeDelivery(Delivery delivery) {
        if (this.isV2) {
            throw this.logger.logExceptionAsError(new IllegalStateException("decodeDelivery should not be called in V2 route."));
        }

        final byte[] deliveryTag = delivery.getTag();
        final UUID lockToken;
        if (deliveryTag != null && deliveryTag.length == 16) {
            lockToken = MessageUtils.convertDotNetBytesToUUID(deliveryTag);
        } else {
            lockToken = MessageUtils.ZERO_LOCK_TOKEN;
        }

        if (this.receiverUnsettledDeliveries.containsDelivery(lockToken)) {
            // Already tracked: this delivery is an acknowledgement for an earlier disposition, not a new message.
            this.receiverUnsettledDeliveries.onDispositionAck(lockToken, delivery);
            return EMPTY_MESSAGE;
        }

        final int pendingBytes = delivery.pending();
        final byte[] buffer = new byte[pendingBytes];
        final int bytesRead = this.receiver.recv(buffer, 0, pendingBytes);
        final Message message = Proton.message();
        message.decode(buffer, 0, bytesRead);

        if (this.isSettled) {
            delivery.disposition(Accepted.getInstance());
            delivery.settle();
        } else {
            this.receiverUnsettledDeliveries.onDelivery(lockToken, delivery);
            this.receiver.advance();
        }
        return new MessageWithLockToken(message, lockToken);
    }

    /**
     * Closes the unsettled-deliveries tracker (non-V2 mode only).
     *
     * @throws IllegalStateException if invoked in V2 mode.
     */
    protected void onHandlerClose() {
        if (this.isV2) {
            throw this.logger.logExceptionAsError(new IllegalStateException("onHandlerClose should not be called in V2 route."));
        }
        this.receiverUnsettledDeliveries.close();
    }
}
