package com.azure.cosmos.implementation.batch;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.BulkProcessingOptions;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.CosmosBulkItemResponse;
import com.azure.cosmos.CosmosBulkOperationResponse;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.CosmosItemOperation;
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.TransactionalBatchOperationResult;
import com.azure.cosmos.TransactionalBatchResponse;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/cosmos/implementation/batch/BulkExecutor.class */
public final class BulkExecutor<TContext> {
    private static final Logger logger = LoggerFactory.getLogger(BulkExecutor.class);
    private final CosmosAsyncContainer container;
    private final AsyncDocumentClient docClientWrapper;
    private final ThrottlingRetryOptions throttlingRetryOptions;
    private final Flux<CosmosItemOperation> inputOperations;
    private final int maxMicroBatchSize;
    private final int maxMicroBatchConcurrency;
    private final Duration maxMicroBatchInterval;
    private final TContext batchContext;
    private final AtomicBoolean mainSourceCompleted;
    private final AtomicInteger totalCount;
    private final FluxProcessor<CosmosItemOperation, CosmosItemOperation> mainFluxProcessor;
    private final FluxSink<CosmosItemOperation> mainSink;
    private final List<FluxSink<CosmosItemOperation>> groupSinks;

    public BulkExecutor(CosmosAsyncContainer cosmosAsyncContainer, Flux<CosmosItemOperation> flux, BulkProcessingOptions<TContext> bulkProcessingOptions) {
        Preconditions.checkNotNull(cosmosAsyncContainer, "expected non-null container");
        Preconditions.checkNotNull(flux, "expected non-null inputOperations");
        Preconditions.checkNotNull(bulkProcessingOptions, "expected non-null bulkOptions");
        this.container = cosmosAsyncContainer;
        this.inputOperations = flux;
        this.docClientWrapper = CosmosBridgeInternal.getAsyncDocumentClient(cosmosAsyncContainer.getDatabase());
        this.throttlingRetryOptions = this.docClientWrapper.getConnectionPolicy().getThrottlingRetryOptions();
        this.maxMicroBatchSize = bulkProcessingOptions.getMaxMicroBatchSize();
        this.maxMicroBatchConcurrency = bulkProcessingOptions.getMaxMicroBatchConcurrency();
        this.maxMicroBatchInterval = bulkProcessingOptions.getMaxMicroBatchInterval();
        this.batchContext = bulkProcessingOptions.getBatchContext();
        this.mainSourceCompleted = new AtomicBoolean(false);
        this.totalCount = new AtomicInteger(0);
        this.mainFluxProcessor = UnicastProcessor.create().serialize();
        this.mainSink = this.mainFluxProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
        this.groupSinks = new ArrayList();
    }

    public Flux<CosmosBulkOperationResponse<TContext>> execute() {
        return this.inputOperations.onErrorContinue((th, obj) -> {
            logger.error("Skipping an error operation while processing {}. Cause: {}", obj, th.getMessage());
        }).doOnNext(cosmosItemOperation -> {
            BulkExecutorUtil.setRetryPolicyForBulk(this.docClientWrapper, this.container, cosmosItemOperation, this.throttlingRetryOptions);
            this.totalCount.incrementAndGet();
        }).doOnComplete(() -> {
            this.mainSourceCompleted.set(true);
            if (this.totalCount.get() == 0) {
                completeAllSinks();
            }
        }).mergeWith(this.mainFluxProcessor).flatMap(cosmosItemOperation2 -> {
            return BulkExecutorUtil.resolvePartitionKeyRangeId(this.docClientWrapper, this.container, cosmosItemOperation2).map(str -> {
                return Pair.of(str, cosmosItemOperation2);
            });
        }).groupBy((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }).flatMap(this::executePartitionedGroup).doOnNext(cosmosBulkOperationResponse -> {
            if (this.totalCount.decrementAndGet() == 0 && this.mainSourceCompleted.get()) {
                completeAllSinks();
            }
        });
    }

    private Flux<CosmosBulkOperationResponse<TContext>> executePartitionedGroup(GroupedFlux<String, CosmosItemOperation> groupedFlux) {
        String str = (String) groupedFlux.key();
        FluxProcessor serialize = UnicastProcessor.create().serialize();
        FluxSink<CosmosItemOperation> sink = serialize.sink(FluxSink.OverflowStrategy.BUFFER);
        this.groupSinks.add(sink);
        return groupedFlux.mergeWith(serialize).bufferTimeout(this.maxMicroBatchSize, this.maxMicroBatchInterval).onBackpressureBuffer().flatMap(list -> {
            return executeOperations(list, str, sink);
        }, this.maxMicroBatchConcurrency);
    }

