/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.CreditFlowMode;
import com.azure.core.amqp.implementation.MessageFlux;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.handler.DeliveryNotOnLinkException;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import com.azure.messaging.servicebus.IServiceBusSessionManager;
import com.azure.messaging.servicebus.ServiceBusMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusSessionReactorReceiver;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.TracingFluxOperator;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/**
 * An {@link IServiceBusSessionManager} backed by exactly one {@link ServiceBusSessionReactorReceiver}.
 *
 * <p>The receiver supplied at construction is handed to a {@link MessageFlux} (request-driven credit flow,
 * {@link MessageFlux#NULL_RETRY_POLICY}) and the resulting stream is wrapped by {@link TracingFluxOperator}.
 * Operations that take a session id only act when that id equals the id of the wrapped receiver.</p>
 */
final class ServiceBusSingleSessionManager
implements IServiceBusSessionManager {
    private final ClientLogger logger;
    private final String identifier;
    private final MessageSerializer serializer;
    private final Duration operationTimeout;
    private final ServiceBusSessionReactorReceiver sessionReceiver;
    private final Flux<Message> messageFlux;
    private final ServiceBusReceiverInstrumentation instrumentation;

    /**
     * Creates a manager around a single session receiver.
     *
     * @param logger logger used for message and error events; must not be null.
     * @param identifier value returned by {@link #getIdentifier()}; stored as given (not null-checked).
     * @param sessionReceiver the session receiver to stream messages from; must not be null.
     * @param prefetch prefetch value passed through to {@link MessageFlux}.
     * @param serializer deserializes AMQP messages into {@link ServiceBusReceivedMessage}; must not be null.
     * @param retryOptions supplies the try-timeout used when closing; must not be null.
     * @param instrumentation instrumentation passed to {@link TracingFluxOperator}; must not be null.
     * @throws NullPointerException if any of the arguments documented as non-null is null.
     */
    ServiceBusSingleSessionManager(ClientLogger logger, String identifier, ServiceBusSessionReactorReceiver sessionReceiver, int prefetch, MessageSerializer serializer, AmqpRetryOptions retryOptions, ServiceBusReceiverInstrumentation instrumentation) {
        this.logger = Objects.requireNonNull(logger, "logger cannot be null.");
        this.identifier = identifier;
        this.sessionReceiver = Objects.requireNonNull(sessionReceiver, "sessionReceiver cannot be null.");
        this.serializer = Objects.requireNonNull(serializer, "serializer cannot be null.");
        Objects.requireNonNull(retryOptions, "retryOptions cannot be null.");
        this.operationTimeout = retryOptions.getTryTimeout();
        // Upstream that emits the single receiver exactly once, on a request of exactly one.
        Flux<ServiceBusSessionReactorReceiver> messageFluxUpstream = new SessionReceiverStream(sessionReceiver).flux();
        this.instrumentation = Objects.requireNonNull(instrumentation, "instrumentation cannot be null");
        MessageFlux messageFluxLocal = new MessageFlux(messageFluxUpstream, prefetch, CreditFlowMode.RequestDriven, MessageFlux.NULL_RETRY_POLICY);
        this.messageFlux = TracingFluxOperator.create(messageFluxLocal, instrumentation);
    }

    /**
     * Gets the identifier supplied at construction.
     *
     * @return the identifier, exactly as passed to the constructor.
     */
    @Override
    public String getIdentifier() {
        return this.identifier;
    }

    /**
     * Gets the link name of the wrapped receiver when it serves the given session.
     *
     * @param sessionId session id to look up.
     * @return the receiver's link name if its session id equals {@code sessionId}; otherwise {@code null}.
     */
    @Override
    public String getLinkName(String sessionId) {
        return this.sessionReceiver.getSessionId().equals(sessionId) ? this.sessionReceiver.getLinkName() : null;
    }

    /**
     * Streams received messages wrapped in {@link ServiceBusMessageContext}.
     *
     * <p>An error from the underlying stream is not propagated as an error signal; it is replaced by a single
     * context carrying the receiver's session id and the error, after which the stream completes.</p>
     *
     * @return a flux of message contexts.
     */
    @Override
    public Flux<ServiceBusMessageContext> receive() {
        return this.receiveMessages()
            .map(message -> new ServiceBusMessageContext(message))
            .onErrorResume(error -> Mono.just(new ServiceBusMessageContext(this.sessionReceiver.getSessionId(), error)));
    }

    /**
     * Settles a message on the wrapped session receiver.
     *
     * @param lockToken lock token of the message to settle.
     * @param sessionId session the message is expected to belong to.
     * @param dispositionStatus the disposition to apply.
     * @param propertiesToModify properties passed to {@link MessageUtils#getDeliveryState}.
     * @param deadLetterReason dead-letter reason passed to {@link MessageUtils#getDeliveryState}.
     * @param deadLetterDescription dead-letter description passed to {@link MessageUtils#getDeliveryState}.
     * @param transactionContext transaction context passed to {@link MessageUtils#getDeliveryState}.
     * @return a mono emitting {@code true} once the receiver's disposition update completes, or an error mono
     *     created by {@link DeliveryNotOnLinkException#noMatchingDelivery} when {@code sessionId} does not
     *     equal the wrapped receiver's session id.
     */
    @Override
    public Mono<Boolean> updateDisposition(String lockToken, String sessionId, DispositionStatus dispositionStatus, Map<String, Object> propertiesToModify, String deadLetterReason, String deadLetterDescription, ServiceBusTransactionContext transactionContext) {
        // The delivery state is needed on both paths: to settle, or to describe the unmatched delivery.
        DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, deadLetterReason, deadLetterDescription, propertiesToModify, transactionContext);
        if (this.sessionReceiver.getSessionId().equals(sessionId)) {
            return this.sessionReceiver.updateDisposition(lockToken, deliveryState).thenReturn(true);
        }
        return Mono.error(DeliveryNotOnLinkException.noMatchingDelivery(lockToken, deliveryState));
    }

    /**
     * Closes the wrapped session receiver, blocking the calling thread on {@code closeAsync()} for at most the
     * try-timeout taken from the retry options at construction.
     */
    @Override
    public void close() {
        this.sessionReceiver.closeAsync().block(this.operationTimeout);
    }

    /**
     * Streams messages deserialized into {@link ServiceBusReceivedMessage}.
     *
     * <p>Each message is logged at verbose level with its session id and message id. Errors are logged at
     * warning level with the receiver's link information and then propagated downstream unchanged.</p>
     *
     * @return a flux of deserialized messages.
     */
    Flux<ServiceBusReceivedMessage> receiveMessages() {
        return this.messageFlux.map(qpidMessage -> {
            ServiceBusReceivedMessage received = this.serializer.deserialize(qpidMessage, ServiceBusReceivedMessage.class);
            this.logger.atVerbose()
                .addKeyValue("sessionId", this.sessionReceiver.getSessionId())
                .addKeyValue("messageId", received.getMessageId())
                .log("Received message.");
            return received;
        }).doOnError(e ->
            // Passed as an explicit argument array so the call binds to the varargs log(String, Object...) overload.
            this.withLinkInfo(this.logger.atWarning()).log("Error occurred. Ending session.", new Object[]{e}));
    }

    /**
     * Adds the wrapped receiver's session id, entity path and link name to a logging event.
     *
     * @param builder the logging event builder to enrich.
     * @return the builder returned by the last {@code addKeyValue} call.
     */
    private LoggingEventBuilder withLinkInfo(LoggingEventBuilder builder) {
        return builder
            .addKeyValue("sessionId", this.sessionReceiver.getSessionId())
            .addKeyValue("entityPath", this.sessionReceiver.getEntityPath())
            .addKeyValue("linkName", this.sessionReceiver.getLinkName());
    }

    /**
     * Sink consumer that emits the session receiver once.
     *
     * <p>The inherited {@link AtomicBoolean} value records whether the receiver has already been emitted.
     * A request for any amount other than one, or any request after the first emission, signals an
     * {@link UnsupportedOperationException} on the sink.</p>
     */
    private static final class SessionReceiverStream
    extends AtomicBoolean
    implements Consumer<FluxSink<ServiceBusSessionReactorReceiver>> {
        private final ServiceBusSessionReactorReceiver sessionReceiver;

        /**
         * @param sessionReceiver the receiver to emit.
         */
        SessionReceiverStream(ServiceBusSessionReactorReceiver sessionReceiver) {
            super(false);
            this.sessionReceiver = sessionReceiver;
        }

        /**
         * Creates a flux driven by this consumer.
         *
         * @return a flux created via {@link Flux#create(Consumer)} with this instance as the emitter.
         */
        public Flux<ServiceBusSessionReactorReceiver> flux() {
            return Flux.create(this);
        }

        /**
         * Registers the request handler that validates demand and emits the receiver.
         *
         * @param sink the sink to emit the receiver or an error to.
         */
        @Override
        public void accept(FluxSink<ServiceBusSessionReactorReceiver> sink) {
            sink.onRequest(r -> {
                if (r != 1L) {
                    sink.error(new UnsupportedOperationException("Expects one request for sessionReceiver but was " + r));
                    return;
                }
                // getAndSet returns the previous flag, so only the first caller observes false.
                boolean emittedOnce = this.getAndSet(true);
                if (emittedOnce) {
                    sink.error(new UnsupportedOperationException("Cannot subscribe or request for sessionReceiver more than once."));
                    return;
                }
                sink.next(this.sessionReceiver);
            });
        }
    }
}

