/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.incremental;

import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.CancellationTokenSource;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
import com.azure.cosmos.implementation.changefeed.LeaseManager;
import com.azure.cosmos.implementation.changefeed.PartitionController;
import com.azure.cosmos.implementation.changefeed.PartitionSupervisor;
import com.azure.cosmos.implementation.changefeed.PartitionSupervisorFactory;
import com.azure.cosmos.implementation.changefeed.PartitionSynchronizer;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import com.azure.cosmos.implementation.changefeed.exceptions.PartitionSplitException;
import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException;
import com.azure.cosmos.implementation.changefeed.incremental.WorkerTask;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

class PartitionControllerImpl
implements PartitionController {
    private static final Logger logger = LoggerFactory.getLogger(PartitionControllerImpl.class);
    private final Map<String, WorkerTask> currentlyOwnedPartitions = new ConcurrentHashMap<String, WorkerTask>();
    private final LeaseContainer leaseContainer;
    private final LeaseManager leaseManager;
    private final PartitionSupervisorFactory partitionSupervisorFactory;
    private final PartitionSynchronizer synchronizer;
    private CancellationTokenSource shutdownCts;
    private final Scheduler scheduler;

    public PartitionControllerImpl(LeaseContainer leaseContainer, LeaseManager leaseManager, PartitionSupervisorFactory partitionSupervisorFactory, PartitionSynchronizer synchronizer, Scheduler scheduler) {
        this.leaseContainer = leaseContainer;
        this.leaseManager = leaseManager;
        this.partitionSupervisorFactory = partitionSupervisorFactory;
        this.synchronizer = synchronizer;
        this.scheduler = scheduler;
    }

    @Override
    public Mono<Void> initialize() {
        this.shutdownCts = new CancellationTokenSource();
        return this.loadLeases();
    }

    @Override
    public synchronized Mono<Lease> addOrUpdateLease(Lease lease) {
        WorkerTask workerTask = this.currentlyOwnedPartitions.get(lease.getLeaseToken());
        if (workerTask != null && workerTask.isRunning()) {
            return this.leaseManager.updateProperties(lease).map(updatedLease -> {
                logger.debug("Partition {}: updated.", (Object)updatedLease.getLeaseToken());
                return updatedLease;
            });
        }
        return this.leaseManager.acquire(lease).map(updatedLease -> {
            WorkerTask checkTask = this.currentlyOwnedPartitions.get(lease.getLeaseToken());
            if (checkTask == null) {
                logger.info("Partition {}: acquired.", (Object)updatedLease.getLeaseToken());
                PartitionSupervisor supervisor = this.partitionSupervisorFactory.create((Lease)updatedLease);
                this.currentlyOwnedPartitions.put(updatedLease.getLeaseToken(), this.processPartition(supervisor, (Lease)updatedLease));
            }
            return updatedLease;
        }).onErrorResume(throwable -> {
            logger.warn("Partition {}: unexpected error; removing lease from current cache.", (Object)lease.getLeaseToken());
            return this.removeLease(lease).then(Mono.error((Throwable)throwable));
        });
    }

    @Override
    public Mono<Void> shutdown() {
        this.shutdownCts.cancel();
        return Mono.empty();
    }

    private Mono<Void> loadLeases() {
        logger.debug("Starting renew leases assigned to this host on initialize.");
        return this.leaseContainer.getOwnedLeases().flatMap(lease -> {
            logger.info("Acquired lease for PartitionId '{}' on startup.", (Object)lease.getLeaseToken());
            return this.addOrUpdateLease((Lease)lease);
        }).then();
    }

    private Mono<Void> removeLease(Lease lease) {
        return Mono.just((Object)this).flatMap(value -> {
            WorkerTask workerTask = this.currentlyOwnedPartitions.remove(lease.getLeaseToken());
            if (workerTask != null && workerTask.isRunning()) {
                workerTask.cancelJob();
            }
            logger.info("Partition {}: released.", (Object)lease.getLeaseToken());
            return this.leaseManager.release(lease);
        }).onErrorResume(e -> {
            if (e instanceof LeaseLostException) {
                logger.warn("Partition {}: lease already removed.", (Object)lease.getLeaseToken());
            } else {
                logger.warn("Partition {}: failed to remove lease.", (Object)lease.getLeaseToken(), e);
            }
            return Mono.empty();
        }).doOnSuccess(aVoid -> logger.info("Partition {}: successfully removed lease.", (Object)lease.getLeaseToken()));
    }

    private WorkerTask processPartition(PartitionSupervisor partitionSupervisor, Lease lease) {
        CancellationToken shutdownToken = this.shutdownCts.getToken();
        WorkerTask partitionSupervisorTask = new WorkerTask(lease, partitionSupervisor, this.getWorkerJob(partitionSupervisor, lease, shutdownToken));
        this.scheduler.schedule((Runnable)partitionSupervisorTask);
        return partitionSupervisorTask;
    }

    private Mono<Void> getWorkerJob(PartitionSupervisor partitionSupervisor, Lease lease, CancellationToken shutdownToken) {
        return partitionSupervisor.run(shutdownToken).onErrorResume(throwable -> {
            if (throwable instanceof PartitionSplitException) {
                PartitionSplitException ex = (PartitionSplitException)throwable;
                return this.handleSplit(lease, ex.getLastContinuation());
            }
            if (throwable instanceof TaskCancelledException) {
                logger.debug("Partition {}: processing canceled.", (Object)lease.getLeaseToken());
            } else {
                logger.warn("Partition {}: processing failed.", (Object)lease.getLeaseToken(), throwable);
            }
            return Mono.empty();
        }).then(this.removeLease(lease));
    }

    private Mono<Void> handleSplit(Lease lease, String lastContinuationToken) {
        lease.setContinuationToken(lastContinuationToken);
        return this.synchronizer.splitPartition(lease).flatMap(l -> {
            l.setProperties(lease.getProperties());
            return this.addOrUpdateLease((Lease)l);
        }).then(this.leaseManager.delete(lease)).onErrorResume(throwable -> {
            logger.warn("Partition {}: failed to split", (Object)lease.getLeaseToken(), throwable);
            return Mono.empty();
        });
    }
}

