/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.fullfidelity;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
import com.azure.cosmos.implementation.changefeed.LeaseManager;
import com.azure.cosmos.implementation.changefeed.PartitionSynchronizer;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class PartitionSynchronizerImpl
implements PartitionSynchronizer {
    private final Logger logger = LoggerFactory.getLogger(PartitionSynchronizerImpl.class);
    private final ChangeFeedContextClient documentClient;
    private final CosmosAsyncContainer collectionSelfLink;
    private final LeaseContainer leaseContainer;
    private final LeaseManager leaseManager;
    private final int degreeOfParallelism;
    private final int maxBatchSize;
    private final String collectionResourceId;

    public PartitionSynchronizerImpl(ChangeFeedContextClient documentClient, CosmosAsyncContainer collectionSelfLink, LeaseContainer leaseContainer, LeaseManager leaseManager, int degreeOfParallelism, int maxBatchSize, String collectionResourceId) {
        this.documentClient = documentClient;
        this.collectionSelfLink = collectionSelfLink;
        this.leaseContainer = leaseContainer;
        this.leaseManager = leaseManager;
        this.degreeOfParallelism = degreeOfParallelism;
        this.maxBatchSize = maxBatchSize;
        this.collectionResourceId = collectionResourceId;
    }

    @Override
    public Mono<Void> createMissingLeases() {
        return this.enumPartitionKeyRanges().collectList().flatMap(pkRangeList -> this.createLeases((List<PartitionKeyRange>)pkRangeList).then()).onErrorResume(throwable -> {
            this.logger.error("Create lease failed", throwable);
            return Mono.empty();
        });
    }

    @Override
    public Flux<Lease> splitPartition(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        String leaseToken = lease.getLeaseToken();
        Range<String> epkRange = ((FeedRangeEpkImpl)lease.getFeedRange()).getRange();
        String lastContinuationToken = lease.getContinuationToken();
        this.logger.info("Partition {} with feed range {} is gone due to split; will attempt to resume using continuation token {}.", new Object[]{leaseToken, epkRange, lastContinuationToken});
        return this.enumPartitionKeyRanges().filter(pkRange -> !((String)epkRange.getMin()).equals(pkRange.getMinInclusive()) && !((String)epkRange.getMax()).equals(pkRange.getMaxExclusive())).flatMap(pkRange -> {
            FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(pkRange.toRange());
            return this.leaseManager.createLeaseIfNotExist(feedRangeEpk, null);
        }, this.degreeOfParallelism).map(newLease -> {
            this.logger.info("Partition {} split into new partition and continuation token {}.", (Object)newLease.getLeaseToken(), (Object)lastContinuationToken);
            return newLease;
        });
    }

    private Flux<PartitionKeyRange> enumPartitionKeyRanges() {
        String partitionKeyRangesPath = BridgeInternal.extractContainerSelfLink(this.collectionSelfLink);
        CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
        ModelBridgeInternal.setQueryRequestOptionsContinuationTokenAndMaxItemCount(cosmosQueryRequestOptions, null, this.maxBatchSize);
        return this.documentClient.readPartitionKeyRangeFeed(partitionKeyRangesPath, cosmosQueryRequestOptions).map(FeedResponse::getResults).flatMap(Flux::fromIterable).onErrorResume(throwable -> {
            this.logger.warn("Exception occurred while reading partition key range feed", throwable);
            return Flux.empty();
        });
    }

    private Flux<Lease> createLeases(List<PartitionKeyRange> partitionKeyRanges) {
        return this.leaseContainer.getAllLeases().collectList().flatMapMany(leaseList -> Flux.fromIterable((Iterable)partitionKeyRanges).flatMap(pkRange -> {
            boolean anyMatch = leaseList.stream().anyMatch(lease -> {
                Range<String> epkRange = ((FeedRangeEpkImpl)lease.getFeedRange()).getRange();
                return epkRange.getMin().equals(pkRange.getMinInclusive()) || epkRange.getMax().equals(pkRange.getMaxExclusive());
            });
            if (!anyMatch) {
                return Mono.just((Object)pkRange);
            }
            return Mono.empty();
        }).flatMap(pkRange -> {
            FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(pkRange.toRange());
            return this.leaseManager.createLeaseIfNotExist(feedRangeEpk, null);
        }, this.degreeOfParallelism));
    }
}

