package com.azure.cosmos.implementation.changefeed.pkversion;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseStore;
import com.azure.cosmos.implementation.changefeed.LeaseStoreManager;
import com.azure.cosmos.implementation.changefeed.LeaseStoreManagerSettings;
import com.azure.cosmos.implementation.changefeed.RequestOptionsFactory;
import com.azure.cosmos.implementation.changefeed.ServiceItemLeaseUpdater;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.models.CosmosItemIdentity;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/cosmos/implementation/changefeed/pkversion/LeaseStoreManagerImpl.class */
public class LeaseStoreManagerImpl implements LeaseStoreManager, LeaseStoreManager.LeaseStoreManagerBuilderDefinition {
    private final String LEASE_STORE_MANAGER_LEASE_SUFFIX = "..";
    private final Logger logger = LoggerFactory.getLogger(LeaseStoreManagerImpl.class);
    private LeaseStoreManagerSettings settings = new LeaseStoreManagerSettings();
    private ChangeFeedContextClient leaseDocumentClient;
    private RequestOptionsFactory requestOptionsFactory;
    private ServiceItemLeaseUpdater leaseUpdater;
    private LeaseStore leaseStore;

    public static LeaseStoreManager.LeaseStoreManagerBuilderDefinition builder() {
        return new LeaseStoreManagerImpl();
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager.LeaseStoreManagerBuilderDefinition
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition leaseContextClient(ChangeFeedContextClient changeFeedContextClient) {
        Preconditions.checkNotNull(changeFeedContextClient, "Argument 'leaseContextClient' can not be null");
        this.leaseDocumentClient = changeFeedContextClient;
        return this;
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager.LeaseStoreManagerBuilderDefinition
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition leasePrefix(String str) {
        Preconditions.checkNotNull(str, "Argument 'leasePrefix' can not be null");
        this.settings.withContainerNamePrefix(str);
        return this;
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager.LeaseStoreManagerBuilderDefinition
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition leaseCollectionLink(CosmosAsyncContainer cosmosAsyncContainer) {
        Preconditions.checkNotNull(cosmosAsyncContainer, "Argument 'leaseCollectionLink' can not be null");
        this.settings.withLeaseCollectionLink(cosmosAsyncContainer);
        return this;
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager.LeaseStoreManagerBuilderDefinition
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition requestOptionsFactory(RequestOptionsFactory requestOptionsFactory) {
        Preconditions.checkNotNull(requestOptionsFactory, "Argument 'requestOptionsFactory' can not be null");
        this.requestOptionsFactory = requestOptionsFactory;
        return this;
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager.LeaseStoreManagerBuilderDefinition
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition hostName(String str) {
        Preconditions.checkNotNull(str, "Argument 'hostName' can not be null");
        this.settings.withHostName(str);
        return this;
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager.LeaseStoreManagerBuilderDefinition
    public LeaseStoreManager build() {
        Preconditions.checkNotNull(this.settings, "settings can not be null");
        Preconditions.checkNotNull(this.settings.getContainerNamePrefix(), "settings.containerNamePrefix can not be null");
        Preconditions.checkNotNull(this.settings.getLeaseCollectionLink(), "settings.leaseCollectionLink can not be null");
        Preconditions.checkArgument(StringUtils.isNotEmpty(this.settings.getHostName()), "settings.getHostName can not be null nor empty");
        Preconditions.checkNotNull(this.leaseDocumentClient, "leaseDocumentClient can not be null");
        Preconditions.checkNotNull(this.requestOptionsFactory, "requestOptionsFactory can not be null");
        if (this.leaseUpdater == null) {
            this.leaseUpdater = new DocumentServiceLeaseUpdaterImpl(this.leaseDocumentClient);
        }
        this.leaseStore = new LeaseStoreImpl(this.leaseDocumentClient, this.settings.getContainerNamePrefix(), this.settings.getLeaseCollectionLink(), this.requestOptionsFactory);
        return this;
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseContainer
    public Flux<Lease> getAllLeases() {
        return listDocuments(getPartitionLeasePrefix()).map(serviceItemLease -> {
            return serviceItemLease;
        });
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager
    public Flux<Lease> getTopLeases(int i) {
        return listDocuments(getPartitionLeasePrefix(), Integer.valueOf(i)).map(serviceItemLease -> {
            return serviceItemLease;
        });
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseContainer
    public Flux<Lease> getOwnedLeases() {
        return getAllLeases().filter(lease -> {
            return lease.getOwner() != null && lease.getOwner().equalsIgnoreCase(this.settings.getHostName());
        });
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseManager
    public Mono<Lease> createLeaseIfNotExist(FeedRangeEpkImpl feedRangeEpkImpl, String str) {
        return createLeaseIfNotExist(feedRangeEpkImpl, str, (Map<String, String>) null);
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseManager
    public Mono<Lease> createLeaseIfNotExist(FeedRangeEpkImpl feedRangeEpkImpl, String str, Map<String, String> map) {
        throw new UnsupportedOperationException("FeedRangeEpkImpl based leases are not supported for Change Feed V0 wire format");
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseManager
    public Mono<Lease> createLeaseIfNotExist(String str, String str2) {
        return createLeaseIfNotExist(str, str2, (Map<String, String>) null);
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseManager
    public Mono<Lease> createLeaseIfNotExist(String str, String str2, Map<String, String> map) {
        if (str == null) {
            throw new IllegalArgumentException("leaseToken");
        }
        ServiceItemLease withProperties = new ServiceItemLease().withId(getDocumentId(str)).withLeaseToken(str).withContinuationToken(str2).withProperties(map);
        return this.leaseDocumentClient.createItem(this.settings.getLeaseCollectionLink(), withProperties, this.requestOptionsFactory.createItemRequestOptions(withProperties), false).onErrorResume(th -> {
            if ((th instanceof CosmosException) && ((CosmosException) th).getStatusCode() == 409) {
                this.logger.info("Some other host created lease for {}.", str);
                return Mono.empty();
            }
            this.logger.error("Failed to create lease document for {}.", str, th);
            return Mono.error(th);
        }).map(cosmosItemResponse -> {
            if (cosmosItemResponse == null) {
                return null;
            }
            InternalObjectNode properties = BridgeInternal.getProperties(cosmosItemResponse);
            return withProperties.withId(properties.getId()).withETag(properties.getETag()).withTs(properties.getString(Constants.Properties.LAST_MODIFIED));
        });
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseManager
    public Mono<Void> delete(Lease lease) {
        if (lease == null || lease.getId() == null) {
            throw new IllegalArgumentException("lease");
        }
        return this.leaseDocumentClient.deleteItem(lease.getId(), new PartitionKey(lease.getId()), this.requestOptionsFactory.createItemRequestOptions(lease)).onErrorResume(th -> {
            return ((th instanceof CosmosException) && ((CosmosException) th).getStatusCode() == 404) ? Mono.empty() : Mono.error(th);
        }).map(cosmosItemResponse -> {
            return true;
        }).then();
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager
    public Mono<Void> deleteAll(List<Lease> list) {
        Preconditions.checkNotNull(list, "Argument 'leases' can not be null");
        this.logger.info("Deleting all leases");
        HashMap hashMap = new HashMap();
        for (Lease lease : list) {
            hashMap.put(lease.getId(), new CosmosItemIdentity(new PartitionKey(lease.getId()), lease.getId()));
        }
        return Mono.defer(() -> {
            return Mono.just(hashMap);
        }).flatMapMany(map -> {
            return this.leaseDocumentClient.deleteAllItems((List) hashMap.values().stream().collect(Collectors.toList()));
        }).flatMap(cosmosBulkOperationResponse -> {
            if (cosmosBulkOperationResponse.getResponse() == null || !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
                int i = 0;
                int i2 = 0;
                if (cosmosBulkOperationResponse.getResponse() != null) {
                    i = cosmosBulkOperationResponse.getResponse().getStatusCode();
                    i2 = cosmosBulkOperationResponse.getResponse().getStatusCode();
                } else if (cosmosBulkOperationResponse.getException() != null && (cosmosBulkOperationResponse.getException() instanceof CosmosException)) {
                    CosmosException cosmosException = (CosmosException) cosmosBulkOperationResponse.getException();
                    i = cosmosException.getStatusCode();
                    i2 = cosmosException.getSubStatusCode();
                }
                if (i == 404 && i2 == 0) {
                    hashMap.remove(cosmosBulkOperationResponse.getOperation().getId());
                }
            } else {
                hashMap.remove(cosmosBulkOperationResponse.getOperation().getId());
            }
            return Mono.empty();
        }).repeat(() -> {
            return hashMap.size() != 0;
        }).then();
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseManager
    public Mono<Lease> acquire(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        String owner = lease.getOwner();
        return this.leaseUpdater.updateLease(lease, lease.getId(), new PartitionKey(lease.getId()), this.requestOptionsFactory.createItemRequestOptions(lease), lease2 -> {
            if (lease2.getOwner() != null && !lease2.getOwner().equalsIgnoreCase(owner)) {
                this.logger.info("Partition {} lease was acquired already by owner '{}'", lease.getLeaseToken(), lease2.getOwner());
                throw new LeaseLostException(lease);
            }
            lease2.setOwner(this.settings.getHostName());
            lease2.setProperties(lease.getProperties());
            return lease2;
        });
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseManager
    public Mono<Void> release(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        return this.leaseDocumentClient.readItem(lease.getId(), new PartitionKey(lease.getId()), this.requestOptionsFactory.createItemRequestOptions(lease), InternalObjectNode.class).onErrorResume(th -> {
            if (!(th instanceof CosmosException) || ((CosmosException) th).getStatusCode() != 404) {
                return Mono.error(th);
            }
            this.logger.info("Partition {} failed to renew lease. The lease is gone already.", lease.getLeaseToken());
            throw new LeaseLostException(lease);
        }).map(cosmosItemResponse -> {
            return ServiceItemLease.fromDocument(BridgeInternal.getProperties(cosmosItemResponse));
        }).flatMap(serviceItemLease -> {
            return this.leaseUpdater.updateLease(serviceItemLease, lease.getId(), new PartitionKey(lease.getId()), this.requestOptionsFactory.createItemRequestOptions(lease), lease2 -> {
                if (lease2.getOwner() == null || lease2.getOwner().equalsIgnoreCase(lease.getOwner())) {
                    lease2.setOwner(null);
                    return lease2;
                }
                this.logger.info("Partition {} no need to release lease. The lease was already taken by another host '{}'.", lease.getLeaseToken(), lease2.getOwner());
                throw new LeaseLostException(lease);
            });
        }).then();
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseManager
    public Mono<Lease> renew(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        return this.leaseDocumentClient.readItem(lease.getId(), new PartitionKey(lease.getId()), this.requestOptionsFactory.createItemRequestOptions(lease), InternalObjectNode.class).onErrorResume(th -> {
            if (!(th instanceof CosmosException) || ((CosmosException) th).getStatusCode() != 404) {
                return Mono.error(th);
            }
            this.logger.info("Partition {} failed to renew lease. The lease is gone already.", lease.getLeaseToken());
            throw new LeaseLostException(lease);
        }).map(cosmosItemResponse -> {
            return ServiceItemLease.fromDocument(BridgeInternal.getProperties(cosmosItemResponse));
        }).flatMap(serviceItemLease -> {
            return this.leaseUpdater.updateLease(serviceItemLease, lease.getId(), new PartitionKey(lease.getId()), this.requestOptionsFactory.createItemRequestOptions(lease), lease2 -> {
                if (lease2.getOwner() == null) {
                    this.logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken());
                    throw new LeaseLostException(lease);
                }
                if (lease2.getOwner().equalsIgnoreCase(lease.getOwner())) {
                    return lease2;
                }
                this.logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), lease2.getOwner());
                throw new LeaseLostException(lease);
            });
        });
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseManager
    public Mono<Lease> updateProperties(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        if (lease.getOwner() == null || lease.getOwner().equalsIgnoreCase(this.settings.getHostName())) {
            return this.leaseUpdater.updateLease(lease, lease.getId(), new PartitionKey(lease.getId()), this.requestOptionsFactory.createItemRequestOptions(lease), lease2 -> {
                if (lease2.getOwner() == null || lease2.getOwner().equalsIgnoreCase(lease.getOwner())) {
                    lease2.setProperties(lease.getProperties());
                    return lease2;
                }
                this.logger.info("Partition '{}' lease was taken over by owner '{}'", lease.getLeaseToken(), lease2.getOwner());
                throw new LeaseLostException(lease);
            });
        }
        this.logger.info("Partition '{}' lease was taken over by owner '{}' before lease item update", lease.getLeaseToken(), lease.getOwner());
        throw new LeaseLostException(lease);
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseCheckpointer
    public Mono<Lease> checkpoint(Lease lease, String str, CancellationToken cancellationToken) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("continuationToken must be a non-empty string");
        }
        return cancellationToken.isCancellationRequested() ? Mono.error(new TaskCancelledException()) : this.leaseDocumentClient.readItem(lease.getId(), new PartitionKey(lease.getId()), this.requestOptionsFactory.createItemRequestOptions(lease), InternalObjectNode.class).map(cosmosItemResponse -> {
            return ServiceItemLease.fromDocument(BridgeInternal.getProperties(cosmosItemResponse));
        }).flatMap(serviceItemLease -> {
            return cancellationToken.isCancellationRequested() ? Mono.error(new TaskCancelledException()) : this.leaseUpdater.updateLease(serviceItemLease, lease.getId(), new PartitionKey(lease.getId()), this.requestOptionsFactory.createItemRequestOptions(lease), lease2 -> {
                if (lease2.getOwner() == null) {
                    this.logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken());
                    throw new LeaseLostException(lease);
                }
                if (lease2.getOwner().equalsIgnoreCase(lease.getOwner())) {
                    lease2.setContinuationToken(str);
                    return lease2;
                }
                this.logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), lease2.getOwner());
                throw new LeaseLostException(lease);
            });
        }).doOnError(th -> {
            this.logger.info("Partition {} lease with token '{}' failed to checkpoint for owner '{}' with continuation token '{}'", new Object[]{lease.getLeaseToken(), lease.getConcurrencyToken(), lease.getOwner(), lease.getReadableContinuationToken()});
        });
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseStore
    public Mono<Boolean> isInitialized() {
        return this.leaseStore.isInitialized();
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseStore
    public Mono<Boolean> markInitialized() {
        return this.leaseStore.markInitialized();
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseStore
    public Mono<Boolean> acquireInitializationLock(Duration duration) {
        return this.leaseStore.acquireInitializationLock(duration);
    }

    @Override // com.azure.cosmos.implementation.changefeed.LeaseStoreManager, com.azure.cosmos.implementation.changefeed.LeaseStore
    public Mono<Boolean> releaseInitializationLock() {
        return this.leaseStore.releaseInitializationLock();
    }

    private Flux<ServiceItemLease> listDocuments(String str) {
        return listDocuments(str, null);
    }

    private Flux<ServiceItemLease> listDocuments(String str, Integer num) {
        SqlQuerySpec sqlQuerySpec;
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("prefix cannot be null or empty!");
        }
        if (num == null) {
            SqlParameter sqlParameter = new SqlParameter();
            sqlParameter.setName("@PartitionLeasePrefix");
            sqlParameter.setValue(str);
            sqlQuerySpec = new SqlQuerySpec("SELECT * FROM c WHERE STARTSWITH(c.id, @PartitionLeasePrefix)", (List<SqlParameter>) Collections.singletonList(sqlParameter));
        } else {
            SqlParameter sqlParameter2 = new SqlParameter();
            sqlParameter2.setName("@Top");
            sqlParameter2.setValue(num);
            SqlParameter sqlParameter3 = new SqlParameter();
            sqlParameter3.setName("@PartitionLeasePrefix");
            sqlParameter3.setValue(str);
            sqlQuerySpec = new SqlQuerySpec("SELECT TOP @Top * FROM c WHERE STARTSWITH(c.id, @PartitionLeasePrefix) AND c.ContinuationToken <> null", (List<SqlParameter>) Arrays.asList(sqlParameter2, sqlParameter3));
        }
        return this.leaseDocumentClient.queryItems(this.settings.getLeaseCollectionLink(), sqlQuerySpec, this.requestOptionsFactory.createQueryRequestOptions(), InternalObjectNode.class).flatMap(feedResponse -> {
            return Flux.fromIterable(feedResponse.getResults());
        }).map(ServiceItemLease::fromDocument);
    }

    private String getDocumentId(String str) {
        return getPartitionLeasePrefix() + str;
    }

    private String getPartitionLeasePrefix() {
        return this.settings.getContainerNamePrefix() + "..";
    }
}
