/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.pkversion;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
import com.azure.cosmos.implementation.changefeed.LeaseManager;
import com.azure.cosmos.implementation.changefeed.pkversion.PartitionSynchronizer;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class PartitionSynchronizerImpl
implements PartitionSynchronizer {
    private final Logger logger = LoggerFactory.getLogger(PartitionSynchronizerImpl.class);
    private final ChangeFeedContextClient documentClient;
    private final CosmosAsyncContainer collectionSelfLink;
    private final LeaseContainer leaseContainer;
    private final LeaseManager leaseManager;
    private final int degreeOfParallelism;
    private final int maxBatchSize;
    private final String collectionResourceId;

    public PartitionSynchronizerImpl(ChangeFeedContextClient documentClient, CosmosAsyncContainer collectionSelfLink, LeaseContainer leaseContainer, LeaseManager leaseManager, int degreeOfParallelism, int maxBatchSize, String collectionResourceId) {
        this.documentClient = documentClient;
        this.collectionSelfLink = collectionSelfLink;
        this.leaseContainer = leaseContainer;
        this.leaseManager = leaseManager;
        this.degreeOfParallelism = degreeOfParallelism;
        this.maxBatchSize = maxBatchSize;
        this.collectionResourceId = collectionResourceId;
    }

    @Override
    public Mono<Void> createMissingLeases() {
        ConcurrentHashMap leaseTokenMap = new ConcurrentHashMap();
        return this.enumPartitionKeyRanges().map(partitionKeyRange -> {
            leaseTokenMap.put(partitionKeyRange.getId(), partitionKeyRange.getParents());
            return partitionKeyRange.getId();
        }).collectList().flatMap(partitionKeyRangeIds -> {
            this.logger.info("Checking whether leases for any partition is missing - partitions - {}", (Object)String.join((CharSequence)", ", partitionKeyRangeIds));
            return this.createLeases(leaseTokenMap).then();
        }).onErrorResume(throwable -> {
            this.logger.error("Failed to create missing leases.", throwable);
            return Mono.error((Throwable)throwable);
        });
    }

    @Override
    public Flux<Lease> splitPartition(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        String leaseToken = lease.getLeaseToken();
        String lastContinuationToken = lease.getContinuationToken();
        this.logger.info("Partition {} is gone due to split; will attempt to resume using continuation token {}.", (Object)leaseToken, (Object)lastContinuationToken);
        return this.enumPartitionKeyRanges().filter(range -> range != null && range.getParents() != null && range.getParents().contains(leaseToken)).map(Resource::getId).collectList().flatMapMany(addedLeaseTokens -> {
            if (addedLeaseTokens.size() == 0) {
                this.logger.error("Partition {} had split but we failed to find at least one child partition", (Object)leaseToken);
                throw new RuntimeException(String.format("Partition %s had split but we failed to find at least one child partition", leaseToken));
            }
            return Flux.fromIterable((Iterable)addedLeaseTokens);
        }).flatMap(addedRangeId -> this.leaseManager.createLeaseIfNotExist((String)addedRangeId, lastContinuationToken, lease.getProperties()), this.degreeOfParallelism).map(newLease -> {
            this.logger.info("Partition {} split into new partition {} and continuation token {}.", new Object[]{leaseToken, newLease.getLeaseToken(), lastContinuationToken});
            return newLease;
        });
    }

    private Flux<PartitionKeyRange> enumPartitionKeyRanges() {
        String partitionKeyRangesPath = BridgeInternal.extractContainerSelfLink(this.collectionSelfLink);
        CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
        ModelBridgeInternal.setQueryRequestOptionsContinuationTokenAndMaxItemCount(cosmosQueryRequestOptions, null, this.maxBatchSize);
        return this.documentClient.readPartitionKeyRangeFeed(partitionKeyRangesPath, cosmosQueryRequestOptions).map(FeedResponse::getResults).flatMap(Flux::fromIterable).onErrorResume(throwable -> {
            this.logger.error("Failed to retrieve physical partition information.", throwable);
            return Flux.empty();
        });
    }

    private Flux<Lease> createLeases(Map<String, List<String>> leaseTokenMap) {
        ArrayList leaseTokensToBeAdded = new ArrayList();
        return this.leaseContainer.getAllLeases().map(lease -> lease.getLeaseToken()).collectList().flatMapMany(existingLeaseTokens -> {
            leaseTokensToBeAdded.addAll(leaseTokenMap.entrySet().stream().filter(entry -> !existingLeaseTokens.contains(entry.getKey())).filter(entry -> {
                if (entry.getValue() == null) return true;
                if (((List)entry.getValue()).isEmpty()) return true;
                if (!((List)entry.getValue()).stream().noneMatch(existingLeaseTokens::contains)) return false;
                return true;
            }).map(Map.Entry::getKey).collect(Collectors.toList()));
            this.logger.info("Missing lease documents for partitions: [{}]", (Object)String.join((CharSequence)", ", leaseTokensToBeAdded));
            return Flux.fromIterable((Iterable)leaseTokensToBeAdded);
        }).flatMap(leaseTokenToBeAdded -> {
            this.logger.debug("Adding a new lease document for partition {}", leaseTokenToBeAdded);
            return this.leaseManager.createLeaseIfNotExist((String)leaseTokenToBeAdded, null);
        }, this.degreeOfParallelism).map(lease -> {
            this.logger.info("Added new lease document for partition {}", (Object)lease.getLeaseToken());
            return lease;
        });
    }
}

