package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.exception.AzureException;
import com.azure.core.util.logging.ClientLogger;
import java.time.Duration;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/* loaded from: input_file:com/azure/core/amqp/implementation/RequestResponseChannelCache.class */
public final class RequestResponseChannelCache implements Disposable {
    private static final String IS_CACHE_TERMINATED_KEY = "isCacheTerminated";
    private static final String IS_CONNECTION_TERMINATED_KEY = "isConnectionTerminated";
    private static final String TRY_COUNT_KEY = "tryCount";
    private final ClientLogger logger;
    private final ReactorConnection connection;
    private final Duration activationTimeout;
    private final Mono<RequestResponseChannel> createOrGetCachedChannel;
    private final Object lock = new Object();
    private volatile boolean terminated;
    private volatile RequestResponseChannel currentChannel;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/amqp/implementation/RequestResponseChannelCache$RecoveryTerminatedException.class */
    public static final class RecoveryTerminatedException extends RuntimeException {
        private final String connectionId;
        private final String message;

        RecoveryTerminatedException(String str, boolean z, boolean z2) {
            this.connectionId = str;
            this.message = String.format("%s:%b %s:%b", RequestResponseChannelCache.IS_CACHE_TERMINATED_KEY, Boolean.valueOf(z), RequestResponseChannelCache.IS_CONNECTION_TERMINATED_KEY, Boolean.valueOf(z2));
        }

        RequestResponseChannelClosedException propagate() {
            return new RequestResponseChannelClosedException(this.connectionId, this.message);
        }
    }

    RequestResponseChannelCache(ReactorConnection reactorConnection, String str, String str2, String str3, AmqpRetryPolicy amqpRetryPolicy) {
        Objects.requireNonNull(reactorConnection, "'connection' cannot be null.");
        Objects.requireNonNull(str, "'entityPath' cannot be null.");
        Objects.requireNonNull(str2, "'sessionName' cannot be null.");
        Objects.requireNonNull(str3, "'linksName' cannot be null.");
        Objects.requireNonNull(amqpRetryPolicy, "'retryPolicy' cannot be null.");
        HashMap hashMap = new HashMap(2);
        hashMap.put(ClientConstants.CONNECTION_ID_KEY, reactorConnection.getId());
        hashMap.put(ClientConstants.LINK_NAME_KEY, str3);
        this.logger = new ClientLogger(RequestResponseChannelCache.class, hashMap);
        this.connection = reactorConnection;
        this.activationTimeout = amqpRetryPolicy.getRetryOptions().getTryTimeout();
        this.createOrGetCachedChannel = Mono.defer(() -> {
            RecoveryTerminatedException checkRecoveryTerminated = checkRecoveryTerminated("new-channel");
            return checkRecoveryTerminated != null ? Mono.error(checkRecoveryTerminated) : reactorConnection.newRequestResponseChannel(str2, str3, str);
        }).flatMap(requestResponseChannel -> {
            this.logger.atInfo().log("Waiting for channel to active.");
            return requestResponseChannel.getEndpointStates().filter(amqpEndpointState -> {
                return amqpEndpointState == AmqpEndpointState.ACTIVE;
            }).next().switchIfEmpty(Mono.error(() -> {
                return new AmqpException(true, "Channel completed without being active.", null);
            })).then(Mono.just(requestResponseChannel)).timeout(this.activationTimeout, Mono.defer(() -> {
                String format = String.format("The channel activation wait timed-out (%s).", this.activationTimeout);
                this.logger.atInfo().log(format + " Closing channel.");
                return requestResponseChannel.closeAsync().then(Mono.error(new AmqpException(true, format, null)));
            })).doOnCancel(() -> {
                this.logger.atInfo().log("The channel request was canceled while waiting to active.");
                if (requestResponseChannel.isDisposed()) {
                    return;
                }
                requestResponseChannel.closeAsync().subscribe();
            });
        }).retryWhen(retryWhenSpec(amqpRetryPolicy)).handle((requestResponseChannel2, synchronousSink) -> {
            RecoveryTerminatedException checkRecoveryTerminated;
            synchronized (this.lock) {
                checkRecoveryTerminated = checkRecoveryTerminated("cache-refresh");
                this.currentChannel = requestResponseChannel2;
            }
            if (checkRecoveryTerminated == null) {
                this.logger.atInfo().log("Emitting the new active channel.");
                synchronousSink.next(requestResponseChannel2);
            } else {
                if (!requestResponseChannel2.isDisposed()) {
                    requestResponseChannel2.closeAsync().subscribe();
                }
                synchronousSink.error(checkRecoveryTerminated.propagate());
            }
        }).cacheInvalidateIf(requestResponseChannel3 -> {
            if (!requestResponseChannel3.isDisposedOrDisposalInInProgress()) {
                return false;
            }
            this.logger.atInfo().log("The channel is closed, requesting a new channel.");
            return true;
        });
    }

