package com.azure.cosmos.implementation.changefeed.pkversion;

import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.PartitionCheckpointer;
import com.azure.cosmos.implementation.changefeed.ProcessorSettings;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedObserverContextImpl;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.ExceptionClassifier;
import com.azure.cosmos.implementation.changefeed.common.StatusCodeErrorType;
import com.azure.cosmos.implementation.changefeed.exceptions.FeedRangeGoneException;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import com.azure.cosmos.implementation.changefeed.exceptions.PartitionNotFoundException;
import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.time.temporal.TemporalUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.class */
class PartitionProcessorImpl implements PartitionProcessor {
    private static final Logger logger = LoggerFactory.getLogger(PartitionProcessorImpl.class);
    private final ProcessorSettings settings;
    private final PartitionCheckpointer checkpointer;
    private final ChangeFeedObserver<JsonNode> observer;
    private volatile CosmosChangeFeedRequestOptions options;
    private final ChangeFeedContextClient documentClient;
    private final Lease lease;
    private volatile RuntimeException resultException;
    private volatile String lastServerContinuationToken;
    private volatile boolean hasMoreResults;

    public PartitionProcessorImpl(ChangeFeedObserver<JsonNode> changeFeedObserver, ChangeFeedContextClient changeFeedContextClient, ProcessorSettings processorSettings, PartitionCheckpointer partitionCheckpointer, Lease lease) {
        this.observer = changeFeedObserver;
        this.documentClient = changeFeedContextClient;
        this.settings = processorSettings;
        this.checkpointer = partitionCheckpointer;
        this.lease = lease;
        this.options = ModelBridgeInternal.createChangeFeedRequestOptionsForChangeFeedState(processorSettings.getStartState());
        this.options.setMaxItemCount(processorSettings.getMaxItemCount());
        ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor().setHeader(this.options, HttpConstants.HttpHeaders.SDK_SUPPORTED_CAPABILITIES, String.valueOf(HttpConstants.SDKSupportedCapabilities.SUPPORTED_CAPABILITIES_NONE));
    }

