package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.IOpenConnectionsHandler;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.OpenConnectionResponse;
import com.azure.cosmos.implementation.ShouldRetryResult;
import com.azure.cosmos.implementation.directconnectivity.AddressSelector;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.guava27.Strings;
import com.azure.cosmos.models.CosmosContainerIdentity;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:com/azure/cosmos/implementation/directconnectivity/rntbd/ProactiveOpenConnectionsProcessor.class */
public final class ProactiveOpenConnectionsProcessor implements Closeable {
    private Sinks.Many<OpenConnectionTask> openConnectionsTaskSink;
    private final ReentrantReadWriteLock.WriteLock endpointsUnderMonitorMapWriteLock;
    private final ReentrantReadWriteLock.ReadLock endpointsUnderMonitorMapReadLock;
    private final Set<String> containersUnderOpenConnectionAndInitCaches;
    private final Map<String, Set<String>> collectionRidsAndUrisUnderOpenConnectionAndInitCaches;
    private final Set<String> addressUrisUnderOpenConnectionsAndInitCaches;
    private final Object containersUnderOpenConnectionAndInitCachesLock;
    private final IOpenConnectionsHandler openConnectionsHandler;
    private final RntbdEndpoint.Provider endpointProvider;
    private final AddressSelector addressSelector;
    private Disposable openConnectionBackgroundTask;
    private final Sinks.EmitFailureHandler serializedEmitFailureHandler;
    private static final int OPEN_CONNECTION_SINK_BUFFER_SIZE = 100000;
    private static final Logger logger = LoggerFactory.getLogger(ProactiveOpenConnectionsProcessor.class);
    private static final Map<ConnectionOpenFlowAggressivenessHint, ConcurrencyConfiguration> concurrencySettings = new HashMap();
    private final AtomicReference<Boolean> isClosed = new AtomicReference<>(false);
    private final AtomicReference<ConnectionOpenFlowAggressivenessHint> aggressivenessHint = new AtomicReference<>(ConnectionOpenFlowAggressivenessHint.DEFENSIVE);
    private final ConcurrentHashMap<String, List<OpenConnectionTask>> endpointsUnderMonitorMap = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/cosmos/implementation/directconnectivity/rntbd/ProactiveOpenConnectionsProcessor$ConcurrencyConfiguration.class */
    public static class ConcurrencyConfiguration {
        final int openConnectionTaskEmissionConcurrency;
        final int openConnectionExecutionConcurrency;

        public ConcurrencyConfiguration(int i, int i2) {
            this.openConnectionTaskEmissionConcurrency = i;
            this.openConnectionExecutionConcurrency = i2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/cosmos/implementation/directconnectivity/rntbd/ProactiveOpenConnectionsProcessor$ConnectionOpenFlowAggressivenessHint.class */
    public enum ConnectionOpenFlowAggressivenessHint {
        AGGRESSIVE,
        DEFENSIVE
    }

    /* loaded from: input_file:com/azure/cosmos/implementation/directconnectivity/rntbd/ProactiveOpenConnectionsProcessor$SerializedEmitFailureHandler.class */
    private static class SerializedEmitFailureHandler implements Sinks.EmitFailureHandler {
        private SerializedEmitFailureHandler() {
        }

        public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
            if (emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED) || emitResult.equals(Sinks.EmitResult.FAIL_OVERFLOW)) {
                ProactiveOpenConnectionsProcessor.logger.debug("SerializedEmitFailureHandler.onEmitFailure - Signal:{}, Result: {}", signalType, emitResult);
                return true;
            }
            ProactiveOpenConnectionsProcessor.logger.debug("SerializedEmitFailureHandler.onEmitFailure - Signal:{}, Result: {}", signalType, emitResult);
            return false;
        }
    }

