package com.azure.cosmos.implementation.changefeed.common;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosDatabaseRequestOptions;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosItemIdentity;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.class */
/**
 * {@link ChangeFeedContextClient} implementation backed by a single {@link CosmosAsyncContainer}
 * and the {@link AsyncDocumentClient} obtained from it.
 *
 * <p>Most reactive results returned by this class are re-published on the configured
 * {@link Scheduler} via {@code publishOn}, so downstream operators run on that scheduler.
 */
public class ChangeFeedContextClientImpl implements ChangeFeedContextClient {
    private static final Logger logger = LoggerFactory.getLogger(ChangeFeedContextClientImpl.class);
    private final AsyncDocumentClient documentClient;
    private final CosmosAsyncContainer cosmosContainer;
    // Not final: callers may swap the scheduler through setScheduler.
    private Scheduler scheduler;

    /**
     * Creates a client for the given container that publishes results on
     * {@link Schedulers#boundedElastic()}.
     *
     * @param cosmosAsyncContainer the container this client operates on; must not be null
     */
    public ChangeFeedContextClientImpl(CosmosAsyncContainer cosmosAsyncContainer) {
        this(cosmosAsyncContainer, Schedulers.boundedElastic());
    }

    /**
     * Creates a client for the given container that publishes results on the given scheduler.
     *
     * @param cosmosAsyncContainer the container this client operates on; must not be null
     * @param scheduler the scheduler passed to {@code publishOn} by the reactive methods
     * @throws NullPointerException if {@code cosmosAsyncContainer} is null
     */
    public ChangeFeedContextClientImpl(CosmosAsyncContainer cosmosAsyncContainer, Scheduler scheduler) {
        Preconditions.checkNotNull(cosmosAsyncContainer, "Argument 'cosmosContainer' can not be null");
        this.cosmosContainer = cosmosAsyncContainer;
        this.documentClient = CosmosBridgeInternal.getContextClient(cosmosAsyncContainer);
        this.scheduler = scheduler;
    }

    /**
     * @return the scheduler currently used to publish results
     */
    @Override
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    /**
     * Replaces the scheduler used to publish results of subsequently built pipelines.
     *
     * @param scheduler the new scheduler
     */
    @Override
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Resolves this client's container through the collection cache and looks up the partition
     * key ranges overlapping {@code range} in the partition key range cache.
     *
     * @param range the range to look up
     * @param z forwarded to {@code tryGetOverlappingRangesAsync} (presumably a force-refresh
     *          flag -- confirm against the cache API)
     * @return the overlapping ranges, or an empty mutable list (after logging a warning) when the
     *         cache yields no value
     */
    @Override
    public Mono<List<PartitionKeyRange>> getOverlappingRanges(Range<String> range, boolean z) {
        AsyncDocumentClient asyncDocumentClient = CosmosBridgeInternal.getAsyncDocumentClient(this.cosmosContainer.getDatabase());
        return asyncDocumentClient.getCollectionCache()
            .resolveByNameAsync(null, BridgeInternal.extractContainerSelfLink(this.cosmosContainer), null)
            .flatMap(documentCollection -> asyncDocumentClient.getPartitionKeyRangeCache()
                .tryGetOverlappingRangesAsync(null, documentCollection.getResourceId(), range, z, null))
            .flatMap(valueHolder -> {
                // The holder's payload is an object reference, so absence is a null check.
                if (valueHolder != null && valueHolder.v != null) {
                    List<PartitionKeyRange> overlappingRanges = valueHolder.v;
                    return Mono.just(overlappingRanges);
                }
                logger.warn("There are no overlapping ranges found for range {}", range);
                List<PartitionKeyRange> noRanges = new ArrayList<>();
                return Mono.just(noRanges);
            })
            .publishOn(this.scheduler);
    }

    /**
     * Reads the partition key range feed through the underlying document client.
     *
     * @param str passed as the first argument of {@code readPartitionKeyRanges}
     *            (presumably the container link -- confirm against the caller)
     * @param cosmosQueryRequestOptions the query options to forward
     * @return the pages of partition key ranges, published on the configured scheduler
     */
    @Override
    public Flux<FeedResponse<PartitionKeyRange>> readPartitionKeyRangeFeed(String str, CosmosQueryRequestOptions cosmosQueryRequestOptions) {
        return this.documentClient.readPartitionKeyRanges(str, cosmosQueryRequestOptions).publishOn(this.scheduler);
    }

