/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.throughputControl.sdk.controller.container;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConnectionMode;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.Offer;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.QueryFeedOperationState;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.caches.AsyncCache;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.throughputControl.IThroughputContainerController;
import com.azure.cosmos.implementation.throughputControl.sdk.LinkedCancellationToken;
import com.azure.cosmos.implementation.throughputControl.sdk.LinkedCancellationTokenSource;
import com.azure.cosmos.implementation.throughputControl.sdk.config.SDKThroughputControlGroupInternal;
import com.azure.cosmos.implementation.throughputControl.sdk.controller.container.ThroughputProvisioningScope;
import com.azure.cosmos.implementation.throughputControl.sdk.controller.group.SDKThroughputGroupControllerBase;
import com.azure.cosmos.implementation.throughputControl.sdk.controller.group.ThroughputGroupControllerFactory;
import com.azure.cosmos.implementation.throughputControl.sdk.exceptions.ThroughputControlInitializationException;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.models.ThroughputResponse;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.util.retry.Retry;
import reactor.util.retry.RetrySpec;

public class SDKThroughputContainerController
implements IThroughputContainerController {
    private static final Logger logger = LoggerFactory.getLogger(SDKThroughputContainerController.class);
    private static final Duration DEFAULT_THROUGHPUT_REFRESH_INTERVAL = Duration.ofMinutes(15L);
    private static final int NO_OFFER_EXCEPTION_STATUS_CODE = 400;
    private static final int NO_OFFER_EXCEPTION_SUB_STATUS_CODE = 10004;
    private final AsyncDocumentClient client;
    private final RxCollectionCache collectionCache;
    private final ConnectionMode connectionMode;
    private final AsyncCache<String, SDKThroughputGroupControllerBase> groupControllerCache;
    private final Map<String, SDKThroughputControlGroupInternal> groups;
    private final AtomicReference<Integer> maxContainerThroughput;
    private final RxPartitionKeyRangeCache partitionKeyRangeCache;
    private final CosmosAsyncContainer targetContainer;
    private final LinkedCancellationTokenSource cancellationTokenSource;
    private final ConcurrentHashMap<String, LinkedCancellationToken> cancellationTokenMap;
    private final Mono<Integer> throughputQueryMono;
    private SDKThroughputGroupControllerBase defaultGroupController;
    private String targetContainerRid;
    private String targetDatabaseRid;
    private ThroughputProvisioningScope throughputProvisioningScope;

    public SDKThroughputContainerController(RxCollectionCache collectionCache, ConnectionMode connectionMode, Map<String, SDKThroughputControlGroupInternal> groups, RxPartitionKeyRangeCache partitionKeyRangeCache, LinkedCancellationToken parentToken, Mono<Integer> throughputQueryMono) {
        Preconditions.checkNotNull(collectionCache, "Collection cache can not be null");
        Preconditions.checkArgument(groups != null && groups.size() > 0, "Throughput budget groups can not be null or empty");
        Preconditions.checkNotNull(partitionKeyRangeCache, "RxPartitionKeyRangeCache can not be null");
        this.collectionCache = collectionCache;
        this.connectionMode = connectionMode;
        this.groupControllerCache = new AsyncCache();
        this.groups = groups;
        this.maxContainerThroughput = new AtomicReference();
        this.partitionKeyRangeCache = partitionKeyRangeCache;
        this.targetContainer = groups.values().iterator().next().getTargetContainer();
        this.client = CosmosBridgeInternal.getContextClient(this.targetContainer);
        this.throughputProvisioningScope = this.getThroughputResolveLevel(groups);
        this.cancellationTokenSource = new LinkedCancellationTokenSource(parentToken);
        this.cancellationTokenMap = new ConcurrentHashMap();
        this.throughputQueryMono = throughputQueryMono == null ? this.resolveContainerMaxThroughputCore() : throughputQueryMono;
    }

    private ThroughputProvisioningScope getThroughputResolveLevel(Map<String, SDKThroughputControlGroupInternal> groupConfigs) {
        if (groupConfigs.values().stream().anyMatch(groupConfig -> groupConfig.getTargetThroughputThreshold() != null)) {
            return ThroughputProvisioningScope.CONTAINER;
        }
        return ThroughputProvisioningScope.NONE;
    }

    @Override
    public <T> Mono<T> init() {
        return this.resolveContainerResourceId().flatMap(containerRid -> this.resolveContainerMaxThroughput()).flatMap(controller -> this.createAndInitializeGroupControllers()).doOnSuccess(controller -> CosmosSchedulers.COSMOS_PARALLEL.schedule(() -> this.refreshContainerMaxThroughputTask(this.cancellationTokenSource.getToken()).subscribe())).thenReturn((Object)this);
    }

    private Mono<String> resolveDatabaseResourceId() {
        return this.targetContainer.getDatabase().read().flatMap(response -> {
            this.targetDatabaseRid = response.getProperties().getResourceId();
            return Mono.just((Object)this.targetDatabaseRid);
        });
    }

    private Mono<String> resolveContainerResourceId() {
        return this.targetContainer.read().flatMap(response -> {
            this.targetContainerRid = response.getProperties().getResourceId();
            return Mono.just((Object)this.targetContainerRid);
        });
    }

    private Mono<ThroughputResponse> resolveDatabaseThroughput() {
        return Mono.justOrEmpty((Object)this.targetDatabaseRid).switchIfEmpty(this.resolveDatabaseResourceId()).flatMap(this::resolveThroughputByResourceId);
    }

    private Mono<ThroughputResponse> resolveContainerThroughput() {
        if (StringUtils.isEmpty(this.targetContainerRid)) {
            return this.resolveContainerResourceId().flatMap(this::resolveThroughputByResourceId).onErrorResume(throwable -> {
                if (this.isOwnerResourceNotExistsException((Throwable)throwable)) {
                    this.collectionCache.refresh(null, BridgeInternal.getLink(this.targetContainer), null);
                }
                return Mono.error((Throwable)throwable);
            }).retryWhen((Retry)RetrySpec.max((long)1L).filter(this::isOwnerResourceNotExistsException));
        }
        return Mono.just((Object)this.targetContainerRid).flatMap(this::resolveThroughputByResourceId);
    }

    private Mono<SDKThroughputContainerController> resolveContainerMaxThroughput() {
        return this.throughputQueryMono.flatMap(maxThroughput -> {
            this.maxContainerThroughput.set((Integer)maxThroughput);
            return Mono.just((Object)this);
        }).switchIfEmpty(Mono.just((Object)this));
    }

    private Mono<Integer> resolveContainerMaxThroughputCore() {
        return Mono.defer(() -> Mono.just((Object)((Object)this.throughputProvisioningScope))).flatMap(throughputProvisioningScope -> {
            if (throughputProvisioningScope == ThroughputProvisioningScope.CONTAINER) {
                return this.resolveContainerThroughput().onErrorResume(throwable -> {
                    if (this.isOfferNotConfiguredException((Throwable)throwable)) {
                        this.throughputProvisioningScope = ThroughputProvisioningScope.DATABASE;
                    }
                    return Mono.error((Throwable)throwable);
                });
            }
            if (throughputProvisioningScope == ThroughputProvisioningScope.DATABASE) {
                return this.resolveDatabaseThroughput().onErrorResume(throwable -> {
                    if (this.isOfferNotConfiguredException((Throwable)throwable)) {
                        this.throughputProvisioningScope = ThroughputProvisioningScope.CONTAINER;
                    }
                    return Mono.error((Throwable)throwable);
                });
            }
            return Mono.empty();
        }).map(this::getMaxContainerThroughput).onErrorResume(throwable -> {
            if (this.isOwnerResourceNotExistsException((Throwable)throwable)) {
                this.cancellationTokenSource.close();
            }
            return Mono.error((Throwable)throwable);
        }).retryWhen((Retry)RetrySpec.max((long)1L).filter(this::isOfferNotConfiguredException));
    }

    private Mono<ThroughputResponse> resolveThroughputByResourceId(String resourceId) {
        Preconditions.checkArgument(StringUtils.isNotEmpty(resourceId), "ResourceId can not be null or empty");
        QueryFeedOperationState state = new QueryFeedOperationState(ImplementationBridgeHelpers.CosmosAsyncDatabaseHelper.getCosmosAsyncDatabaseAccessor().getCosmosAsyncClient(this.targetContainer.getDatabase()), "resolveThroughputByResourceId", this.targetContainer.getDatabase().getId(), this.targetContainer.getId(), ResourceType.Offer, OperationType.Query, null, new CosmosQueryRequestOptions(), new CosmosPagedFluxOptions());
        return this.client.queryOffers(BridgeInternal.getOfferQuerySpecFromResourceId(this.targetContainer, resourceId), state).single().flatMap(offerFeedResponse -> {
            if (offerFeedResponse.getResults().isEmpty()) {
                CosmosException noOfferException = BridgeInternal.createCosmosException(400, "No offers found for the resource " + resourceId);
                BridgeInternal.setSubStatusCode(noOfferException, 10004);
                return Mono.error((Throwable)((Object)noOfferException));
            }
            return this.client.readOffer(((Offer)offerFeedResponse.getResults().get(0)).getSelfLink()).single();
        }).map(ModelBridgeInternal::createThroughputRespose);
    }

    private Integer getMaxContainerThroughput(ThroughputResponse throughputResponse) {
        Preconditions.checkNotNull(throughputResponse, "Throughput response can not be null");
        ThroughputProperties throughputProperties = throughputResponse.getProperties();
        return Math.max(throughputProperties.getAutoscaleMaxThroughput(), throughputProperties.getManualThroughput());
    }

    private boolean isOfferNotConfiguredException(Throwable throwable) {
        Preconditions.checkNotNull(throwable, "Throwable should not be null");
        CosmosException cosmosException = Utils.as(Exceptions.unwrap((Throwable)throwable), CosmosException.class);
        return cosmosException != null && cosmosException.getStatusCode() == 400 && cosmosException.getSubStatusCode() == 10004;
    }

    private boolean isOwnerResourceNotExistsException(Throwable throwable) {
        Preconditions.checkNotNull(throwable, "Throwable should not be null");
        CosmosException cosmosException = Utils.as(Exceptions.unwrap((Throwable)throwable), CosmosException.class);
        return cosmosException != null && cosmosException.getStatusCode() == 404 && cosmosException.getSubStatusCode() == 1003;
    }

    @Override
    public <T> Mono<T> processRequest(RxDocumentServiceRequest request, Mono<T> originalRequestMono) {
        Preconditions.checkNotNull(request, "Request can not be null");
        Preconditions.checkNotNull(originalRequestMono, "Original request mono can not be null");
        return this.getOrCreateThroughputGroupController(request.getThroughputControlGroupName()).flatMap(groupController -> {
            if (groupController.v != null) {
                return ((SDKThroughputGroupControllerBase)groupController.v).processRequest(request, originalRequestMono);
            }
            return originalRequestMono;
        });
    }

    private Mono<Utils.ValueHolder<SDKThroughputGroupControllerBase>> getOrCreateThroughputGroupController(String groupName) {
        if (StringUtils.isEmpty(groupName)) {
            return Mono.just(new Utils.ValueHolder<SDKThroughputGroupControllerBase>(this.defaultGroupController));
        }
        SDKThroughputControlGroupInternal group = this.groups.get(groupName);
        if (group == null) {
            return Mono.just(new Utils.ValueHolder<SDKThroughputGroupControllerBase>(this.defaultGroupController));
        }
        return this.resolveThroughputGroupController(group).map(Utils.ValueHolder::new);
    }

    public String getTargetContainerRid() {
        return this.targetContainerRid;
    }

    @Override
    public boolean canHandleRequest(RxDocumentServiceRequest request) {
        Preconditions.checkNotNull(request, "Request can not be null");
        return StringUtils.equals(this.targetContainerRid, request.requestContext.resolvedCollectionRid);
    }

    private Mono<SDKThroughputContainerController> createAndInitializeGroupControllers() {
        return Flux.fromIterable(this.groups.values()).flatMap(this::resolveThroughputGroupController).then(Mono.just((Object)this));
    }

    private Mono<SDKThroughputGroupControllerBase> resolveThroughputGroupController(SDKThroughputControlGroupInternal group) {
        return this.groupControllerCache.getAsync(group.getGroupName(), null, () -> this.createAndInitializeGroupController(group)).onErrorResume(throwable -> Mono.error((Throwable)new ThroughputControlInitializationException((Throwable)throwable)));
    }

    private Mono<SDKThroughputGroupControllerBase> createAndInitializeGroupController(SDKThroughputControlGroupInternal group) {
        LinkedCancellationToken parentToken = this.cancellationTokenMap.compute(group.getGroupName(), (key, cancellationToken) -> this.cancellationTokenSource.getToken());
        SDKThroughputGroupControllerBase groupController = ThroughputGroupControllerFactory.createController(this.connectionMode, group, this.maxContainerThroughput.get(), this.partitionKeyRangeCache, this.targetContainerRid, parentToken);
        return groupController.init().cast(SDKThroughputGroupControllerBase.class).doOnSuccess(controller -> {
            if (controller.isDefault()) {
                this.defaultGroupController = controller;
            }
        });
    }

    private Flux<Void> refreshContainerMaxThroughputTask(LinkedCancellationToken cancellationToken) {
        Preconditions.checkNotNull(cancellationToken, "Cancellation token can not be null");
        if (this.throughputProvisioningScope == ThroughputProvisioningScope.NONE) {
            return Flux.empty();
        }
        return Mono.delay((Duration)DEFAULT_THROUGHPUT_REFRESH_INTERVAL, (Scheduler)CosmosSchedulers.COSMOS_PARALLEL).flatMap(t -> {
            if (cancellationToken.isCancellationRequested()) {
                return Mono.empty();
            }
            return this.resolveContainerMaxThroughput();
        }).flatMapIterable(controller -> this.groups.values()).flatMap(this::resolveThroughputGroupController).doOnNext(groupController -> groupController.onContainerMaxThroughputRefresh(this.maxContainerThroughput.get())).onErrorResume(throwable -> {
            logger.warn("Refresh throughput failed with reason {}", (Object)throwable.getMessage());
            return Mono.empty();
        }).then().repeat(() -> !cancellationToken.isCancellationRequested());
    }
}

