package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.implementation.IOpenConnectionsHandler;
import com.azure.cosmos.implementation.OpenConnectionResponse;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import java.net.URI;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdOpenConnectionsHandler.class */
public class RntbdOpenConnectionsHandler implements IOpenConnectionsHandler {
    private static final Logger logger = LoggerFactory.getLogger(RntbdOpenConnectionsHandler.class);
    private final RntbdEndpoint.Provider endpointProvider;

    public RntbdOpenConnectionsHandler(RntbdEndpoint.Provider provider) {
        Preconditions.checkNotNull(provider, "Argument 'endpointProvider' can not be null");
        this.endpointProvider = provider;
    }

    @Override // com.azure.cosmos.implementation.IOpenConnectionsHandler
    public Flux<OpenConnectionResponse> openConnections(String str, List<RntbdEndpoint> list, int i) {
        Preconditions.checkNotNull(list, "Argument 'endpoints' should not be null");
        if (logger.isDebugEnabled()) {
            logger.debug("Open connections for endpoints {}", StringUtils.join(list, ","));
        }
        return Flux.fromIterable(list).flatMap(rntbdEndpoint -> {
            Uri addressUri = rntbdEndpoint.getAddressUri();
            return rntbdEndpoint.channelsMetrics() < i ? Mono.fromFuture(rntbdEndpoint.openConnection(new RntbdRequestArgs(getOpenConnectionRequest(str, rntbdEndpoint.serviceEndpoint()), addressUri))).onErrorResume(th -> {
                return Mono.just(new OpenConnectionResponse(addressUri, false, th, true, rntbdEndpoint.channelsMetrics()));
            }).doOnNext(openConnectionResponse -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Connection result: isConnected [{}], address [{}]", Boolean.valueOf(openConnectionResponse.isConnected()), openConnectionResponse.getUri());
                }
            }) : Mono.just(new OpenConnectionResponse(addressUri, true, null, false, rntbdEndpoint.channelsMetrics()));
        });
    }

    private RxDocumentServiceRequest getOpenConnectionRequest(String str, URI uri) {
        RxDocumentServiceRequest create = RxDocumentServiceRequest.create(null, OperationType.Create, ResourceType.Connection);
        create.requestContext.regionalRoutingContextToRoute = new RegionalRoutingContext(uri);
        create.requestContext.resolvedCollectionRid = str;
        create.faultInjectionRequestContext.setLocationEndpointToRoute(uri);
        return create;
    }
}
