/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.throughputControl.sdk;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.RequestRateTooLargeException;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.UUIDs;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.throughputControl.sdk.ThroughputControlTrackingUnit;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

public class ThroughputRequestThrottler {
    private static final Logger logger = LoggerFactory.getLogger(ThroughputRequestThrottler.class);
    private final AtomicReference<Double> availableThroughput;
    private final AtomicReference<Double> scheduledThroughput;
    private final ReentrantReadWriteLock.WriteLock throughputWriteLock;
    private final ReentrantReadWriteLock.ReadLock throughputReadLock;
    private final ConcurrentHashMap<OperationType, ThroughputControlTrackingUnit> trackingDictionary;
    private final String pkRangeId;
    private String cycleId;

    public ThroughputRequestThrottler(double scheduledThroughput, String pkRangeId) {
        this.availableThroughput = new AtomicReference<Double>(scheduledThroughput);
        this.scheduledThroughput = new AtomicReference<Double>(scheduledThroughput);
        ReentrantReadWriteLock throughputReadWriteLock = new ReentrantReadWriteLock();
        this.throughputWriteLock = throughputReadWriteLock.writeLock();
        this.throughputReadLock = throughputReadWriteLock.readLock();
        this.trackingDictionary = new ConcurrentHashMap();
        this.cycleId = UUIDs.nonBlockingRandomUUID().toString();
        this.pkRangeId = pkRangeId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public double renewThroughputUsageCycle(double scheduledThroughput) {
        try {
            this.throughputWriteLock.lock();
            double throughputUsagePercentage = (this.scheduledThroughput.get() - this.availableThroughput.get()) / this.scheduledThroughput.get();
            this.scheduledThroughput.set(scheduledThroughput);
            this.updateAvailableThroughput();
            if (throughputUsagePercentage > 0.0) {
                logger.debug("[CycleId: {}, pkRangeId: {}, ruUsagePercentage: {}]", new Object[]{this.cycleId, this.pkRangeId, throughputUsagePercentage});
            }
            String newCycleId = UUIDs.nonBlockingRandomUUID().toString();
            for (ThroughputControlTrackingUnit trackingUnit : this.trackingDictionary.values()) {
                trackingUnit.reset(newCycleId);
            }
            this.cycleId = newCycleId;
            double d = throughputUsagePercentage;
            return d;
        }
        finally {
            this.throughputWriteLock.unlock();
        }
    }

    private void updateAvailableThroughput() {
        this.availableThroughput.getAndAccumulate(this.scheduledThroughput.get(), (available, refill) -> Math.min(available, 0.0) + refill);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <T> Mono<T> processRequest(RxDocumentServiceRequest request, Mono<T> originalRequestMono) {
        try {
            this.throughputReadLock.lock();
            ThroughputControlTrackingUnit trackingUnit = this.trackingDictionary.compute(request.getOperationType(), (key, value) -> {
                if (value == null) {
                    value = new ThroughputControlTrackingUnit(request.getOperationType(), this.cycleId);
                }
                return value;
            });
            if (this.availableThroughput.get() > 0.0) {
                if (StringUtils.isEmpty(request.requestContext.throughputControlRequestContext.getThroughputControlCycleId())) {
                    request.requestContext.throughputControlRequestContext.setThroughputControlCycleId(this.cycleId);
                }
                trackingUnit.increasePassedRequest();
                Mono mono = originalRequestMono.doOnSuccess(response -> this.trackRequestCharge(request, response)).doOnError(throwable -> this.trackRequestCharge(request, throwable));
                return mono;
            }
            trackingUnit.increaseRejectedRequest();
            RequestRateTooLargeException requestRateTooLargeException = new RequestRateTooLargeException();
            int backoffTimeInMilliSeconds = (int)Math.ceil(Math.abs(this.availableThroughput.get() / this.scheduledThroughput.get())) * 1000;
            requestRateTooLargeException.getResponseHeaders().put("x-ms-retry-after-ms", String.valueOf(backoffTimeInMilliSeconds));
            if (ThroughputRequestThrottler.isBulkRequest(request)) {
                requestRateTooLargeException.getResponseHeaders().put("x-ms-substatus", String.valueOf(10005));
            } else {
                requestRateTooLargeException.getResponseHeaders().put("x-ms-substatus", String.valueOf(10003));
            }
            if (request.requestContext != null) {
                BridgeInternal.setResourceAddress(requestRateTooLargeException, request.requestContext.resourcePhysicalAddress);
            }
            Mono mono = Mono.error((Throwable)((Object)requestRateTooLargeException));
            return mono;
        }
        finally {
            this.throughputReadLock.unlock();
        }
    }

    private static boolean isBulkRequest(RxDocumentServiceRequest request) {
        if (request.getOperationType() != OperationType.Batch || request.getResourceType() != ResourceType.Document) {
            return false;
        }
        String isAtomicBatch = request.getHeaders().get("x-ms-cosmos-batch-atomic");
        if (StringUtils.isEmpty(isAtomicBatch)) {
            return true;
        }
        return !isAtomicBatch.equalsIgnoreCase(Boolean.TRUE.toString());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> void trackRequestCharge(RxDocumentServiceRequest request, T response) {
        try {
            CosmosException cosmosException;
            this.throughputReadLock.lock();
            double requestCharge = 0.0;
            boolean failedRequest = false;
            if (response instanceof StoreResponse) {
                requestCharge = ((StoreResponse)response).getRequestCharge();
            } else if (response instanceof RxDocumentServiceResponse) {
                requestCharge = ((RxDocumentServiceResponse)response).getRequestCharge();
            } else if (response instanceof Throwable && (cosmosException = Utils.as(Exceptions.unwrap((Throwable)((Throwable)response)), CosmosException.class)) != null) {
                requestCharge = cosmosException.getRequestCharge();
                failedRequest = true;
            }
            ThroughputControlTrackingUnit trackingUnit = this.trackingDictionary.get((Object)request.getOperationType());
            if (trackingUnit != null) {
                if (failedRequest) {
                    trackingUnit.increaseFailedResponse();
                } else {
                    trackingUnit.increaseSuccessResponse();
                    trackingUnit.trackRRuUsage(requestCharge);
                }
            }
            if (StringUtils.equals(this.cycleId, request.requestContext.throughputControlRequestContext.getThroughputControlCycleId())) {
                this.availableThroughput.getAndAccumulate(requestCharge, (available, consumed) -> available - consumed);
            } else if (trackingUnit != null) {
                trackingUnit.increaseOutOfCycleResponse();
            }
        }
        finally {
            this.throughputReadLock.unlock();
        }
    }

    public double getAvailableThroughput() {
        return this.availableThroughput.get();
    }

    public double getScheduledThroughput() {
        return this.scheduledThroughput.get();
    }
}