    /**
     * Same as the four-argument overload with split handling disabled on the options.
     *
     * @param cosmosAsyncContainer the container whose change feed is queried
     * @param cosmosChangeFeedRequestOptions the change feed options
     * @param cls the type each returned document is converted to
     * @param <T> the item type
     * @return the change feed pages, published on the configured scheduler
     */
    @Override
    public <T> Flux<FeedResponse<T>> createDocumentChangeFeedQuery(CosmosAsyncContainer cosmosAsyncContainer, CosmosChangeFeedRequestOptions cosmosChangeFeedRequestOptions, Class<T> cls) {
        return createDocumentChangeFeedQuery(cosmosAsyncContainer, cosmosChangeFeedRequestOptions, cls, true);
    }

    /**
     * Queries the change feed of the given container as {@link Document}s and converts every
     * document of every page to {@code cls}.
     *
     * @param cosmosAsyncContainer the container whose change feed is queried
     * @param cosmosChangeFeedRequestOptions the change feed options; modified in place via
     *                                       {@code ModelBridgeInternal.disableSplitHandling} when {@code z} is true
     * @param cls the type each returned document is converted to
     * @param z when true, split handling is disabled on the options before querying
     * @param <T> the item type
     * @return the converted change feed pages, published on the configured scheduler
     */
    @Override
    public <T> Flux<FeedResponse<T>> createDocumentChangeFeedQuery(CosmosAsyncContainer cosmosAsyncContainer, CosmosChangeFeedRequestOptions cosmosChangeFeedRequestOptions, Class<T> cls, boolean z) {
        if (z) {
            ModelBridgeInternal.disableSplitHandling(cosmosChangeFeedRequestOptions);
        }
        AsyncDocumentClient asyncDocumentClient = CosmosBridgeInternal.getAsyncDocumentClient(cosmosAsyncContainer.getDatabase());
        return asyncDocumentClient.getCollectionCache()
            .resolveByNameAsync(null, BridgeInternal.extractContainerSelfLink(cosmosAsyncContainer), null)
            .flatMapMany(documentCollection -> {
                if (documentCollection == null) {
                    throw new IllegalStateException("Collection cannot be null");
                }
                return asyncDocumentClient.queryDocumentChangeFeed(documentCollection, cosmosChangeFeedRequestOptions, Document.class)
                    .map(feedResponse -> {
                        // Keep the element type explicit so the resulting page is FeedResponse<T>
                        // rather than a raw FeedResponse.
                        List<T> convertedItems = feedResponse.getResults().stream()
                            .map(document -> document.toObject(cls))
                            .collect(Collectors.toList());
                        return BridgeInternal.toFeedResponsePage(
                            convertedItems,
                            feedResponse.getResponseHeaders(),
                            ImplementationBridgeHelpers.FeedResponseHelper.getFeedResponseAccessor().getNoChanges(feedResponse),
                            feedResponse.getCosmosDiagnostics());
                    });
            })
            .publishOn(this.scheduler);
    }

    /**
     * Reads the given database.
     *
     * <p>NOTE(review): {@code cosmosDatabaseRequestOptions} is not forwarded; the no-argument
     * {@code read()} is called. Confirm whether that is intentional.
     *
     * @param cosmosAsyncDatabase the database to read
     * @param cosmosDatabaseRequestOptions currently unused
     * @return the database response, published on the configured scheduler
     */
    @Override
    public Mono<CosmosDatabaseResponse> readDatabase(CosmosAsyncDatabase cosmosAsyncDatabase, CosmosDatabaseRequestOptions cosmosDatabaseRequestOptions) {
        return cosmosAsyncDatabase.read().publishOn(this.scheduler);
    }

    /**
     * Reads the given container.
     *
     * @param cosmosAsyncContainer the container to read
     * @param cosmosContainerRequestOptions the options forwarded to {@code read}
     * @return the container response, published on the configured scheduler
     */
    @Override
    public Mono<CosmosContainerResponse> readContainer(CosmosAsyncContainer cosmosAsyncContainer, CosmosContainerRequestOptions cosmosContainerRequestOptions) {
        return cosmosAsyncContainer.read(cosmosContainerRequestOptions).publishOn(this.scheduler);
    }

    /**
     * Creates an item in the given container, using the options overload only when options are
     * supplied.
     *
     * <p>NOTE(review): the boolean parameter {@code z} is not used by this implementation.
     *
     * @param cosmosAsyncContainer the container to create the item in
     * @param t the item to create
     * @param cosmosItemRequestOptions the request options, or null to use the no-options overload
     * @param z currently unused
     * @param <T> the item type
     * @return the item response, published on the configured scheduler
     */
    @Override
    public <T> Mono<CosmosItemResponse<T>> createItem(CosmosAsyncContainer cosmosAsyncContainer, T t, CosmosItemRequestOptions cosmosItemRequestOptions, boolean z) {
        if (cosmosItemRequestOptions == null) {
            return cosmosAsyncContainer.createItem(t).publishOn(this.scheduler);
        }
        return cosmosAsyncContainer.createItem(t, cosmosItemRequestOptions).publishOn(this.scheduler);
    }

