/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.LockRenewalOperation;
import com.azure.messaging.servicebus.ReceiverOptions;
import com.azure.messaging.servicebus.ServiceBusMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.implementation.LockContainer;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

/**
 * Flux operator that starts a {@link LockRenewalOperation} for every message emitted by the source
 * before the item is handed to the downstream subscriber.
 *
 * <p>Each renewal operation is registered in the supplied {@link LockContainer} under the message's
 * lock token. When auto-complete is enabled in the {@link ReceiverOptions}, the operation is closed
 * and removed from the container as soon as the downstream {@code onNext} call returns.</p>
 */
final class FluxAutoLockRenew extends FluxOperator<ServiceBusMessageContext, ServiceBusMessageContext> {
    private static final ClientLogger LOGGER = new ClientLogger(FluxAutoLockRenew.class);
    private final Function<String, Mono<OffsetDateTime>> onRenewLock;
    private final LockContainer<LockRenewalOperation> messageLockContainer;
    private final ReceiverOptions receivingOptions;
    private final ServiceBusTracer tracer;

    /**
     * Creates the operator.
     *
     * @param source upstream flux of message contexts.
     * @param receiverOptions options supplying the maximum lock renewal duration and the auto-complete flag.
     * @param messageLockContainer container in which the renewal operations are tracked by lock token.
     * @param onRenewLock function that renews the lock for a given lock token.
     * @param tracer tracer used to instrument each lock renewal.
     * @throws NullPointerException if {@code receiverOptions}, {@code messageLockContainer},
     *     {@code onRenewLock}, {@code tracer} or the configured maximum lock renewal duration is null.
     * @throws IllegalArgumentException if the configured maximum lock renewal duration is zero or negative.
     */
    FluxAutoLockRenew(Flux<? extends ServiceBusMessageContext> source, ReceiverOptions receiverOptions,
        LockContainer<LockRenewalOperation> messageLockContainer,
        Function<String, Mono<OffsetDateTime>> onRenewLock, ServiceBusTracer tracer) {
        super(source);
        this.receivingOptions = Objects.requireNonNull(receiverOptions, "'receiverOptions' cannot be null.");
        this.onRenewLock = Objects.requireNonNull(onRenewLock, "'onRenewLock' cannot be null.");
        this.messageLockContainer = Objects.requireNonNull(messageLockContainer,
            "'messageLockContainer' cannot be null.");
        this.tracer = Objects.requireNonNull(tracer, "'tracer' cannot be null.");

        final Duration maxAutoLockRenewDuration = receiverOptions.getMaxLockRenewDuration();
        Objects.requireNonNull(maxAutoLockRenewDuration,
            "'receivingOptions.maxAutoLockRenewDuration' cannot be null.");
        if (maxAutoLockRenewDuration.isNegative() || maxAutoLockRenewDuration.isZero()) {
            throw LOGGER.logExceptionAsError(new IllegalArgumentException(
                "'receivingOptions.maxLockRenewalDuration' should not be zero or negative."));
        }
    }

    /**
     * Wraps the downstream subscriber in a {@link LockRenewSubscriber} and subscribes it to the source.
     *
     * @param coreSubscriber downstream subscriber.
     * @throws NullPointerException if {@code coreSubscriber} is null.
     */
    @Override
    public void subscribe(CoreSubscriber<? super ServiceBusMessageContext> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "'coreSubscriber' cannot be null.");

        final LockRenewSubscriber newLockRenewSubscriber = new LockRenewSubscriber(coreSubscriber,
            this.receivingOptions.getMaxLockRenewDuration(), this.messageLockContainer, this.onRenewLock,
            this.receivingOptions.isEnableAutoComplete(), this.tracer);