    public Mono<RequestResponseChannel> get() {
        return this.createOrGetCachedChannel;
    }

    public void dispose() {
        synchronized (this.lock) {
            if (this.terminated) {
                return;
            }
            this.terminated = true;
            RequestResponseChannel requestResponseChannel = this.currentChannel;
            if (requestResponseChannel == null || requestResponseChannel.isDisposed()) {
                this.logger.atInfo().log("Terminating the channel recovery support.");
            } else {
                this.logger.atInfo().log("Closing the cached channel and Terminating the channel recovery support.");
                requestResponseChannel.closeAsync().subscribe();
            }
        }
    }

    public boolean isDisposed() {
        return this.terminated;
    }

    private Retry retryWhenSpec(AmqpRetryPolicy amqpRetryPolicy) {
        return Retry.from(flux -> {
            return flux.concatMap(retrySignal -> {
                Retry.RetrySignal copy = retrySignal.copy();
                AzureException failure = copy.failure();
                long j = copy.totalRetriesInARow();
                if (failure == null) {
                    return Mono.error(new IllegalStateException("RetrySignal::failure() not expected to be null."));
                }
                if (!((failure instanceof TimeoutException) || ((failure instanceof AmqpException) && ((AmqpException) failure).isTransient()) || (failure instanceof IllegalStateException) || (failure instanceof RejectedExecutionException))) {
                    this.logger.atWarning().addKeyValue(TRY_COUNT_KEY, j).log("Exception is non-retriable, not retrying for a new channel.", new Object[]{failure});
                    return failure instanceof RecoveryTerminatedException ? Mono.error(((RecoveryTerminatedException) failure).propagate()) : Mono.error(failure);
                }
                Duration calculateRetryDelay = amqpRetryPolicy.calculateRetryDelay(failure instanceof AmqpException ? failure : new AmqpException(true, "Non-AmqpException occurred upstream.", (Throwable) failure, (AmqpErrorContext) null), (int) Math.min(j, amqpRetryPolicy.getMaxRetries()));
                if (calculateRetryDelay == null) {
                    this.logger.atWarning().addKeyValue(TRY_COUNT_KEY, j).log("Retry is disabled, not retrying for a new channel.", new Object[]{failure});
                    return Mono.error(failure);
                }
                this.logger.atInfo().addKeyValue(TRY_COUNT_KEY, j).addKeyValue(ClientConstants.INTERVAL_KEY, calculateRetryDelay.toMillis()).log("Transient error occurred. Retrying.", new Object[]{failure});
                return Mono.delay(calculateRetryDelay);
            });
        });
    }

    private RecoveryTerminatedException checkRecoveryTerminated(String str) {
        boolean z = this.terminated;
        boolean isDisposed = this.connection.isDisposed();
        if (!z && !isDisposed) {
            return null;
        }
        this.logger.atInfo().addKeyValue(IS_CACHE_TERMINATED_KEY, z).addKeyValue(IS_CONNECTION_TERMINATED_KEY, isDisposed).log("Channel recovery support is terminated. call-site:{}", new Object[]{str});
        return new RecoveryTerminatedException(this.connection.getId(), z, isDisposed);
    }
}