    public ProactiveOpenConnectionsProcessor(RntbdEndpoint.Provider provider, AddressSelector addressSelector) {
        ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        this.endpointsUnderMonitorMapWriteLock = reentrantReadWriteLock.writeLock();
        this.endpointsUnderMonitorMapReadLock = reentrantReadWriteLock.readLock();
        this.openConnectionsHandler = new RntbdOpenConnectionsHandler(provider);
        this.endpointProvider = provider;
        this.addressSelector = addressSelector;
        this.serializedEmitFailureHandler = new SerializedEmitFailureHandler();
        this.containersUnderOpenConnectionAndInitCaches = ConcurrentHashMap.newKeySet();
        this.collectionRidsAndUrisUnderOpenConnectionAndInitCaches = new ConcurrentHashMap();
        this.addressUrisUnderOpenConnectionsAndInitCaches = ConcurrentHashMap.newKeySet();
        this.containersUnderOpenConnectionAndInitCachesLock = new Object();
        concurrencySettings.put(ConnectionOpenFlowAggressivenessHint.AGGRESSIVE, new ConcurrencyConfiguration(Configs.getAggressiveWarmupConcurrency(), Configs.getAggressiveWarmupConcurrency()));
        concurrencySettings.put(ConnectionOpenFlowAggressivenessHint.DEFENSIVE, new ConcurrencyConfiguration(Configs.getOpenConnectionsConcurrency(), Configs.getOpenConnectionsConcurrency()));
    }

    public void init() {
        this.openConnectionBackgroundTask = getBackgroundOpenConnectionsPublisher();
    }

    public OpenConnectionTask submitOpenConnectionTaskOutsideLoop(String str, URI uri, Uri uri2, int i) {
        OpenConnectionTask openConnectionTask = new OpenConnectionTask(str, uri, uri2, i);
        submitOpenConnectionTaskOutsideLoopInternal(openConnectionTask);
        return openConnectionTask;
    }

    private void submitOpenConnectionTaskOutsideLoopInternal(OpenConnectionTask openConnectionTask) {
        String uRIAsString = openConnectionTask.getAddressUri().getURIAsString();
        if (this.endpointProvider.isClosed() || this.isClosed.get().booleanValue()) {
            openConnectionTask.completeExceptionally(new ClosedClientTransportException(Strings.lenientFormat("%s is closed", this), null));
            return;
        }
        this.endpointsUnderMonitorMapReadLock.lock();
        try {
            this.endpointsUnderMonitorMap.compute(uRIAsString, (str, list) -> {
                if (list == null) {
                    list = new ArrayList();
                }
                list.add(openConnectionTask);
                if (list.size() == 1) {
                    submitOpenConnectionWithinLoopInternal(openConnectionTask);
                }
                return list;
            });
            getOrCreateEndpoint(openConnectionTask);
        } finally {
            this.endpointsUnderMonitorMapReadLock.unlock();
        }
    }