        // No raw cast needed: the typed subscriber satisfies the source's wildcard element type.
        this.source.subscribe(newLockRenewSubscriber);
    }

    /**
     * Subscriber that registers a lock renewal operation for each received message and then forwards
     * the message context to the wrapped downstream subscriber.
     */
    static final class LockRenewSubscriber extends BaseSubscriber<ServiceBusMessageContext> {
        /** Cleanup used for contexts that carry no message, so there is no lock to release. */
        private static final Consumer<ServiceBusMessageContext> LOCK_RENEW_NO_OP = messageContext -> { };
        private static final ClientLogger LOGGER = new ClientLogger(LockRenewSubscriber.class);
        private final Function<String, Mono<OffsetDateTime>> onRenewLock;
        private final Duration maxAutoLockRenewal;
        private final LockContainer<LockRenewalOperation> messageLockContainer;
        private final CoreSubscriber<? super ServiceBusMessageContext> actual;
        private final boolean isAutoCompleteEnabled;
        private final ServiceBusTracer tracer;

        /**
         * Creates the subscriber.
         *
         * @param actual downstream subscriber that receives every signal.
         * @param maxAutoLockRenewDuration duration passed to each renewal operation and used to compute
         *     the expiry under which the operation is stored in the container.
         * @param messageLockContainer container tracking the renewal operations by lock token.
         * @param onRenewLock function that renews the lock for a given lock token.
         * @param isAutoCompleteEnabled whether the renewal is cleaned up right after downstream
         *     {@code onNext} returns.
         * @param tracer tracer used to instrument each lock renewal; not null-checked here.
         * @throws NullPointerException if {@code actual}, {@code maxAutoLockRenewDuration},
         *     {@code messageLockContainer} or {@code onRenewLock} is null.
         */
        LockRenewSubscriber(CoreSubscriber<? super ServiceBusMessageContext> actual,
            Duration maxAutoLockRenewDuration, LockContainer<LockRenewalOperation> messageLockContainer,
            Function<String, Mono<OffsetDateTime>> onRenewLock, boolean isAutoCompleteEnabled,
            ServiceBusTracer tracer) {
            this.onRenewLock = Objects.requireNonNull(onRenewLock, "'onRenewLock' cannot be null.");
            this.actual = Objects.requireNonNull(actual, "'downstream' cannot be null.");
            this.messageLockContainer = Objects.requireNonNull(messageLockContainer,
                "'messageLockContainer' cannot be null.");
            this.maxAutoLockRenewal = Objects.requireNonNull(maxAutoLockRenewDuration,
                "'maxAutoLockRenewDuration' cannot be null.");
            this.isAutoCompleteEnabled = isAutoCompleteEnabled;
            this.tracer = tracer;
        }

        /**
         * Passes the upstream subscription straight to the downstream subscriber.
         *
         * @param subscription upstream subscription.
         * @throws NullPointerException if {@code subscription} is null.
         */
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            Objects.requireNonNull(subscription, "'subscription' cannot be null.");
            this.actual.onSubscribe(subscription);
        }

        /** Logs the completion and forwards it downstream. */
        @Override
        public void hookOnComplete() {
            LOGGER.verbose("Upstream has completed.");
            this.actual.onComplete();
        }

        /**
         * Logs the upstream error and forwards it downstream.
         *
         * @param throwable error signalled by the upstream.
         */
        @Override
        protected void hookOnError(Throwable throwable) {
            // Explicit array keeps the same (String, Object...) call shape as the compiled class.
            LOGGER.error("Errors occurred upstream.", new Object[] {throwable});
            this.actual.onError(throwable);
        }

        /**
         * Registers a lock renewal for the message (if any) and forwards the context downstream.
         *
         * <ul>
         *   <li>A context without a message is forwarded as-is, with no lock tracking.</li>
         *   <li>A message without a lock token or without a {@code lockedUntil} value is logged at
         *       warning level and dropped: it is not forwarded downstream.</li>
         *   <li>Exceptions thrown by the downstream {@code onNext} are logged and swallowed.</li>
         *   <li>When auto-complete is enabled, the renewal operation is closed and removed from the
         *       container after the downstream call, whether or not it threw.</li>
         * </ul>
         *
         * @param messageContext context emitted by the upstream.
         */
        @Override
        protected void hookOnNext(ServiceBusMessageContext messageContext) {
            final ServiceBusReceivedMessage message = messageContext.getMessage();
            final Consumer<ServiceBusMessageContext> lockCleanup;

            if (message == null) {
                lockCleanup = LOCK_RENEW_NO_OP;
            } else {
                final String lockToken = message.getLockToken();
                if (Objects.isNull(lockToken)) {
                    LOGGER.atWarning()
                        .addKeyValue("sequenceNumber", message.getSequenceNumber())
                        .log("Unexpected, LockToken is not present in message.");
                    return;
                }

                final OffsetDateTime lockedUntil = message.getLockedUntil();
                if (Objects.isNull(lockedUntil)) {
                    LOGGER.atWarning()
                        .addKeyValue("sequenceNumber", message.getSequenceNumber())
                        .log("Unexpected, lockedUntil is not present in message.");
                    return;
                }

                final LockRenewalOperation renewOperation = registerRenewal(message, lockToken, lockedUntil);
                lockCleanup = context -> {
                    renewOperation.close();
                    this.messageLockContainer.remove(context.getMessage().getLockToken());
                };
            }

            try {
                // 'actual' accepts ServiceBusMessageContext directly; an Object cast would not type-check.
                this.actual.onNext(messageContext);
            } catch (Exception e) {
                LOGGER.info("Exception occurred while handling downstream onNext operation.", new Object[] {e});
            } finally {
                if (this.isAutoCompleteEnabled) {
                    lockCleanup.accept(messageContext);
                }
            }
        }

        /**
         * Creates the renewal operation for a message and stores it in the lock container.
         * A failure to store the operation is logged and ignored; the operation is still returned.
         *
         * @param message message whose {@code lockedUntil} is updated on every successful renewal.
         * @param lockToken lock token of the message; key used in the container.
         * @param lockedUntil current lock expiry of the message.
         * @return the created renewal operation.
         */
        private LockRenewalOperation registerRenewal(ServiceBusReceivedMessage message, String lockToken,
            OffsetDateTime lockedUntil) {
            // Decorate the renew call so each renewed expiry is written back to the message and traced.
            final Function<String, Mono<OffsetDateTime>> onRenewLockUpdateMessage
                = this.onRenewLock.andThen(updated -> this.tracer.traceRenewMessageLock(
                    updated.map(newLockedUntil -> {
                        message.setLockedUntil(newLockedUntil);
                        return newLockedUntil;
                    }), message));

            // NOTE(review): the 'false' argument presumably marks a non-session lock - confirm against
            // LockRenewalOperation's constructor.
            final LockRenewalOperation renewOperation = new LockRenewalOperation(lockToken,
                this.maxAutoLockRenewal, false, onRenewLockUpdateMessage, lockedUntil);

            try {
                this.messageLockContainer.addOrUpdate(lockToken,
                    OffsetDateTime.now().plus(this.maxAutoLockRenewal), renewOperation);
            } catch (Exception e) {
                LOGGER.atInfo()
                    .addKeyValue("lockToken", lockToken)
                    .log("Exception occurred while updating lockContainer.", new Object[] {e});
            }

            return renewOperation;
        }

        /**
         * Returns the downstream subscriber's context.
         *
         * @return the context of the wrapped subscriber.
         */
        @Override
        public Context currentContext() {
            return this.actual.currentContext();
        }
    }
}

