/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.throughputControl.controller.container;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConnectionMode;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.Offer;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.caches.AsyncCache;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.throughputControl.LinkedCancellationToken;
import com.azure.cosmos.implementation.throughputControl.LinkedCancellationTokenSource;
import com.azure.cosmos.implementation.throughputControl.config.ThroughputControlGroupInternal;
import com.azure.cosmos.implementation.throughputControl.controller.container.IThroughputContainerController;
import com.azure.cosmos.implementation.throughputControl.controller.container.ThroughputProvisioningScope;
import com.azure.cosmos.implementation.throughputControl.controller.group.ThroughputGroupControllerBase;
import com.azure.cosmos.implementation.throughputControl.controller.group.ThroughputGroupControllerFactory;
import com.azure.cosmos.implementation.throughputControl.exceptions.ThroughputControlInitializationException;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.models.ThroughputResponse;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.util.retry.Retry;
import reactor.util.retry.RetrySpec;

public class ThroughputContainerController
implements IThroughputContainerController {
    private static final Logger logger = LoggerFactory.getLogger(ThroughputContainerController.class);
    private static final Duration DEFAULT_THROUGHPUT_REFRESH_INTERVAL = Duration.ofMinutes(15L);
    private static final int NO_OFFER_EXCEPTION_STATUS_CODE = 400;
    private static final int NO_OFFER_EXCEPTION_SUB_STATUS_CODE = 10004;
    private final AsyncDocumentClient client;
    private final RxCollectionCache collectionCache;
    private final ConnectionMode connectionMode;
    private final AsyncCache<String, ThroughputGroupControllerBase> groupControllerCache;
    private final Set<ThroughputControlGroupInternal> groups;
    private final AtomicReference<Integer> maxContainerThroughput;
    private final RxPartitionKeyRangeCache partitionKeyRangeCache;
    private final CosmosAsyncContainer targetContainer;
    private final LinkedCancellationTokenSource cancellationTokenSource;
    private final ConcurrentHashMap<String, LinkedCancellationToken> cancellationTokenMap;
    private ThroughputGroupControllerBase defaultGroupController;
    private String targetContainerRid;
    private String targetDatabaseRid;
    private ThroughputProvisioningScope throughputProvisioningScope;

    public ThroughputContainerController(RxCollectionCache collectionCache, ConnectionMode connectionMode, Set<ThroughputControlGroupInternal> groups, RxPartitionKeyRangeCache partitionKeyRangeCache, LinkedCancellationToken parentToken) {
        Preconditions.checkNotNull(collectionCache, "Collection cache can not be null");
        Preconditions.checkArgument(groups != null && groups.size() > 0, "Throughput budget groups can not be null or empty");
        Preconditions.checkNotNull(partitionKeyRangeCache, "RxPartitionKeyRangeCache can not be null");
        this.collectionCache = collectionCache;
        this.connectionMode = connectionMode;
        this.groupControllerCache = new AsyncCache();
        this.groups = groups;
        this.maxContainerThroughput = new AtomicReference();
        this.partitionKeyRangeCache = partitionKeyRangeCache;
        this.targetContainer = groups.iterator().next().getTargetContainer();
        this.client = CosmosBridgeInternal.getContextClient(this.targetContainer);
        this.throughputProvisioningScope = this.getThroughputResolveLevel(groups);
        this.cancellationTokenSource = new LinkedCancellationTokenSource(parentToken);
        this.cancellationTokenMap = new ConcurrentHashMap();
    }

    private ThroughputProvisioningScope getThroughputResolveLevel(Set<ThroughputControlGroupInternal> groupConfigs) {
        if (groupConfigs.stream().anyMatch(groupConfig -> groupConfig.getTargetThroughputThreshold() != null)) {
            return ThroughputProvisioningScope.CONTAINER;
        }
        return ThroughputProvisioningScope.NONE;
    }

    @Override
    public <T> Mono<T> init() {
        return this.resolveContainerResourceId().flatMap(containerRid -> this.resolveContainerMaxThroughput()).flatMap(controller -> this.createAndInitializeGroupControllers()).doOnSuccess(controller -> CosmosSchedulers.COSMOS_PARALLEL.schedule(() -> this.refreshContainerMaxThroughputTask(this.cancellationTokenSource.getToken()).subscribe())).thenReturn((Object)this);
    }

    private Mono<String> resolveDatabaseResourceId() {
        return this.targetContainer.getDatabase().read().flatMap(response -> {
            this.targetDatabaseRid = response.getProperties().getResourceId();
            return Mono.just((Object)this.targetDatabaseRid);
        });
    }

    private Mono<String> resolveContainerResourceId() {
        return this.targetContainer.read().flatMap(response -> {
            this.targetContainerRid = response.getProperties().getResourceId();
            return Mono.just((Object)this.targetContainerRid);
        });
    }

    private Mono<ThroughputResponse> resolveDatabaseThroughput() {
        return Mono.justOrEmpty((Object)this.targetDatabaseRid).switchIfEmpty(this.resolveDatabaseResourceId()).flatMap(databaseRid -> this.resolveThroughputByResourceId((String)databaseRid));
    }

    private Mono<ThroughputResponse> resolveContainerThroughput() {
        if (StringUtils.isEmpty(this.targetContainerRid)) {
            return this.resolveContainerResourceId().flatMap(containerRid -> this.resolveThroughputByResourceId((String)containerRid)).onErrorResume(throwable -> {
                if (this.isOwnerResourceNotExistsException((Throwable)throwable)) {
                    this.collectionCache.refresh(null, BridgeInternal.getLink(this.targetContainer), null);
                }
                return Mono.error((Throwable)throwable);
            }).retryWhen((Retry)RetrySpec.max((long)1L).filter(throwable -> this.isOwnerResourceNotExistsException((Throwable)throwable)));
        }
        return Mono.just((Object)this.targetContainerRid).flatMap(containerRid -> this.resolveThroughputByResourceId((String)containerRid));
    }

    private Mono<ThroughputContainerController> resolveContainerMaxThroughput() {
        return Mono.defer(() -> Mono.just((Object)((Object)this.throughputProvisioningScope))).flatMap(throughputProvisioningScope -> {
            if (throughputProvisioningScope == ThroughputProvisioningScope.CONTAINER) {
                return this.resolveContainerThroughput().onErrorResume(throwable -> {
                    if (this.isOfferNotConfiguredException((Throwable)throwable)) {
                        this.throughputProvisioningScope = ThroughputProvisioningScope.DATABASE;
                    }
                    return Mono.error((Throwable)throwable);
                });
            }
            if (throughputProvisioningScope == ThroughputProvisioningScope.DATABASE) {
                return this.resolveDatabaseThroughput().onErrorResume(throwable -> {
                    if (this.isOfferNotConfiguredException((Throwable)throwable)) {
                        this.throughputProvisioningScope = ThroughputProvisioningScope.CONTAINER;
                    }
                    return Mono.error((Throwable)throwable);
                });
            }
            return Mono.empty();
        }).flatMap(throughputResponse -> {
            this.updateMaxContainerThroughput((ThroughputResponse)throughputResponse);
            return Mono.empty();
        }).onErrorResume(throwable -> {
            if (this.isOwnerResourceNotExistsException((Throwable)throwable)) {
                this.cancellationTokenSource.close();
            }
            return Mono.error((Throwable)throwable);
        }).retryWhen((Retry)RetrySpec.max((long)1L).filter(throwable -> this.isOfferNotConfiguredException((Throwable)throwable))).thenReturn((Object)this);
    }

    private Mono<ThroughputResponse> resolveThroughputByResourceId(String resourceId) {
        Preconditions.checkArgument(StringUtils.isNotEmpty(resourceId), "ResourceId can not be null or empty");
        return this.client.queryOffers(BridgeInternal.getOfferQuerySpecFromResourceId(this.targetContainer, resourceId), new CosmosQueryRequestOptions()).single().flatMap(offerFeedResponse -> {
            if (offerFeedResponse.getResults().isEmpty()) {
                CosmosException noOfferException = BridgeInternal.createCosmosException(400, "No offers found for the resource " + resourceId);
                BridgeInternal.setSubStatusCode(noOfferException, 10004);
                return Mono.error((Throwable)((Object)noOfferException));
            }
            return this.client.readOffer(((Offer)offerFeedResponse.getResults().get(0)).getSelfLink()).single();
        }).map(ModelBridgeInternal::createThroughputRespose);
    }

    private void updateMaxContainerThroughput(ThroughputResponse throughputResponse) {
        Preconditions.checkNotNull(throughputResponse, "Throughput response can not be null");
        ThroughputProperties throughputProperties = throughputResponse.getProperties();
        this.maxContainerThroughput.set(Math.max(throughputProperties.getAutoscaleMaxThroughput(), throughputProperties.getManualThroughput()));
    }

    private boolean isOfferNotConfiguredException(Throwable throwable) {
        Preconditions.checkNotNull(throwable, "Throwable should not be null");
        CosmosException cosmosException = Utils.as(Exceptions.unwrap((Throwable)throwable), CosmosException.class);
        return cosmosException != null && cosmosException.getStatusCode() == 400 && cosmosException.getSubStatusCode() == 10004;
    }

    private boolean isOwnerResourceNotExistsException(Throwable throwable) {
        Preconditions.checkNotNull(throwable, "Throwable should not be null");
        CosmosException cosmosException = Utils.as(Exceptions.unwrap((Throwable)throwable), CosmosException.class);
        return cosmosException != null && cosmosException.getStatusCode() == 404 && cosmosException.getSubStatusCode() == 1003;
    }

    @Override
    public <T> Mono<T> processRequest(RxDocumentServiceRequest request, Mono<T> originalRequestMono) {
        Preconditions.checkNotNull(request, "Request can not be null");
        Preconditions.checkNotNull(originalRequestMono, "Original request mono can not be null");
        return this.getOrCreateThroughputGroupController(request.getThroughputControlGroupName()).flatMap(groupController -> {
            if (groupController.v != null) {
                return ((ThroughputGroupControllerBase)groupController.v).processRequest(request, originalRequestMono);
            }
            return originalRequestMono;
        });
    }

    private Mono<Utils.ValueHolder<ThroughputGroupControllerBase>> getOrCreateThroughputGroupController(String groupName) {
        if (StringUtils.isEmpty(groupName)) {
            return Mono.just(new Utils.ValueHolder<ThroughputGroupControllerBase>(this.defaultGroupController));
        }
        for (ThroughputControlGroupInternal group : this.groups) {
            if (!StringUtils.equals(groupName, group.getGroupName())) continue;
            return this.resolveThroughputGroupController(group).map(Utils.ValueHolder::new);
        }
        return Mono.just(new Utils.ValueHolder<ThroughputGroupControllerBase>(this.defaultGroupController));
    }

    public String getTargetContainerRid() {
        return this.targetContainerRid;
    }

    @Override
    public boolean canHandleRequest(RxDocumentServiceRequest request) {
        Preconditions.checkNotNull(request, "Request can not be null");
        return StringUtils.equals(this.targetContainerRid, request.requestContext.resolvedCollectionRid);
    }

    private Mono<ThroughputContainerController> createAndInitializeGroupControllers() {
        return Flux.fromIterable(this.groups).flatMap(group -> this.resolveThroughputGroupController((ThroughputControlGroupInternal)group)).then(Mono.just((Object)this));
    }

    private Mono<ThroughputGroupControllerBase> resolveThroughputGroupController(ThroughputControlGroupInternal group) {
        return this.groupControllerCache.getAsync(group.getGroupName(), null, () -> this.createAndInitializeGroupController(group)).onErrorResume(throwable -> Mono.error((Throwable)new ThroughputControlInitializationException((Throwable)throwable)));
    }

    private Mono<ThroughputGroupControllerBase> createAndInitializeGroupController(ThroughputControlGroupInternal group) {
        LinkedCancellationToken parentToken = this.cancellationTokenMap.compute(group.getGroupName(), (key, cancellationToken) -> this.cancellationTokenSource.getToken());
        ThroughputGroupControllerBase groupController = ThroughputGroupControllerFactory.createController(this.connectionMode, group, this.maxContainerThroughput.get(), this.partitionKeyRangeCache, this.targetContainerRid, parentToken);
        return groupController.init().cast(ThroughputGroupControllerBase.class).doOnSuccess(controller -> {
            if (controller.isDefault()) {
                this.defaultGroupController = controller;
            }
        });
    }

    private Flux<Void> refreshContainerMaxThroughputTask(LinkedCancellationToken cancellationToken) {
        Preconditions.checkNotNull(cancellationToken, "Cancellation token can not be null");
        if (this.throughputProvisioningScope == ThroughputProvisioningScope.NONE) {
            return Flux.empty();
        }
        return Mono.delay((Duration)DEFAULT_THROUGHPUT_REFRESH_INTERVAL, (Scheduler)CosmosSchedulers.COSMOS_PARALLEL).flatMap(t -> {
            if (cancellationToken.isCancellationRequested()) {
                return Mono.empty();
            }
            return this.resolveContainerMaxThroughput();
        }).flatMapIterable(controller -> this.groups).flatMap(group -> this.resolveThroughputGroupController((ThroughputControlGroupInternal)group)).doOnNext(groupController -> groupController.onContainerMaxThroughputRefresh(this.maxContainerThroughput.get())).onErrorResume(throwable -> {
            logger.warn("Refresh throughput failed with reason %s", throwable);
            return Mono.empty();
        }).then().repeat(() -> !cancellationToken.isCancellationRequested());
    }
}