    private Flux<CosmosBulkOperationResponse<TContext>> executeOperations(List<CosmosItemOperation> list, String str, FluxSink<CosmosItemOperation> fluxSink) {
        ServerOperationBatchRequest createBatchRequest = BulkExecutorUtil.createBatchRequest(list, str);
        if (createBatchRequest.getBatchPendingOperations().size() > 0) {
            List<CosmosItemOperation> batchPendingOperations = createBatchRequest.getBatchPendingOperations();
            Objects.requireNonNull(fluxSink);
            batchPendingOperations.forEach((v1) -> {
                r1.next(v1);
            });
        }
        return Flux.just(createBatchRequest.getBatchRequest()).publishOn(Schedulers.boundedElastic()).flatMap(partitionKeyRangeServerBatchRequest -> {
            return executePartitionKeyRangeServerBatchRequest(partitionKeyRangeServerBatchRequest, fluxSink);
        });
    }

    private Flux<CosmosBulkOperationResponse<TContext>> executePartitionKeyRangeServerBatchRequest(PartitionKeyRangeServerBatchRequest partitionKeyRangeServerBatchRequest, FluxSink<CosmosItemOperation> fluxSink) {
        return executeBatchRequest(partitionKeyRangeServerBatchRequest).flatMapMany(transactionalBatchResponse -> {
            return Flux.fromIterable(transactionalBatchResponse.getResults()).flatMap(transactionalBatchOperationResult -> {
                return handleTransactionalBatchOperationResult(transactionalBatchResponse, transactionalBatchOperationResult, fluxSink);
            });
        }).onErrorResume(th -> {
            if (!(th instanceof Exception)) {
                throw Exceptions.propagate(th);
            }
            Exception exc = (Exception) th;
            return Flux.fromIterable(partitionKeyRangeServerBatchRequest.getOperations()).flatMap(cosmosItemOperation -> {
                return handleTransactionalBatchExecutionException(cosmosItemOperation, exc, fluxSink);
            });
        });
    }

    private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchOperationResult(TransactionalBatchResponse transactionalBatchResponse, TransactionalBatchOperationResult transactionalBatchOperationResult, FluxSink<CosmosItemOperation> fluxSink) {
        CosmosBulkItemResponse createCosmosBulkItemResponse = BridgeInternal.createCosmosBulkItemResponse(transactionalBatchOperationResult, transactionalBatchResponse);
        CosmosItemOperation operation = transactionalBatchOperationResult.getOperation();
        if (transactionalBatchOperationResult.isSuccessStatusCode()) {
            return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(operation, createCosmosBulkItemResponse, this.batchContext));
        }
        if (operation instanceof ItemBulkOperation) {
            return ((ItemBulkOperation) operation).getRetryPolicy().shouldRetry(transactionalBatchOperationResult).flatMap(shouldRetryResult -> {
                if (!shouldRetryResult.shouldRetry) {
                    return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(operation, createCosmosBulkItemResponse, this.batchContext));
                }
                fluxSink.next(operation);
                return Mono.empty();
            });
        }
        throw new UnsupportedOperationException("Unknown CosmosItemOperation.");
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v8, types: [com.azure.cosmos.CosmosException, java.lang.Exception] */
    private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchExecutionException(CosmosItemOperation cosmosItemOperation, Exception exc, FluxSink<CosmosItemOperation> fluxSink) {
        if (!(exc instanceof CosmosException) || !(cosmosItemOperation instanceof ItemBulkOperation)) {
            return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(cosmosItemOperation, exc, this.batchContext));
        }
        ?? r0 = (CosmosException) exc;
        ItemBulkOperation itemBulkOperation = (ItemBulkOperation) cosmosItemOperation;
        if (r0.getStatusCode() != HttpResponseStatus.GONE.code() || !itemBulkOperation.getRetryPolicy().shouldRetryForGone(r0.getStatusCode(), r0.getSubStatusCode())) {
            return itemBulkOperation.getRetryPolicy().shouldRetry((Exception) r0).flatMap(shouldRetryResult -> {
                if (!shouldRetryResult.shouldRetry) {
                    return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(cosmosItemOperation, exc, this.batchContext));
                }
                fluxSink.next(cosmosItemOperation);
                return Mono.empty();
            });
        }
        this.mainSink.next(cosmosItemOperation);
        return Mono.empty();
    }

    private Mono<TransactionalBatchResponse> executeBatchRequest(PartitionKeyRangeServerBatchRequest partitionKeyRangeServerBatchRequest) {
        return this.docClientWrapper.executeBatchRequest(BridgeInternal.getLink(this.container), partitionKeyRangeServerBatchRequest, null, false);
    }

    private void completeAllSinks() {
        logger.info("Closing all sinks");
        this.mainSink.complete();
        this.groupSinks.forEach((v0) -> {
            v0.complete();
        });
    }
}
