package com.azure.cosmos.implementation.changefeed.pkversion;

import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.changefeed.Bootstrapper;
import com.azure.cosmos.implementation.changefeed.LeaseStore;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/cosmos/implementation/changefeed/pkversion/BootstrapperImpl.class */
class BootstrapperImpl implements Bootstrapper {
    private final Logger logger = LoggerFactory.getLogger(BootstrapperImpl.class);
    private final PartitionSynchronizer synchronizer;
    private final LeaseStore leaseStore;
    private final Duration lockTime;
    private final Duration sleepTime;
    private volatile boolean isInitialized;
    private volatile boolean isLockAcquired;

    public BootstrapperImpl(PartitionSynchronizer partitionSynchronizer, LeaseStore leaseStore, Duration duration, Duration duration2) {
        if (partitionSynchronizer == null) {
            throw new IllegalArgumentException("synchronizer cannot be null!");
        }
        if (leaseStore == null) {
            throw new IllegalArgumentException("leaseStore cannot be null!");
        }
        if (duration == null || duration.isNegative() || duration.isZero()) {
            throw new IllegalArgumentException("lockTime should be non-null and positive");
        }
        if (duration2 == null || duration2.isNegative() || duration2.isZero()) {
            throw new IllegalArgumentException("sleepTime should be non-null and positive");
        }
        this.synchronizer = partitionSynchronizer;
        this.leaseStore = leaseStore;
        this.lockTime = duration;
        this.sleepTime = duration2;
        this.isInitialized = false;
    }

    @Override // com.azure.cosmos.implementation.changefeed.Bootstrapper
    public Mono<Void> initialize() {
        this.isInitialized = false;
        return Mono.just(this).flatMap(bootstrapperImpl -> {
            return this.leaseStore.isInitialized();
        }).flatMap(bool -> {
            this.isInitialized = bool.booleanValue();
            if (bool.booleanValue()) {
                return Mono.empty();
            }
            this.logger.info("Acquire initialization lock");
            return this.leaseStore.acquireInitializationLock(this.lockTime).flatMap(bool -> {
                this.isLockAcquired = bool.booleanValue();
                if (this.isLockAcquired) {
                    return this.synchronizer.createMissingLeases().then(this.leaseStore.markInitialized());
                }
                this.logger.info("Another instance is initializing the store");
                return Mono.just(Boolean.valueOf(this.isLockAcquired)).delayElement(this.sleepTime, CosmosSchedulers.COSMOS_PARALLEL);
            }).onErrorResume(th -> {
                this.logger.warn("Unexpected exception caught while initializing the lock", th);
                return Mono.just(Boolean.valueOf(this.isLockAcquired));
            }).flatMap(bool2 -> {
                return this.isLockAcquired ? this.leaseStore.releaseInitializationLock() : Mono.just(bool2);
            });
        }).repeat(() -> {
            return !this.isInitialized;
        }).then();
    }
}
