/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.LockRenewalStatus;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

class LockRenewalOperation
implements AutoCloseable,
Disposable {
    private final ClientLogger logger;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicReference<OffsetDateTime> lockedUntil = new AtomicReference();
    private final AtomicReference<Throwable> throwable = new AtomicReference();
    private final AtomicReference<LockRenewalStatus> status = new AtomicReference<LockRenewalStatus>(LockRenewalStatus.RUNNING);
    private final MonoProcessor<Void> cancellationProcessor = MonoProcessor.create();
    private final Mono<Void> completionMono;
    private final String lockToken;
    private final boolean isSession;
    private final Function<String, Mono<OffsetDateTime>> renewalOperation;
    private final Disposable subscription;

    LockRenewalOperation(String lockToken, Duration maxLockRenewalDuration, boolean isSession, Function<String, Mono<OffsetDateTime>> renewalOperation) {
        this(lockToken, maxLockRenewalDuration, isSession, renewalOperation, OffsetDateTime.now());
    }

    LockRenewalOperation(String lockToken, Duration maxLockRenewalDuration, boolean isSession, Function<String, Mono<OffsetDateTime>> renewalOperation, OffsetDateTime tokenLockedUntil) {
        this.lockToken = Objects.requireNonNull(lockToken, "'lockToken' cannot be null.");
        this.renewalOperation = Objects.requireNonNull(renewalOperation, "'renewalOperation' cannot be null.");
        this.isSession = isSession;
        Objects.requireNonNull(tokenLockedUntil, "'lockedUntil cannot be null.'");
        Objects.requireNonNull(maxLockRenewalDuration, "'maxLockRenewalDuration' cannot be null.");
        HashMap<String, Object> loggingContext = new HashMap<String, Object>(2);
        loggingContext.put("lockToken", lockToken);
        loggingContext.put("isSession", isSession);
        this.logger = new ClientLogger(LockRenewalOperation.class, loggingContext);
        if (maxLockRenewalDuration.isNegative()) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("'maxLockRenewalDuration' cannot be negative."));
        }
        this.lockedUntil.set(tokenLockedUntil);
        Flux renewLockOperation = this.getRenewLockOperation(tokenLockedUntil, maxLockRenewalDuration).takeUntilOther(this.cancellationProcessor).cache(Duration.ofMinutes(2L));
        this.completionMono = renewLockOperation.then();
        this.subscription = renewLockOperation.subscribe(until -> this.lockedUntil.set((OffsetDateTime)until), error -> {
            this.logger.error("Error occurred while renewing lock token.", new Object[]{error});
            this.status.set(LockRenewalStatus.FAILED);
            this.throwable.set((Throwable)error);
            this.cancellationProcessor.onComplete();
        }, () -> {
            if (this.status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.COMPLETE)) {
                this.logger.verbose("Renewing lock task completed.");
            }
            this.cancellationProcessor.onComplete();
        });
    }

    Mono<Void> getCompletionOperation() {
        return this.completionMono;
    }

    OffsetDateTime getLockedUntil() {
        return this.lockedUntil.get();
    }

    String getLockToken() {
        return this.isSession ? null : this.lockToken;
    }

    String getSessionId() {
        return this.isSession ? this.lockToken : null;
    }

    LockRenewalStatus getStatus() {
        return this.status.get();
    }

    Throwable getThrowable() {
        return this.throwable.get();
    }

    @Override
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        if (this.status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.CANCELLED)) {
            this.logger.verbose("Cancelled operation.");
        }
        this.cancellationProcessor.onComplete();
        this.subscription.dispose();
    }

    public void dispose() {
        this.close();
    }

    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    private Flux<OffsetDateTime> getRenewLockOperation(OffsetDateTime initialLockedUntil, Duration maxLockRenewalDuration) {
        if (maxLockRenewalDuration.isZero()) {
            this.status.set(LockRenewalStatus.COMPLETE);
            return Flux.empty();
        }
        EmitterProcessor emitterProcessor = EmitterProcessor.create();
        FluxSink sink = emitterProcessor.sink();
        sink.next((Object)this.calculateRenewalDelay(initialLockedUntil));
        Flux cancellationSignals = Flux.first((Publisher[])new Publisher[]{this.cancellationProcessor, Mono.delay((Duration)maxLockRenewalDuration)});
        return Flux.switchOnNext((Publisher)emitterProcessor.map(interval -> Mono.delay((Duration)interval).thenReturn((Object)Flux.create(s -> s.next(interval))))).takeUntilOther((Publisher)cancellationSignals).flatMap(delay -> {
            this.logger.info("Starting lock renewal.");
            return (Publisher)this.renewalOperation.apply(this.lockToken);
        }).map(offsetDateTime -> {
            Duration next = Duration.between(OffsetDateTime.now(), offsetDateTime);
            this.logger.atInfo().addKeyValue("nextExpiration", offsetDateTime).addKeyValue("next", (Object)next).log("Starting lock renewal.");
            sink.next((Object)this.calculateRenewalDelay((OffsetDateTime)offsetDateTime));
            return offsetDateTime;
        });
    }

    private Duration calculateRenewalDelay(OffsetDateTime initialLockedUntil) {
        OffsetDateTime now = OffsetDateTime.now();
        Duration remainingTime = Duration.between(now, initialLockedUntil);
        if (remainingTime.toMillis() < 400L) {
            this.logger.atInfo().addKeyValue("lockedUntil", (Object)initialLockedUntil).log("Duration was less than 400ms.");
            return Duration.ZERO;
        }
        long bufferInMilliSec = Math.min(remainingTime.toMillis() / 2L, ServiceBusConstants.MAX_RENEWAL_BUFFER_DURATION.toMillis());
        Duration renewAfter = Duration.ofMillis(remainingTime.toMillis() - bufferInMilliSec);
        if (renewAfter.isNegative()) {
            this.logger.atInfo().addKeyValue("renewAfter", remainingTime.toMillis()).addKeyValue("buffer", bufferInMilliSec).log("Adjusted duration is negative.");
        }
        return renewAfter;
    }
}

