/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.ReactorConnection;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public final class ReactorConnectionCache<T extends ReactorConnection>
implements Disposable {
    private static final AmqpException TERMINATED_ERROR = new AmqpException(false, "Connection recovery support is terminated.", null);
    private static final String TRY_COUNT_KEY = "tryCount";
    private final String fullyQualifiedNamespace;
    private final String entityPath;
    private final AmqpRetryOptions retryOptions;
    private final AmqpErrorContext errorContext;
    private final ClientLogger logger;
    private final Mono<T> createOrGetCachedConnection;
    private final Object lock = new Object();
    private volatile boolean terminated;
    private volatile T currentConnection;
    private final State state = new State();

    public ReactorConnectionCache(Supplier<T> connectionSupplier, String fullyQualifiedNamespace, String entityPath, AmqpRetryPolicy retryPolicy, Map<String, Object> loggingContext) {
        this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = entityPath;
        Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");
        this.retryOptions = retryPolicy.getRetryOptions();
        this.errorContext = new AmqpErrorContext(fullyQualifiedNamespace);
        this.logger = new ClientLogger(this.getClass(), Objects.requireNonNull(loggingContext, "'loggingContext' cannot be null."));
        Objects.requireNonNull(connectionSupplier, "'connectionSupplier' cannot be null.");
        Mono newConnection = Mono.fromSupplier(() -> {
            if (this.terminated) {
                this.logger.info("Connection recovery support is terminated, dropping the request for new connection.");
                throw TERMINATED_ERROR;
            }
            return (ReactorConnection)connectionSupplier.get();
        });
        this.createOrGetCachedConnection = newConnection.flatMap(c -> {
            this.state.transfer((ReactorConnection)c);
            ReactorConnectionCache.withConnectionId(this.logger, c.getId()).log("Waiting to connect and active.");
            return c.connectAndAwaitToActive().doOnCancel(() -> {
                if (!c.isDisposed()) {
                    ReactorConnectionCache.closeConnection(c, this.logger, "Request was canceled while waiting to connect and active.");
                }
            });
        }).retryWhen(this.retryWhenSpec(retryPolicy)).handle((c, sink) -> {
            boolean terminated;
            ReactorConnection connection = c;
            Object object = this.lock;
            synchronized (object) {
                terminated = this.terminated;
                this.currentConnection = connection;
            }
            if (terminated) {
                ReactorConnectionCache.closeConnection(connection, this.logger, "Connection recovery support is terminated.");
                sink.error((Throwable)((Object)TERMINATED_ERROR));
            } else {
                ReactorConnectionCache.withConnectionId(this.logger, c.getId()).log("Emitting the new active connection.");
                sink.next((Object)connection);
            }
        }).cacheInvalidateIf(c -> {
            if (c.isDisposed()) {
                ReactorConnectionCache.withConnectionId(this.logger, c.getId()).log("The connection is closed, requesting a new connection.");
                return true;
            }
            return false;
        });
    }

    public Mono<T> get() {
        return this.createOrGetCachedConnection;
    }

    public String getFullyQualifiedNamespace() {
        return this.fullyQualifiedNamespace;
    }

    public String getEntityPath() {
        return this.entityPath;
    }

    public AmqpRetryOptions getRetryOptions() {
        return this.retryOptions;
    }

    public boolean isCurrentConnectionClosed() {
        return this.currentConnection != null && ((ReactorConnection)this.currentConnection).isDisposed() || this.terminated;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void dispose() {
        T connection;
        Object object = this.lock;
        synchronized (object) {
            if (this.terminated) {
                return;
            }
            this.terminated = true;
            connection = this.currentConnection;
        }
        if (connection != null && !((ReactorConnection)connection).isDisposed()) {
            ReactorConnectionCache.closeConnection(connection, this.logger, "Terminating the connection recovery support.");
        } else {
            this.logger.info("Terminating the connection recovery support.");
        }
    }

    public boolean isDisposed() {
        return this.terminated;
    }

    Retry retryWhenSpec(AmqpRetryPolicy retryPolicy) {
        return Retry.from(retrySignals -> retrySignals.concatMap(retrySignal -> {
            long attempts;
            boolean shouldRetry;
            Retry.RetrySignal signal = retrySignal.copy();
            Throwable error = signal.failure();
            long iteration = signal.totalRetriesInARow();
            if (error == null) {
                return Mono.error((Throwable)new IllegalStateException("RetrySignal::failure() not expected to be null."));
            }
            boolean bl = shouldRetry = error instanceof TimeoutException || error instanceof AmqpException && ((AmqpException)((Object)((Object)((Object)error)))).isTransient() || error instanceof IllegalStateException || error instanceof RejectedExecutionException;
            if (!shouldRetry) {
                this.logger.atError().addKeyValue(TRY_COUNT_KEY, iteration).log("Exception is non-retriable, not retrying for a new connection.", new Object[]{error});
                return Mono.error((Throwable)error);
            }
            Object errorToUse = error instanceof AmqpException ? error : new AmqpException(true, "Non-AmqpException occurred upstream.", error, this.errorContext);
            Duration backoff = retryPolicy.calculateRetryDelay((Throwable)errorToUse, (int)(attempts = Math.min(iteration, (long)retryPolicy.getMaxRetries())));
            if (backoff == null) {
                this.logger.atError().addKeyValue(TRY_COUNT_KEY, iteration).log("Retry is disabled, not retrying for a new connection.", new Object[]{error});
                return Mono.error((Throwable)error);
            }
            if (this.terminated) {
                return Mono.error((Throwable)((Object)TERMINATED_ERROR));
            }
            this.logger.atInfo().addKeyValue(TRY_COUNT_KEY, iteration).addKeyValue("intervalMs", backoff.toMillis()).log("Transient error occurred. Retrying.", new Object[]{error});
            return Mono.delay((Duration)backoff);
        }));
    }

    private static void closeConnection(ReactorConnection c, ClientLogger logger, String message) {
        LoggingEventBuilder builder = ReactorConnectionCache.withConnectionId(logger, c.getId());
        builder.log("closing connection (" + message + ").");
        c.closeAsync().subscribe(__ -> {}, t -> builder.log("connection close finished with error.", new Object[]{t}), () -> builder.log("connection close finished."));
    }

    private static LoggingEventBuilder withConnectionId(ClientLogger logger, String id) {
        return logger.atInfo().addKeyValue("connectionId", id);
    }

    private static final class State {
        private final AtomicReference<ReactorConnection> s = new AtomicReference<Object>(null);

        private State() {
        }

        void transfer(ReactorConnection c) {
            ReactorConnection from = this.s.getAndSet(c);
            if (from != null) {
                c.transferState(from);
            }
        }
    }
}

