/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.ReactorConnection;
import com.azure.core.amqp.implementation.RequestResponseChannel;
import com.azure.core.amqp.implementation.RequestResponseChannelClosedException;
import com.azure.core.util.logging.ClientLogger;
import java.time.Duration;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.retry.Retry;

public final class RequestResponseChannelCache
implements Disposable {
    private static final String IS_CACHE_TERMINATED_KEY = "isCacheTerminated";
    private static final String IS_CONNECTION_TERMINATED_KEY = "isConnectionTerminated";
    private static final String TRY_COUNT_KEY = "tryCount";
    private final Sinks.Empty<Void> isClosedMono = Sinks.empty();
    private final ClientLogger logger;
    private final ReactorConnection connection;
    private final Duration activationTimeout;
    private final Mono<RequestResponseChannel> createOrGetCachedChannel;
    private final Object lock = new Object();
    private volatile boolean terminated;
    private volatile RequestResponseChannel currentChannel;

    public RequestResponseChannelCache(ReactorConnection connection, String entityPath, String sessionName, String linksName, AmqpRetryPolicy retryPolicy) {
        Objects.requireNonNull(connection, "'connection' cannot be null.");
        Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        Objects.requireNonNull(sessionName, "'sessionName' cannot be null.");
        Objects.requireNonNull(linksName, "'linksName' cannot be null.");
        Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");
        HashMap<String, String> loggingContext = new HashMap<String, String>(2);
        loggingContext.put("connectionId", connection.getId());
        loggingContext.put("linkName", linksName);
        this.logger = new ClientLogger(RequestResponseChannelCache.class, loggingContext);
        this.connection = connection;
        this.activationTimeout = retryPolicy.getRetryOptions().getTryTimeout();
        Mono newChannel = Mono.defer(() -> {
            RecoveryTerminatedException terminatedError = this.checkRecoveryTerminated("new-channel");
            if (terminatedError != null) {
                return Mono.error((Throwable)terminatedError);
            }
            return this.connection.newRequestResponseChannel(sessionName, linksName, entityPath);
        });
        this.createOrGetCachedChannel = newChannel.flatMap(c -> RequestResponseChannelCache.awaitToActive(c, this.activationTimeout, this.logger)).retryWhen(this.retryWhenSpec(retryPolicy)).handle((c, sink) -> {
            RecoveryTerminatedException terminatedError;
            Object object = this.lock;
            synchronized (object) {
                terminatedError = this.checkRecoveryTerminated("cache-refresh");
                this.currentChannel = c;
            }
            if (terminatedError != null) {
                if (!c.isDisposed()) {
                    c.closeAsync().subscribe();
                }
                sink.error((Throwable)terminatedError.propagate());
            } else {
                this.logger.atInfo().log("Emitting the new active channel.");
                sink.next(c);
            }
        }).cacheInvalidateIf(c -> {
            if (c.isDisposedOrDisposalInInProgress()) {
                this.logger.atInfo().log("The channel is closed, requesting a new channel.");
                return true;
            }
            return false;
        });
    }

    public Mono<RequestResponseChannel> get() {
        return this.createOrGetCachedChannel;
    }

    public void dispose() {
        this.closeAsync().subscribe();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Mono<Void> closeAsync() {
        RequestResponseChannel cached;
        Object object = this.lock;
        synchronized (object) {
            if (this.terminated) {
                return this.isClosedMono.asMono();
            }
            this.terminated = true;
            cached = this.currentChannel;
        }
        if (cached == null || cached.isDisposed()) {
            this.logger.atInfo().log("closing the channel-cache.");
            this.isClosedMono.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
            return this.isClosedMono.asMono();
        }
        return cached.closeAsync().doOnEach(signal -> {
            if (signal.isOnError() || signal.isOnComplete()) {
                this.logger.atInfo().log("closing the cached channel and the channel-cache.");
                this.isClosedMono.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
            }
        });
    }

    public boolean isDisposed() {
        return this.terminated;
    }

    private Retry retryWhenSpec(AmqpRetryPolicy retryPolicy) {
        return Retry.from(retrySignals -> retrySignals.concatMap(retrySignal -> {
            long attempts;
            boolean shouldRetry;
            Retry.RetrySignal signal = retrySignal.copy();
            Throwable error = signal.failure();
            long iteration = signal.totalRetriesInARow();
            if (error == null) {
                return Mono.error((Throwable)new IllegalStateException("RetrySignal::failure() not expected to be null."));
            }
            boolean bl = shouldRetry = error instanceof TimeoutException || error instanceof AmqpException && ((AmqpException)((Object)((Object)((Object)error)))).isTransient() || error instanceof IllegalStateException || error instanceof RejectedExecutionException;
            if (!shouldRetry) {
                this.logger.atWarning().addKeyValue(TRY_COUNT_KEY, iteration).log("Exception is non-retriable, not retrying for a new channel.", new Object[]{error});
                if (error instanceof RecoveryTerminatedException) {
                    return Mono.error((Throwable)((RecoveryTerminatedException)error).propagate());
                }
                return Mono.error((Throwable)error);
            }
            Object errorToUse = error instanceof AmqpException ? error : new AmqpException(true, "Non-AmqpException occurred upstream.", error, null);
            Duration backoff = retryPolicy.calculateRetryDelay((Throwable)errorToUse, (int)(attempts = Math.min(iteration, (long)retryPolicy.getMaxRetries())));
            if (backoff == null) {
                this.logger.atWarning().addKeyValue(TRY_COUNT_KEY, iteration).log("Retry is disabled, not retrying for a new channel.", new Object[]{error});
                return Mono.error((Throwable)error);
            }
            this.logger.atInfo().addKeyValue(TRY_COUNT_KEY, iteration).addKeyValue("intervalMs", backoff.toMillis()).log("Transient error occurred. Retrying.", new Object[]{error});
            return Mono.delay((Duration)backoff);
        }));
    }

    private RecoveryTerminatedException checkRecoveryTerminated(String callSite) {
        boolean isCacheTerminated = this.terminated;
        boolean isConnectionTerminated = this.connection.isDisposed();
        if (isCacheTerminated || isConnectionTerminated) {
            this.logger.atInfo().addKeyValue(IS_CACHE_TERMINATED_KEY, isCacheTerminated).addKeyValue(IS_CONNECTION_TERMINATED_KEY, isConnectionTerminated).addKeyValue("callSite", callSite).log("Channel recovery support is terminated.");
            return new RecoveryTerminatedException(this.connection.getId(), isCacheTerminated, isConnectionTerminated);
        }
        return null;
    }

    private static Mono<RequestResponseChannel> awaitToActive(RequestResponseChannel channel, Duration timeout, ClientLogger logger) {
        logger.atInfo().log("Waiting for channel to active.");
        return channel.getEndpointStates().filter(s -> s == AmqpEndpointState.ACTIVE).next().switchIfEmpty(Mono.error(() -> new AmqpException(true, "Channel completed without being active.", null))).timeout(timeout, Mono.defer(() -> {
            String timeoutMessage = "Timeout waiting for channel to be active";
            logger.atInfo().addKeyValue("timeout", (Object)timeout).log("Timeout waiting for channel to be active");
            AmqpException timeoutError = new AmqpException(true, "Timeout waiting for channel to be active (" + timeout + ")", null);
            return channel.closeAsync().then(Mono.error((Throwable)((Object)timeoutError)));
        })).doOnCancel(() -> {
            logger.atInfo().log("The channel request was canceled while waiting to active.");
            if (!channel.isDisposed()) {
                channel.closeAsync().subscribe();
            }
        }).thenReturn((Object)channel);
    }

    private static final class RecoveryTerminatedException
    extends RuntimeException {
        private final String connectionId;
        private final String message;

        RecoveryTerminatedException(String connectionId, boolean isCacheTerminated, boolean isConnectionTerminated) {
            this.connectionId = connectionId;
            this.message = String.format("%s:%b %s:%b", RequestResponseChannelCache.IS_CACHE_TERMINATED_KEY, isCacheTerminated, RequestResponseChannelCache.IS_CONNECTION_TERMINATED_KEY, isConnectionTerminated);
        }

        RequestResponseChannelClosedException propagate() {
            return new RequestResponseChannelClosedException(this.connectionId, this.message);
        }
    }
}