    private synchronized void submitOpenConnectionWithinLoopInternal(OpenConnectionTask openConnectionTask) {
        this.openConnectionsTaskSink.emitNext(openConnectionTask, this.serializedEmitFailureHandler);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.isClosed.compareAndSet(false, true)) {
            logger.info("Shutting down ProactiveOpenConnectionsProcessor...");
            completeSink(this.openConnectionsTaskSink);
            this.endpointsUnderMonitorMap.forEach((str, list) -> {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    ((OpenConnectionTask) it.next()).completeExceptionally(new ClosedClientTransportException(Strings.lenientFormat("%s is closed", this), null));
                }
            });
        }
    }

    public void recordOpenConnectionsAndInitCachesCompleted(List<CosmosContainerIdentity> list) {
        synchronized (this.containersUnderOpenConnectionAndInitCachesLock) {
            Iterator<CosmosContainerIdentity> it = list.iterator();
            while (it.hasNext()) {
                this.containersUnderOpenConnectionAndInitCaches.remove(ImplementationBridgeHelpers.CosmosContainerIdentityHelper.getCosmosContainerIdentityAccessor().getContainerLink(it.next()));
            }
            if (this.containersUnderOpenConnectionAndInitCaches.isEmpty()) {
                this.aggressivenessHint.set(ConnectionOpenFlowAggressivenessHint.DEFENSIVE);
                reInstantiateOpenConnectionsPublisherAndSubscribe(true);
            } else {
                logger.debug("Cannot switch to defensive mode as some of the containers are still under openConnectionAndInitCaches flow: [{}]", this.containersUnderOpenConnectionAndInitCaches);
            }
        }
    }

    public void recordOpenConnectionsAndInitCachesStarted(List<CosmosContainerIdentity> list) {
        synchronized (this.containersUnderOpenConnectionAndInitCachesLock) {
            boolean z = this.containersUnderOpenConnectionAndInitCaches.size() == 0;
            Iterator<CosmosContainerIdentity> it = list.iterator();
            while (it.hasNext()) {
                this.containersUnderOpenConnectionAndInitCaches.add(ImplementationBridgeHelpers.CosmosContainerIdentityHelper.getCosmosContainerIdentityAccessor().getContainerLink(it.next()));
            }
            if (z) {
                this.aggressivenessHint.set(ConnectionOpenFlowAggressivenessHint.AGGRESSIVE);
                reInstantiateOpenConnectionsPublisherAndSubscribe(false);
            }
        }
    }

    public void recordCollectionRidsAndUrisUnderOpenConnectionsAndInitCaches(String str, List<String> list) {
        this.collectionRidsAndUrisUnderOpenConnectionAndInitCaches.compute(str, (str2, set) -> {
            if (set == null) {
                set = new HashSet(list);
            } else {
                set.addAll(list);
            }
            this.addressUrisUnderOpenConnectionsAndInitCaches.addAll(list);
            return set;
        });
    }

    public boolean isAddressUriUnderOpenConnectionsFlow(String str) {
        return this.addressUrisUnderOpenConnectionsAndInitCaches.contains(str);
    }

    public boolean isCollectionRidUnderOpenConnectionsFlow(String str) {
        return this.collectionRidsAndUrisUnderOpenConnectionAndInitCaches.containsKey(str);
    }

    private Disposable getBackgroundOpenConnectionsPublisher() {
        ConcurrencyConfiguration concurrencyConfiguration = concurrencySettings.get(this.aggressivenessHint.get());
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        this.endpointsUnderMonitorMapWriteLock.lock();
        try {
            instantiateOpenConnectionsPublisher();
            concurrentHashMap.putAll(this.endpointsUnderMonitorMap);
            return Flux.from(this.openConnectionsTaskSink.asFlux()).mergeWith(Flux.fromIterable((Iterable) concurrentHashMap.keySet().stream().map(str -> {
                return (OpenConnectionTask) ((List) concurrentHashMap.get(str)).get(0);
            }).collect(Collectors.toList()))).publishOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC).onErrorResume(th -> {
                logger.warn("An error occurred with proactiveOpenConnectionsProcessor, re-initializing open connections sink", th);
                reInstantiateOpenConnectionsPublisherAndSubscribe(false);
                return Mono.empty();
            }).parallel(concurrencyConfiguration.openConnectionTaskEmissionConcurrency).runOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC).flatMap(openConnectionTask -> {
                return Flux.zip(Mono.just(openConnectionTask), this.openConnectionsHandler.openConnections(openConnectionTask.getCollectionRid(), Arrays.asList(getOrCreateEndpoint(openConnectionTask)), openConnectionTask.getMinConnectionsRequiredForEndpoint())).onErrorResume(th2 -> {
                    logger.warn("An error occurred in proactiveOpenConnectionsProcessor", th2);
                    return Flux.empty();
                });
            }, true, concurrencyConfiguration.openConnectionExecutionConcurrency).flatMap(tuple2 -> {
                OpenConnectionTask openConnectionTask2 = (OpenConnectionTask) tuple2.getT1();
                OpenConnectionResponse openConnectionResponse = (OpenConnectionResponse) tuple2.getT2();
                if (openConnectionResponse.isConnected() && openConnectionResponse.isOpenConnectionAttempted()) {
                    submitOpenConnectionWithinLoopInternal(openConnectionTask2);
                    return Mono.just(openConnectionResponse);
                }
                if (!openConnectionResponse.isConnected() || openConnectionResponse.isOpenConnectionAttempted()) {
                    return openConnectionTask2.getRetryPolicy().shouldRetry((Exception) openConnectionResponse.getException()).flatMap(shouldRetryResult -> {
                        if (shouldRetryResult.shouldRetry) {
                            return enqueueOpenConnectionTaskForRetry(openConnectionTask2, shouldRetryResult).onErrorResume(th2 -> {
                                logger.warn("An error occurred in proactiveOpenConnectionsProcessor", th2);
                                return Mono.empty();
                            });
                        }
                        removeEndpointFromMonitor(openConnectionTask2.getAddressUri().toString(), openConnectionResponse);
                        return Mono.just(openConnectionResponse);
                    });
                }
                removeEndpointFromMonitor(openConnectionTask2.getAddressUri().toString(), openConnectionResponse);
                return Mono.just(openConnectionResponse);
            }, true).subscribe();
        } finally {
            this.endpointsUnderMonitorMapWriteLock.unlock();
        }
    }

    private RntbdEndpoint getOrCreateEndpoint(OpenConnectionTask openConnectionTask) {
        RntbdEndpoint createIfAbsent = this.endpointProvider.createIfAbsent(openConnectionTask.getServiceEndpoint(), openConnectionTask.getAddressUri(), this, openConnectionTask.getMinConnectionsRequiredForEndpoint(), this.addressSelector);
        createIfAbsent.setMinChannelsRequired(Math.max(openConnectionTask.getMinConnectionsRequiredForEndpoint(), createIfAbsent.getMinChannelsRequired()));
        return createIfAbsent;
    }

    private void removeEndpointFromMonitor(String str, OpenConnectionResponse openConnectionResponse) {
        List<OpenConnectionTask> remove = this.endpointsUnderMonitorMap.remove(str);
        logger.debug("Open connections completed for endpoint : {}, no. of connections opened : {}", str, Integer.valueOf(openConnectionResponse.getOpenConnectionCountToEndpoint()));
        if (remove == null || remove.isEmpty()) {
            return;
        }
        Iterator<OpenConnectionTask> it = remove.iterator();
        while (it.hasNext()) {
            it.next().complete(openConnectionResponse);
        }
    }

    private synchronized void reInstantiateOpenConnectionsPublisherAndSubscribe(boolean z) {
        if (z) {
            logger.debug("Force defensive opening of connections");
            forceDefensiveOpenConnections();
        }
        if (this.openConnectionBackgroundTask != null) {
            this.openConnectionBackgroundTask.dispose();
        }
        this.openConnectionBackgroundTask = getBackgroundOpenConnectionsPublisher();
    }

    private Mono<OpenConnectionResponse> enqueueOpenConnectionTaskForRetry(OpenConnectionTask openConnectionTask, ShouldRetryResult shouldRetryResult) {
        if (shouldRetryResult.backOffTime != Duration.ZERO && shouldRetryResult.backOffTime != null) {
            return Mono.delay(shouldRetryResult.backOffTime).flatMap(l -> {
                submitOpenConnectionWithinLoopInternal(openConnectionTask);
                return Mono.empty();
            });
        }
        submitOpenConnectionWithinLoopInternal(openConnectionTask);
        return Mono.empty();
    }

    private void instantiateOpenConnectionsPublisher() {
        logger.debug("Re-instantiate open connections task sink");
        this.openConnectionsTaskSink = Sinks.many().multicast().onBackpressureBuffer(OPEN_CONNECTION_SINK_BUFFER_SIZE);
    }

    private void forceDefensiveOpenConnections() {
        if (this.aggressivenessHint.get() == ConnectionOpenFlowAggressivenessHint.AGGRESSIVE) {
            this.aggressivenessHint.set(ConnectionOpenFlowAggressivenessHint.DEFENSIVE);
        }
    }

    private void completeSink(Sinks.Many<OpenConnectionTask> many) {
        Sinks.EmitResult tryEmitComplete = many.tryEmitComplete();
        if (tryEmitComplete == Sinks.EmitResult.OK) {
            logger.debug("Sink completed.");
        } else if (tryEmitComplete == Sinks.EmitResult.FAIL_CANCELLED || tryEmitComplete == Sinks.EmitResult.FAIL_TERMINATED) {
            logger.debug("Sink already completed, EmitResult: {}", tryEmitComplete);
        } else {
            logger.warn("Sink completion failed, EmitResult: {}", tryEmitComplete);
        }
    }
}