    /**
     * Deletes an item from this client's container.
     *
     * @param str the id of the item to delete
     * @param partitionKey the partition key of the item
     * @param cosmosItemRequestOptions the request options
     * @return the item response, published on the configured scheduler
     */
    @Override
    public Mono<CosmosItemResponse<Object>> deleteItem(String str, PartitionKey partitionKey, CosmosItemRequestOptions cosmosItemRequestOptions) {
        return this.cosmosContainer.deleteItem(str, partitionKey, cosmosItemRequestOptions).publishOn(this.scheduler);
    }

    /**
     * Deletes the given items from this client's container with one bulk execution.
     *
     * <p>The delete operations are built eagerly, when this method is called, so later changes
     * to {@code list} do not affect the returned pipeline.
     *
     * @param list the identities (id and partition key) of the items to delete
     * @return one bulk operation response per operation, published on the configured scheduler
     */
    @Override
    public Flux<CosmosBulkOperationResponse<Object>> deleteAllItems(List<CosmosItemIdentity> list) {
        return this.cosmosContainer
            .executeBulkOperations(Flux.fromIterable(
                list.stream()
                    .map(identity -> CosmosBulkOperations.getDeleteItemOperation(identity.getId(), identity.getPartitionKey()))
                    .collect(Collectors.toList())))
            .publishOn(this.scheduler);
    }

    /**
     * Replaces an item in this client's container.
     *
     * @param str the id of the item to replace
     * @param partitionKey the partition key of the item
     * @param t the new item content
     * @param cosmosItemRequestOptions the request options
     * @param <T> the item type
     * @return the item response, published on the configured scheduler
     */
    @Override
    public <T> Mono<CosmosItemResponse<T>> replaceItem(String str, PartitionKey partitionKey, T t, CosmosItemRequestOptions cosmosItemRequestOptions) {
        return this.cosmosContainer.replaceItem(t, str, partitionKey, cosmosItemRequestOptions).publishOn(this.scheduler);
    }

    /**
     * Reads an item from this client's container.
     *
     * @param str the id of the item to read
     * @param partitionKey the partition key of the item
     * @param cosmosItemRequestOptions the request options
     * @param cls the type the item is read as
     * @param <T> the item type
     * @return the item response, published on the configured scheduler
     */
    @Override
    public <T> Mono<CosmosItemResponse<T>> readItem(String str, PartitionKey partitionKey, CosmosItemRequestOptions cosmosItemRequestOptions, Class<T> cls) {
        return this.cosmosContainer.readItem(str, partitionKey, cosmosItemRequestOptions, cls).publishOn(this.scheduler);
    }

    /**
     * Runs a query against the given container and exposes the result page by page.
     *
     * @param cosmosAsyncContainer the container to query
     * @param sqlQuerySpec the query to run
     * @param cosmosQueryRequestOptions the query options
     * @param cls the type each result is read as
     * @param <T> the item type
     * @return the result pages, published on the configured scheduler
     */
    @Override
    public <T> Flux<FeedResponse<T>> queryItems(CosmosAsyncContainer cosmosAsyncContainer, SqlQuerySpec sqlQuerySpec, CosmosQueryRequestOptions cosmosQueryRequestOptions, Class<T> cls) {
        return cosmosAsyncContainer.queryItems(sqlQuerySpec, cosmosQueryRequestOptions, cls).byPage().publishOn(this.scheduler);
    }

    /**
     * @return the service endpoint reported by the underlying document client
     */
    @Override
    public URI getServiceEndpoint() {
        return this.documentClient.getServiceEndpoint();
    }

    /**
     * Reads the given container and returns only its properties.
     *
     * <p>NOTE(review): unlike the other reactive methods of this class, the result is not
     * re-published on the configured scheduler.
     *
     * @param cosmosAsyncContainer the container to read
     * @param cosmosContainerRequestOptions the options forwarded to {@code read}
     * @return the container properties
     */
    @Override
    public Mono<CosmosContainerProperties> readContainerSettings(CosmosAsyncContainer cosmosAsyncContainer, CosmosContainerRequestOptions cosmosContainerRequestOptions) {
        return cosmosAsyncContainer.read(cosmosContainerRequestOptions).map(CosmosContainerResponse::getProperties);
    }

    /**
     * @return the container this client was created with
     */
    @Override
    public CosmosAsyncContainer getContainerClient() {
        return this.cosmosContainer;
    }

    /**
     * @return the database that owns this client's container
     */
    @Override
    public CosmosAsyncDatabase getDatabaseClient() {
        return this.cosmosContainer.getDatabase();
    }

    /**
     * Does nothing in this implementation; the method body is empty.
     */
    @Override
    public void close() {
    }
}