    @Override // com.azure.cosmos.implementation.changefeed.pkversion.PartitionProcessor
    public Mono<Void> run(CancellationToken cancellationToken) {
        logger.info("Partition {}: processing task started with owner {}.", this.lease.getLeaseToken(), this.lease.getOwner());
        this.hasMoreResults = true;
        this.checkpointer.setCancellationToken(cancellationToken);
        return Flux.just(this).flatMap(partitionProcessorImpl -> {
            if (cancellationToken.isCancellationRequested()) {
                return Flux.empty();
            }
            if (this.hasMoreResults && this.resultException == null) {
                return Flux.just(partitionProcessorImpl);
            }
            Instant plus = Instant.now().plus((TemporalAmount) this.settings.getFeedPollDelay());
            return Mono.just(partitionProcessorImpl).delayElement(Duration.ofMillis(100L), CosmosSchedulers.COSMOS_PARALLEL).repeat(() -> {
                return !cancellationToken.isCancellationRequested() && Instant.now().isBefore(plus);
            }).last();
        }).flatMap(partitionProcessorImpl2 -> {
            return this.documentClient.createDocumentChangeFeedQuery(this.settings.getCollectionSelfLink(), this.options, JsonNode.class).limitRequest(1L);
        }).flatMap(feedResponse -> {
            if (cancellationToken.isCancellationRequested()) {
                return Flux.error(new TaskCancelledException());
            }
            String m616getContinuationToken = feedResponse.m616getContinuationToken();
            ChangeFeedState fromString = ChangeFeedState.fromString(m616getContinuationToken);
            Preconditions.checkNotNull(fromString, "Argument 'continuationState' must not be null.");
            Preconditions.checkArgument(fromString.getContinuation().getContinuationTokenCount() == 1, "For ChangeFeedProcessor the continuation state should always have one range/continuation");
            this.lastServerContinuationToken = fromString.getContinuation().getCurrentContinuationToken().getToken();
            this.hasMoreResults = !ModelBridgeInternal.noChanges(feedResponse);
            if (feedResponse.getResults() == null || feedResponse.getResults().size() <= 0) {
                this.options = CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(m616getContinuationToken);
                return cancellationToken.isCancellationRequested() ? Flux.error(new TaskCancelledException()) : Flux.empty();
            }
            logger.info("Partition {}: processing {} feeds with owner {}.", new Object[]{this.lease.getLeaseToken(), Integer.valueOf(feedResponse.getResults().size()), this.lease.getOwner()});
            return dispatchChanges(feedResponse, fromString).doOnError(th -> {
                logger.debug("Exception was thrown from thread {}", Long.valueOf(Thread.currentThread().getId()), th);
            }).doOnSuccess(r6 -> {
                this.options = CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(m616getContinuationToken);
                if (cancellationToken.isCancellationRequested()) {
                    throw new TaskCancelledException();
                }
            });
        }).doOnComplete(() -> {
            if (this.options.getMaxItemCount() != this.settings.getMaxItemCount()) {
                this.options.setMaxItemCount(this.settings.getMaxItemCount());
            }
        }).onErrorResume(th -> {
            if (th instanceof CosmosException) {
                ?? r0 = (CosmosException) th;
                logger.warn("CosmosException: Partition {} from thread {} with owner {}", new Object[]{this.lease.getLeaseToken(), Long.valueOf(Thread.currentThread().getId()), this.lease.getOwner(), r0});
                StatusCodeErrorType classifyClientException = ExceptionClassifier.classifyClientException(r0);
                switch (classifyClientException) {
                    case PARTITION_NOT_FOUND:
                        this.resultException = new PartitionNotFoundException("Partition not found.", this.lastServerContinuationToken);
                        break;
                    case PARTITION_SPLIT_OR_MERGE:
                        this.resultException = new FeedRangeGoneException("Partition split.", this.lastServerContinuationToken);
                        break;
                    case UNDEFINED:
                        this.resultException = new RuntimeException((Throwable) r0);
                        break;
                    case MAX_ITEM_COUNT_TOO_LARGE:
                        if (this.options.getMaxItemCount() <= 1) {
                            logger.error("Cannot reduce maxItemCount further as it's already at {}", Integer.valueOf(this.options.getMaxItemCount()), (Object) r0);
                            this.resultException = new RuntimeException((Throwable) r0);
                        }
                        this.options.setMaxItemCount(this.options.getMaxItemCount() / 2);
                        logger.warn("Reducing maxItemCount, new value: {}", Integer.valueOf(this.options.getMaxItemCount()));
                        return Flux.empty();
                    case TRANSIENT_ERROR:
                        if (r0.getRetryAfterDuration().toMillis() > 0) {
                            Instant plus = Instant.now().plus(r0.getRetryAfterDuration().toMillis(), (TemporalUnit) ChronoUnit.MILLIS);
                            return Mono.just(Long.valueOf(r0.getRetryAfterDuration().toMillis())).delayElement(Duration.ofMillis(100L), CosmosSchedulers.COSMOS_PARALLEL).repeat(() -> {
                                return !cancellationToken.isCancellationRequested() && Instant.now().isBefore(plus);
                            }).flatMap(l -> {
                                return Flux.empty();
                            });
                        }
                        break;
                    default:
                        logger.error("Unrecognized Cosmos exception returned error code {}", classifyClientException, (Object) r0);
                        this.resultException = new RuntimeException((Throwable) r0);
                        break;
                }
            } else if (th instanceof LeaseLostException) {
                logger.info("LeaseLoseException with Partition {} from thread {} with owner {}", new Object[]{this.lease.getLeaseToken(), Long.valueOf(Thread.currentThread().getId()), this.lease.getOwner()});
                this.resultException = (LeaseLostException) th;
            } else if (th instanceof TaskCancelledException) {
                logger.debug("Task cancelled exception: Partition {} from thread {} with owner {}", new Object[]{this.lease.getLeaseToken(), Long.valueOf(Thread.currentThread().getId()), this.lease.getOwner(), th});
                this.resultException = (TaskCancelledException) th;
            } else {
                logger.warn("Unexpected exception: Partition {} from thread {} with owner {}", new Object[]{this.lease.getLeaseToken(), Long.valueOf(Thread.currentThread().getId()), this.lease.getOwner(), th});
                this.resultException = new RuntimeException(th);
            }
            return Flux.error(th);
        }).repeat(() -> {
            if (!cancellationToken.isCancellationRequested()) {
                return true;
            }
            this.resultException = new TaskCancelledException();
            return false;
        }).onErrorResume(th2 -> {
            if (this.resultException == null) {
                this.resultException = new RuntimeException(th2);
            }
            return Flux.empty();
        }).then().doFinally(signalType -> {
            logger.info("Partition {}: processing task exited with owner {}.", this.lease.getLeaseToken(), this.lease.getOwner());
        });
    }

    private FeedRangePartitionKeyRangeImpl getPkRangeFeedRangeFromStartState() {
        FeedRangeInternal feedRange = this.settings.getStartState().getFeedRange();
        Preconditions.checkNotNull(feedRange, "FeedRange must not be null here.");
        Preconditions.checkArgument(feedRange instanceof FeedRangePartitionKeyRangeImpl, "FeedRange must be a PkRangeId FeedRange when using Lease V1 contract.");
        return (FeedRangePartitionKeyRangeImpl) feedRange;
    }

    @Override // com.azure.cosmos.implementation.changefeed.pkversion.PartitionProcessor
    public RuntimeException getResultException() {
        return this.resultException;
    }

    private Mono<Void> dispatchChanges(FeedResponse<JsonNode> feedResponse, ChangeFeedState changeFeedState) {
        return this.observer.processChanges(new ChangeFeedObserverContextImpl(getPkRangeFeedRangeFromStartState().getPartitionKeyRangeId(), feedResponse, changeFeedState, this.checkpointer), feedResponse.getResults());
    }
}
