package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.CancellationTokenSource;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
import com.azure.cosmos.implementation.changefeed.LeaseManager;
import com.azure.cosmos.implementation.changefeed.PartitionController;
import com.azure.cosmos.implementation.changefeed.PartitionSupervisor;
import com.azure.cosmos.implementation.changefeed.PartitionSupervisorFactory;
import com.azure.cosmos.implementation.changefeed.exceptions.FeedRangeGoneException;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:com/azure/cosmos/implementation/changefeed/epkversion/PartitionControllerImpl.class */
class PartitionControllerImpl implements PartitionController {
    private static final Logger logger = LoggerFactory.getLogger(PartitionControllerImpl.class);
    private final Map<String, WorkerTask> currentlyOwnedPartitions = new ConcurrentHashMap();
    private final LeaseContainer leaseContainer;
    private final LeaseManager leaseManager;
    private final PartitionSupervisorFactory partitionSupervisorFactory;
    private final PartitionSynchronizer synchronizer;
    private CancellationTokenSource shutdownCts;
    private final Scheduler scheduler;

    public PartitionControllerImpl(LeaseContainer leaseContainer, LeaseManager leaseManager, PartitionSupervisorFactory partitionSupervisorFactory, PartitionSynchronizer partitionSynchronizer, Scheduler scheduler) {
        this.leaseContainer = leaseContainer;
        this.leaseManager = leaseManager;
        this.partitionSupervisorFactory = partitionSupervisorFactory;
        this.synchronizer = partitionSynchronizer;
        this.scheduler = scheduler;
    }

    @Override // com.azure.cosmos.implementation.changefeed.PartitionController
    public Mono<Void> initialize() {
        this.shutdownCts = new CancellationTokenSource();
        return loadLeases();
    }

    @Override // com.azure.cosmos.implementation.changefeed.PartitionController
    public synchronized Mono<Lease> addOrUpdateLease(Lease lease) {
        WorkerTask workerTask = this.currentlyOwnedPartitions.get(lease.getLeaseToken());
        return (workerTask == null || !workerTask.isRunning()) ? this.leaseManager.acquire(lease).map(lease2 -> {
            if (this.currentlyOwnedPartitions.get(lease.getLeaseToken()) == null) {
                logger.info("Lease with token {}: acquired.", lease2.getLeaseToken());
                this.currentlyOwnedPartitions.put(lease2.getLeaseToken(), processPartition(this.partitionSupervisorFactory.create(lease2), lease2));
            }
            return lease2;
        }).onErrorResume(th -> {
            logger.warn("Lease with token {}: unexpected error; removing lease from current cache.", lease.getLeaseToken(), th);
            return removeLease(lease).then(Mono.error(th));
        }) : this.leaseManager.updateProperties(lease).map(lease3 -> {
            logger.debug("Lease with token {}: updated.", lease3.getLeaseToken());
            return lease3;
        });
    }

    @Override // com.azure.cosmos.implementation.changefeed.PartitionController
    public Mono<Void> shutdown() {
        this.shutdownCts.cancel();
        return Mono.empty();
    }

    private Mono<Void> loadLeases() {
        logger.debug("Starting renew leases assigned to this host on initialize.");
        return this.leaseContainer.getOwnedLeases().flatMap(lease -> {
            logger.info("Lease with token {}: Acquired on startup.", lease.getLeaseToken());
            return addOrUpdateLease(lease);
        }).then();
    }

    private Mono<Void> removeLease(Lease lease) {
        return Mono.just(this).flatMap(partitionControllerImpl -> {
            WorkerTask remove = this.currentlyOwnedPartitions.remove(lease.getLeaseToken());
            if (remove != null && remove.isRunning()) {
                remove.cancelJob();
            }
            logger.info("Lease with token {}: released.", lease.getLeaseToken());
            return this.leaseManager.release(lease);
        }).onErrorResume(th -> {
            if (th instanceof LeaseLostException) {
                logger.warn("Lease with token {}: lease already removed.", lease.getLeaseToken());
            } else {
                logger.warn("Lease with token {}: failed to remove lease.", lease.getLeaseToken(), th);
            }
            return Mono.empty();
        }).doOnSuccess(r5 -> {
            logger.info("Lease with token {}: successfully removed lease.", lease.getLeaseToken());
        });
    }

    private WorkerTask processPartition(PartitionSupervisor partitionSupervisor, Lease lease) {
        WorkerTask workerTask = new WorkerTask(lease, partitionSupervisor, getWorkerJob(partitionSupervisor, lease, this.shutdownCts.getToken()));
        this.scheduler.schedule(workerTask);
        return workerTask;
    }

    private Mono<Void> getWorkerJob(PartitionSupervisor partitionSupervisor, Lease lease, CancellationToken cancellationToken) {
        return partitionSupervisor.run(cancellationToken).onErrorResume(th -> {
            if (th instanceof FeedRangeGoneException) {
                return handleFeedRangeGone(lease, ((FeedRangeGoneException) th).getLastContinuation());
            }
            if (th instanceof TaskCancelledException) {
                logger.debug("Lease with token {}: processing canceled.", lease.getLeaseToken());
            } else {
                logger.warn("Lease with token {}: processing failed.", lease.getLeaseToken(), th);
            }
            return Mono.empty();
        }).then(removeLease(lease));
    }

    private Mono<Void> handleFeedRangeGone(Lease lease, String str) {
        if (str != null) {
            logger.warn("Lease with token {}: with owner {}: updated with last continuation token {}", new Object[]{lease.getLeaseToken(), lease.getOwner(), str});
            lease.setContinuationToken(str);
        } else {
            logger.warn("Continuation token not found for split for lease with token {}: with owner {}", lease.getLeaseToken(), lease.getOwner());
        }
        return this.synchronizer.getFeedRangeGoneHandler(lease).flatMap(feedRangeGoneHandler -> {
            return feedRangeGoneHandler.handlePartitionGone().flatMap(lease2 -> {
                if (feedRangeGoneHandler.shouldSkipDirectLeaseAssignment()) {
                    return Mono.empty();
                }
                lease2.setProperties(lease.getProperties());
                return addOrUpdateLease(lease2);
            }).then(tryDeleteGoneLease(lease, feedRangeGoneHandler.shouldDeleteCurrentLease()));
        }).onErrorResume(th -> {
            logger.warn("Lease with token {}: failed to handle partition gone", lease.getLeaseToken(), th);
            return Mono.empty();
        });
    }

    private Mono<Void> tryDeleteGoneLease(Lease lease, boolean z) {
        return z ? this.leaseManager.delete(lease) : Mono.empty();
    }
}
