package com.azure.spring.data.cosmos.core;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.models.UniqueKeyPolicy;
import com.azure.spring.data.cosmos.Constants;
import com.azure.spring.data.cosmos.CosmosFactory;
import com.azure.spring.data.cosmos.common.CosmosUtils;
import com.azure.spring.data.cosmos.config.CosmosConfig;
import com.azure.spring.data.cosmos.config.DatabaseThroughputConfig;
import com.azure.spring.data.cosmos.core.convert.MappingCosmosConverter;
import com.azure.spring.data.cosmos.core.generator.CountQueryGenerator;
import com.azure.spring.data.cosmos.core.generator.FindQuerySpecGenerator;
import com.azure.spring.data.cosmos.core.generator.NativeQueryGenerator;
import com.azure.spring.data.cosmos.core.mapping.event.AfterLoadEvent;
import com.azure.spring.data.cosmos.core.mapping.event.CosmosMappingEvent;
import com.azure.spring.data.cosmos.core.query.CosmosQuery;
import com.azure.spring.data.cosmos.core.query.Criteria;
import com.azure.spring.data.cosmos.core.query.CriteriaType;
import com.azure.spring.data.cosmos.exception.CosmosExceptionUtils;
import com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.auditing.IsNewAwareAuditingHandler;
import org.springframework.data.domain.Sort;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/spring/data/cosmos/core/ReactiveCosmosTemplate.class */
/**
 * Reactive implementation of {@link ReactiveCosmosOperations} built on top of a
 * {@link CosmosAsyncClient}.
 *
 * <p>Every operation targets the database whose name is supplied by the {@link CosmosFactory}.
 * Documents are exchanged with the service as {@link JsonNode} instances and converted to and
 * from domain objects through the configured {@link MappingCosmosConverter}. Diagnostics of each
 * response are forwarded to {@code CosmosUtils.fillAndProcessResponseDiagnostics} together with
 * the configured {@link ResponseDiagnosticsProcessor}, and failures are routed through
 * {@link CosmosExceptionUtils}.
 */
public class ReactiveCosmosTemplate implements ReactiveCosmosOperations, ApplicationContextAware {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveCosmosTemplate.class);

    private final MappingCosmosConverter mappingCosmosConverter;
    private final String databaseName;
    private final ResponseDiagnosticsProcessor responseDiagnosticsProcessor;
    private final boolean queryMetricsEnabled;
    private final int maxDegreeOfParallelism;
    private final CosmosAsyncClient cosmosAsyncClient;
    /** Optional auditing handler; {@code null} when auditing is not configured. */
    private final IsNewAwareAuditingHandler cosmosAuditingHandler;
    /** Optional database throughput settings; {@code null} means the database is created without them. */
    private final DatabaseThroughputConfig databaseThroughputConfig;
    /** Used to publish mapping events; stays {@code null} until {@link #setApplicationContext} is called. */
    private ApplicationContext applicationContext;

    /**
     * Creates a template from a client and database name, with auditing support.
     *
     * @param cosmosAsyncClient the async client used for all operations
     * @param str the database name
     * @param cosmosConfig source of diagnostics, query metrics, parallelism and throughput settings
     * @param mappingCosmosConverter converter between domain objects and JSON documents
     * @param isNewAwareAuditingHandler auditing handler, may be {@code null}
     */
    public ReactiveCosmosTemplate(CosmosAsyncClient cosmosAsyncClient, String str, CosmosConfig cosmosConfig, MappingCosmosConverter mappingCosmosConverter, IsNewAwareAuditingHandler isNewAwareAuditingHandler) {
        this(new CosmosFactory(cosmosAsyncClient, str), cosmosConfig, mappingCosmosConverter, isNewAwareAuditingHandler);
    }

    /**
     * Creates a template from a client and database name, without auditing support.
     *
     * @param cosmosAsyncClient the async client used for all operations
     * @param str the database name
     * @param cosmosConfig source of diagnostics, query metrics, parallelism and throughput settings
     * @param mappingCosmosConverter converter between domain objects and JSON documents
     */
    public ReactiveCosmosTemplate(CosmosAsyncClient cosmosAsyncClient, String str, CosmosConfig cosmosConfig, MappingCosmosConverter mappingCosmosConverter) {
        this(new CosmosFactory(cosmosAsyncClient, str), cosmosConfig, mappingCosmosConverter, (IsNewAwareAuditingHandler) null);
    }

    /**
     * Creates a template from a factory, with auditing support.
     *
     * @param cosmosFactory supplies the async client and the database name; must not be {@code null}
     * @param cosmosConfig source of diagnostics, query metrics, parallelism and throughput settings;
     *                     must not be {@code null}
     * @param mappingCosmosConverter converter between domain objects and JSON documents; must not be {@code null}
     * @param isNewAwareAuditingHandler auditing handler, may be {@code null}
     * @throws IllegalArgumentException if any of the required arguments is {@code null}
     */
    public ReactiveCosmosTemplate(CosmosFactory cosmosFactory, CosmosConfig cosmosConfig, MappingCosmosConverter mappingCosmosConverter, IsNewAwareAuditingHandler isNewAwareAuditingHandler) {
        Assert.notNull(cosmosFactory, "CosmosFactory must not be null!");
        Assert.notNull(cosmosConfig, "CosmosConfig must not be null!");
        Assert.notNull(mappingCosmosConverter, "MappingCosmosConverter must not be null!");
        this.mappingCosmosConverter = mappingCosmosConverter;
        this.cosmosAsyncClient = cosmosFactory.getCosmosAsyncClient();
        this.databaseName = cosmosFactory.getDatabaseName();
        this.responseDiagnosticsProcessor = cosmosConfig.getResponseDiagnosticsProcessor();
        this.queryMetricsEnabled = cosmosConfig.isQueryMetricsEnabled();
        this.maxDegreeOfParallelism = cosmosConfig.getMaxDegreeOfParallelism();
        this.cosmosAuditingHandler = isNewAwareAuditingHandler;
        this.databaseThroughputConfig = cosmosConfig.getDatabaseThroughputConfig();
    }

    /**
     * Creates a template from a factory, without auditing support.
     *
     * @param cosmosFactory supplies the async client and the database name; must not be {@code null}
     * @param cosmosConfig source of diagnostics, query metrics, parallelism and throughput settings;
     *                     must not be {@code null}
     * @param mappingCosmosConverter converter between domain objects and JSON documents; must not be {@code null}
     */
    public ReactiveCosmosTemplate(CosmosFactory cosmosFactory, CosmosConfig cosmosConfig, MappingCosmosConverter mappingCosmosConverter) {
        this(cosmosFactory, cosmosConfig, mappingCosmosConverter, (IsNewAwareAuditingHandler) null);
    }

    /**
     * Stores the application context so that mapping events can be published after documents are loaded.
     *
     * @param applicationContext the context to publish events to
     */
    @Override
    public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Creates the database (if missing) and then the container described by the entity information
     * (if missing).
     *
     * <p>Container name, partition key path, time to live, indexing policy and (when present) the
     * unique key policy are taken from the entity information. When the entity information carries a
     * request unit value, the container is created with autoscaled or manual throughput depending on
     * {@code isAutoScale()}.
     *
     * @param cosmosEntityInformation metadata describing the container to create
     * @return the response of the container creation call
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public Mono<CosmosContainerResponse> createContainerIfNotExists(CosmosEntityInformation<?, ?> cosmosEntityInformation) {
        return createDatabaseIfNotExists()
            .publishOn(Schedulers.parallel())
            .onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler(
                "Failed to create database", throwable, this.responseDiagnosticsProcessor))
            .flatMap(databaseResponse -> {
                CosmosUtils.fillAndProcessResponseDiagnostics(
                    this.responseDiagnosticsProcessor, databaseResponse.getDiagnostics(), null);

                final CosmosContainerProperties containerProperties = new CosmosContainerProperties(
                    cosmosEntityInformation.getContainerName(), cosmosEntityInformation.getPartitionKeyPath());
                containerProperties.setDefaultTimeToLiveInSeconds(cosmosEntityInformation.getTimeToLive());
                containerProperties.setIndexingPolicy(cosmosEntityInformation.getIndexingPolicy());
                final UniqueKeyPolicy uniqueKeyPolicy = cosmosEntityInformation.getUniqueKeyPolicy();
                if (uniqueKeyPolicy != null) {
                    containerProperties.setUniqueKeyPolicy(uniqueKeyPolicy);
                }

                final CosmosAsyncDatabase database =
                    this.cosmosAsyncClient.getDatabase(databaseResponse.getProperties().getId());
                final Mono<CosmosContainerResponse> creation;
                if (cosmosEntityInformation.getRequestUnit() == null) {
                    creation = database.createContainerIfNotExists(containerProperties);
                } else {
                    final ThroughputProperties throughput = cosmosEntityInformation.isAutoScale()
                        ? ThroughputProperties.createAutoscaledThroughput(cosmosEntityInformation.getRequestUnit().intValue())
                        : ThroughputProperties.createManualThroughput(cosmosEntityInformation.getRequestUnit().intValue());
                    creation = database.createContainerIfNotExists(containerProperties, throughput);
                }

                return creation
                    .doOnNext(containerResponse -> CosmosUtils.fillAndProcessResponseDiagnostics(
                        this.responseDiagnosticsProcessor, containerResponse.getDiagnostics(), null))
                    .onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler(
                        "Failed to create container", throwable, this.responseDiagnosticsProcessor));
            });
    }

    /**
     * Creates the configured database if it does not exist, applying the database throughput
     * configuration (autoscaled or manual) when one was supplied.
     */
    private Mono<CosmosDatabaseResponse> createDatabaseIfNotExists() {
        if (this.databaseThroughputConfig == null) {
            return this.cosmosAsyncClient.createDatabaseIfNotExists(this.databaseName);
        }
        final ThroughputProperties throughput = this.databaseThroughputConfig.isAutoScale()
            ? ThroughputProperties.createAutoscaledThroughput(this.databaseThroughputConfig.getRequestUnits())
            : ThroughputProperties.createManualThroughput(this.databaseThroughputConfig.getRequestUnits());
        return this.cosmosAsyncClient.createDatabaseIfNotExists(this.databaseName, throughput);
    }

    /**
     * Reads the properties of a container.
     *
     * @param str the container name
     * @return the container properties
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public Mono<CosmosContainerProperties> getContainerProperties(String str) {
        return this.cosmosAsyncClient.getDatabase(this.databaseName)
            .getContainer(str)
            .read()
            .map(CosmosContainerResponse::getProperties);
    }

    /**
     * Replaces the properties of a container.
     *
     * @param str the container name
     * @param cosmosContainerProperties the new properties
     * @return the properties reported by the replace response
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public Mono<CosmosContainerProperties> replaceContainerProperties(String str, CosmosContainerProperties cosmosContainerProperties) {
        return this.cosmosAsyncClient.getDatabase(this.databaseName)
            .getContainer(str)
            .replace(cosmosContainerProperties)
            .map(CosmosContainerResponse::getProperties);
    }

    /**
     * Finds all items of a container.
     *
     * @param str the container name
     * @param cls the domain type to convert each item to
     * @return the converted items
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Flux<T> findAll(String str, Class<T> cls) {
        return find(new CosmosQuery(Criteria.getInstance(CriteriaType.ALL)), cls, str);
    }

    /**
     * Finds all items of the container that belongs to the given domain type.
     *
     * <p>The container name is resolved through {@link #getContainerName(Class)}, the same way the
     * other overloads that only receive a domain type resolve it.
     *
     * @param cls the domain type; must not be {@code null}
     * @return the converted items
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Flux<T> findAll(Class<T> cls) {
        return findAll(getContainerName(cls), cls);
    }

    /**
     * Finds all items stored under one partition key.
     *
     * @param partitionKey the partition key to query; must not be {@code null}
     * @param cls the domain type; must not be {@code null}
     * @return the converted items
     * @throws IllegalArgumentException if an argument is {@code null}
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Flux<T> findAll(PartitionKey partitionKey, Class<T> cls) {
        Assert.notNull(partitionKey, "partitionKey should not be null");
        Assert.notNull(cls, "domainType should not be null");
        final String containerName = getContainerName(cls);
        final CosmosQueryRequestOptions options = newQueryOptions();
        options.setPartitionKey(partitionKey);
        return this.cosmosAsyncClient.getDatabase(this.databaseName)
            .getContainer(containerName)
            .queryItems("SELECT * FROM r", options, JsonNode.class)
            .byPage()
            .publishOn(Schedulers.parallel())
            .flatMap(this::flattenPage)
            .map(item -> emitOnLoadEventAndConvertToDomainObject(cls, item))
            .onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler(
                "Failed to find items", throwable, this.responseDiagnosticsProcessor));
    }

    /**
     * Finds an item by id in the container that belongs to the given domain type.
     *
     * @param obj the id of the item
     * @param cls the domain type; must not be {@code null}
     * @return the converted item, or an empty {@code Mono} when the query yields no result
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Mono<T> findById(Object obj, Class<T> cls) {
        Assert.notNull(cls, "domainType should not be null");
        return findById(getContainerName(cls), obj, cls);
    }

    /**
     * Finds an item by id using a parameterized query against the given container.
     *
     * @param str the container name; must contain text
     * @param obj the id of the item
     * @param cls the domain type; must not be {@code null}
     * @return the first converted match, or an empty {@code Mono} when the query yields no result
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Mono<T> findById(String str, Object obj, Class<T> cls) {
        Assert.hasText(str, "containerName should not be null, empty or only whitespaces");
        Assert.notNull(cls, "domainType should not be null");
        final SqlParameter idParameter = new SqlParameter("@ROOT_ID", CosmosUtils.getStringIDValue(obj));
        final SqlQuerySpec querySpec =
            new SqlQuerySpec("select * from root where root.id = @ROOT_ID", idParameter);
        final CosmosQueryRequestOptions options = newQueryOptions();
        return this.cosmosAsyncClient.getDatabase(this.databaseName)
            .getContainer(str)
            .queryItems(querySpec, options, JsonNode.class)
            .byPage()
            .publishOn(Schedulers.parallel())
            .flatMap(page -> {
                CosmosUtils.fillAndProcessResponseDiagnostics(
                    this.responseDiagnosticsProcessor, page.getCosmosDiagnostics(), page);
                // Only the first item of a page is converted; pages without results contribute nothing.
                return Mono.justOrEmpty(page.getResults().stream()
                    .map(item -> emitOnLoadEventAndConvertToDomainObject(cls, item))
                    .findFirst());
            })
            .onErrorResume(throwable -> CosmosExceptionUtils.findAPIExceptionHandler(
                "Failed to find item", throwable, this.responseDiagnosticsProcessor))
            .next();
    }

    /**
     * Finds an item by id and partition key with a point read.
     *
     * @param obj the id of the item
     * @param cls the domain type; must not be {@code null}
     * @param partitionKey the partition key of the item
     * @return the converted item
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Mono<T> findById(Object obj, Class<T> cls, PartitionKey partitionKey) {
        Assert.notNull(cls, "domainType should not be null");
        return this.cosmosAsyncClient.getDatabase(this.databaseName)
            .getContainer(getContainerName(cls))
            .readItem(CosmosUtils.getStringIDValue(obj), partitionKey, JsonNode.class)
            .publishOn(Schedulers.parallel())
            .flatMap(itemResponse -> {
                CosmosUtils.fillAndProcessResponseDiagnostics(
                    this.responseDiagnosticsProcessor, itemResponse.getDiagnostics(), null);
                return Mono.justOrEmpty(emitOnLoadEventAndConvertToDomainObject(cls, itemResponse.getItem()));
            })
            .onErrorResume(throwable -> CosmosExceptionUtils.findAPIExceptionHandler(
                "Failed to find item", throwable, this.responseDiagnosticsProcessor));
    }

    /**
     * Inserts an object into the container that belongs to its class.
     *
     * @param t the object to insert
     * @param partitionKey the partition key to insert under
     * @return the inserted item as returned by the service, converted to the object's class
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Mono<T> insert(T t, PartitionKey partitionKey) {
        return insert(getContainerName(t.getClass()), t, partitionKey);
    }

    /**
     * Inserts an object into the container that belongs to its class, passing no partition key.
     *
     * @param t the object to insert
     * @return the inserted item as returned by the service, converted to the object's class
     */
    public <T> Mono<T> insert(T t) {
        return insert(getContainerName(t.getClass()), t, null);
    }

    /**
     * Inserts an object into the given container.
     *
     * <p>Before the item is written, the object is marked as audited (when an auditing handler is
     * configured) and receives a random UUID id when id generation is enabled and its id is
     * {@code null}.
     *
     * @param str the container name; must contain text
     * @param obj the object to insert; must not be {@code null}
     * @param partitionKey the partition key to insert under, may be {@code null}
     * @return the inserted item as returned by the service, converted to the object's class
     * @throws IllegalArgumentException if the container name has no text or the object is {@code null}
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Mono<T> insert(String str, Object obj, PartitionKey partitionKey) {
        Assert.hasText(str, "containerName should not be null, empty or only whitespaces");
        Assert.notNull(obj, "objectToSave should not be null");
        // The result is converted back to the runtime class of the saved object, which the
        // caller declares to be T.
        @SuppressWarnings("unchecked")
        final Class<T> domainType = (Class<T>) obj.getClass();
        markAuditedIfConfigured(obj);
        generateIdIfNullAndAutoGenerationEnabled(obj, domainType);
        final JsonNode payload = this.mappingCosmosConverter.writeJsonNode(obj);
        return this.cosmosAsyncClient.getDatabase(this.databaseName)
            .getContainer(str)
            .createItem(payload, partitionKey, new CosmosItemRequestOptions())
            .publishOn(Schedulers.parallel())
            .onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler(
                "Failed to insert item", throwable, this.responseDiagnosticsProcessor))
            .map(itemResponse -> {
                CosmosUtils.fillAndProcessResponseDiagnostics(
                    this.responseDiagnosticsProcessor, itemResponse.getDiagnostics(), null);
                return toDomainObject(domainType, itemResponse.getItem());
            });
    }

    /**
     * Inserts an object into the given container, passing no partition key.
     *
     * @param str the container name
     * @param t the object to insert
     * @return the inserted item as returned by the service, converted to the object's class
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Mono<T> insert(String str, T t) {
        return insert(str, t, null);
    }

    /**
     * Assigns a random UUID string to the entity's id field when id generation is enabled for the
     * type and the current id value is {@code null}.
     */
    private <T> void generateIdIfNullAndAutoGenerationEnabled(T entity, Class<?> entityType) {
        final CosmosEntityInformation<?, ?> entityInformation = CosmosEntityInformation.getInstance(entityType);
        if (!entityInformation.shouldGenerateId()) {
            return;
        }
        if (ReflectionUtils.getField(entityInformation.getIdField(), entity) == null) {
            ReflectionUtils.setField(entityInformation.getIdField(), entity, UUID.randomUUID().toString());
        }
    }

    /**
     * Upserts an object into the container that belongs to its class.
     *
     * @param t the object to upsert
     * @return the upserted item as returned by the service, converted to the object's class
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Mono<T> upsert(T t) {
        return upsert(getContainerName(t.getClass()), t);
    }

    /**
     * Upserts an object into the given container.
     *
     * <p>The object is marked as audited when an auditing handler is configured. For versioned
     * entity types the document's ETag is sent as an If-Match condition.
     *
     * @param str the container name
     * @param t the object to upsert
     * @return the upserted item as returned by the service, converted to the object's class
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Mono<T> upsert(String str, T t) {
        // The result is converted back to the runtime class of the upserted object.
        @SuppressWarnings("unchecked")
        final Class<T> domainType = (Class<T>) t.getClass();
        markAuditedIfConfigured(t);
        final JsonNode payload = this.mappingCosmosConverter.writeJsonNode(t);
        final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
        applyVersioning(domainType, payload, options);
        return this.cosmosAsyncClient.getDatabase(this.databaseName)
            .getContainer(str)
            .upsertItem(payload, options)
            .publishOn(Schedulers.parallel())
            .map(itemResponse -> {
                CosmosUtils.fillAndProcessResponseDiagnostics(
                    this.responseDiagnosticsProcessor, itemResponse.getDiagnostics(), null);
                return toDomainObject(domainType, itemResponse.getItem());
            })
            .onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler(
                "Failed to upsert item", throwable, this.responseDiagnosticsProcessor));
    }

    /**
     * Deletes an item by id.
     *
     * @param str the container name; must contain text
     * @param obj the id of the item
     * @param partitionKey the partition key of the item; {@code null} is replaced by {@link PartitionKey#NONE}
     * @return a {@code Mono} that completes when the delete has finished
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public Mono<Void> deleteById(String str, Object obj, PartitionKey partitionKey) {
        return deleteById(str, obj, partitionKey, new CosmosItemRequestOptions());
    }

    /** Deletes an item by id with explicit request options; a {@code null} partition key becomes {@code PartitionKey.NONE}. */
    private Mono<Void> deleteById(String containerName, Object id, PartitionKey partitionKey, CosmosItemRequestOptions options) {
        Assert.hasText(containerName, "container name should not be null, empty or only whitespaces");
        final String idValue = CosmosUtils.getStringIDValue(id);
        final PartitionKey effectiveKey = partitionKey == null ? PartitionKey.NONE : partitionKey;
        return this.cosmosAsyncClient.getDatabase(this.databaseName)
            .getContainer(containerName)
            .deleteItem(idValue, effectiveKey, options)
            .publishOn(Schedulers.parallel())
            .doOnNext(itemResponse -> CosmosUtils.fillAndProcessResponseDiagnostics(
                this.responseDiagnosticsProcessor, itemResponse.getDiagnostics(), null))
            .onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler(
                "Failed to delete item", throwable, this.responseDiagnosticsProcessor))
            .then();
    }

    /**
     * Deletes the given entity from a container.
     *
     * <p>For versioned entity types the entity's ETag is sent as an If-Match condition; this is
     * applied by the shared item-deletion helper.
     *
     * @param str the container name
     * @param t the entity to delete; must not be {@code null}
     * @return a {@code Mono} that completes when the delete has finished
     * @throws IllegalArgumentException if the entity is {@code null}
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Mono<Void> deleteEntity(String str, T t) {
        Assert.notNull(t, "entity to be deleted should not be null");
        final JsonNode payload = this.mappingCosmosConverter.writeJsonNode(t);
        return deleteItem(payload, str, t.getClass()).then();
    }

    /**
     * Deletes every item of a container by querying all items and deleting them one by one.
     *
     * @param str the container name; must contain text
     * @param cls the domain type of the stored items
     * @return a {@code Mono} that completes when all deletes have finished
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public Mono<Void> deleteAll(@NonNull String str, @NonNull Class<?> cls) {
        Assert.hasText(str, "container name should not be null, empty or only whitespaces");
        return delete(new CosmosQuery(Criteria.getInstance(CriteriaType.ALL)), cls, str).then();
    }

    /**
     * Deletes every item matched by the query.
     *
     * @param cosmosQuery the query selecting the items; must not be {@code null}
     * @param cls the domain type; must not be {@code null}
     * @param str the container name; must contain text
     * @return the deleted items converted to the domain type
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Flux<T> delete(CosmosQuery cosmosQuery, Class<T> cls, String str) {
        Assert.notNull(cosmosQuery, "DocumentQuery should not be null.");
        Assert.notNull(cls, "domainType should not be null.");
        Assert.hasText(str, "container name should not be null, empty or only whitespaces");
        return findItems(cosmosQuery, str, cls).flatMap(item -> deleteItem(item, str, cls));
    }

    /**
     * Finds the items matched by the query.
     *
     * @param cosmosQuery the query to run
     * @param cls the domain type to convert each item to
     * @param str the container name
     * @return the converted items
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Flux<T> find(CosmosQuery cosmosQuery, Class<T> cls, String str) {
        return findItems(cosmosQuery, str, cls)
            .map(item -> emitOnLoadEventAndConvertToDomainObject(cls, item));
    }

    /**
     * Tells whether the query matches at least one item, based on a count query.
     *
     * @param cosmosQuery the query to evaluate
     * @param cls the domain type (not used by this implementation)
     * @param str the container name
     * @return {@code true} when the count is greater than zero
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public Mono<Boolean> exists(CosmosQuery cosmosQuery, Class<?> cls, String str) {
        return count(cosmosQuery, str).map(total -> total > 0);
    }

    /**
     * Tells whether an item with the given id exists.
     *
     * <p>Always emits a value: {@code true} when the lookup yields an item and {@code false} when
     * it completes empty.
     *
     * @param obj the id to look up
     * @param cls the domain type
     * @param str the container name
     * @return {@code true} if an item was found, {@code false} otherwise
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public Mono<Boolean> existsById(Object obj, Class<?> cls, String str) {
        return findById(str, obj, cls).hasElement();
    }

    /**
     * Counts all items of a container.
     *
     * @param str the container name
     * @return the number of items
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public Mono<Long> count(String str) {
        return count(new CosmosQuery(Criteria.getInstance(CriteriaType.ALL)), str);
    }

    /**
     * Counts the items matched by the query.
     *
     * @param cosmosQuery the query to count matches for
     * @param str the container name
     * @return the number of matching items
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public Mono<Long> count(CosmosQuery cosmosQuery, String str) {
        return getCountValue(new CountQueryGenerator().generateCosmos(cosmosQuery), str);
    }

    /**
     * Runs the given count query specification.
     *
     * @param sqlQuerySpec a query whose first result is the count value
     * @param str the container name
     * @return the count value
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public Mono<Long> count(SqlQuerySpec sqlQuerySpec, String str) {
        return getCountValue(sqlQuerySpec, str);
    }

    /**
     * @return the converter used to map between domain objects and JSON documents
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public MappingCosmosConverter getConverter() {
        return this.mappingCosmosConverter;
    }

    /**
     * Runs a native query without sorting.
     *
     * @param sqlQuerySpec the query to run
     * @param cls the domain type used to resolve the container name
     * @param cls2 the type each result is converted to
     * @return the converted results
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Flux<T> runQuery(SqlQuerySpec sqlQuerySpec, Class<?> cls, Class<T> cls2) {
        return runQuery(sqlQuerySpec, Sort.unsorted(), cls, cls2);
    }

    /**
     * Runs a native query with the given sort applied by {@link NativeQueryGenerator}.
     *
     * @param sqlQuerySpec the query to run
     * @param sort the sort to apply
     * @param cls the domain type used to resolve the container name
     * @param cls2 the type each result is converted to
     * @return the converted results
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public <T> Flux<T> runQuery(SqlQuerySpec sqlQuerySpec, Sort sort, Class<?> cls, Class<T> cls2) {
        final SqlQuerySpec sortedQuery = NativeQueryGenerator.getInstance().generateSortedQuery(sqlQuerySpec, sort);
        return runQuery(sortedQuery, cls)
            .map(item -> emitOnLoadEventAndConvertToDomainObject(cls2, item));
    }

    /**
     * Runs a query against the container of the given domain type and emits the raw items.
     * Uses the same query options (query metrics and parallelism) as the other query paths.
     */
    private Flux<JsonNode> runQuery(SqlQuerySpec querySpec, Class<?> domainType) {
        final String containerName = getContainerName(domainType);
        return this.cosmosAsyncClient.getDatabase(this.databaseName)
            .getContainer(containerName)
            .queryItems(querySpec, newQueryOptions(), JsonNode.class)
            .byPage()
            .publishOn(Schedulers.parallel())
            .flatMap(this::flattenPage)
            .onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler(
                "Failed to find items", throwable, this.responseDiagnosticsProcessor));
    }

    /** Runs a count query and reads the count from the first result of the first page. */
    private Mono<Long> getCountValue(SqlQuerySpec querySpec, String containerName) {
        return executeQuery(querySpec, containerName, newQueryOptions())
            .doOnNext(page -> CosmosUtils.fillAndProcessResponseDiagnostics(
                this.responseDiagnosticsProcessor, page.getCosmosDiagnostics(), page))
            .onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler(
                "Failed to get count value", throwable, this.responseDiagnosticsProcessor))
            .next()
            .map(firstPage -> firstPage.getResults().get(0).asLong());
    }

    /** Runs a query page by page against the given container. */
    private Flux<FeedResponse<JsonNode>> executeQuery(SqlQuerySpec querySpec, String containerName, CosmosQueryRequestOptions options) {
        return this.cosmosAsyncClient.getDatabase(this.databaseName)
            .getContainer(containerName)
            .queryItems(querySpec, options, JsonNode.class)
            .byPage()
            .onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler(
                "Failed to execute query", throwable, this.responseDiagnosticsProcessor));
    }

    /**
     * Deletes a container. This call blocks the calling thread until the delete has finished.
     *
     * @param str the container name; must contain text
     * @throws IllegalArgumentException if the container name has no text
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public void deleteContainer(@NonNull String str) {
        Assert.hasText(str, "containerName should have text.");
        this.cosmosAsyncClient.getDatabase(this.databaseName)
            .getContainer(str)
            .delete()
            .doOnNext(containerResponse -> CosmosUtils.fillAndProcessResponseDiagnostics(
                this.responseDiagnosticsProcessor, containerResponse.getDiagnostics(), null))
            .onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler(
                "Failed to delete container", throwable, this.responseDiagnosticsProcessor))
            .block();
    }

    /**
     * Resolves the container name of a domain type from its {@link CosmosEntityInformation}.
     *
     * @param cls the domain type; must not be {@code null}
     * @return the container name
     * @throws IllegalArgumentException if the domain type is {@code null}
     */
    @Override // com.azure.spring.data.cosmos.core.ReactiveCosmosOperations
    public String getContainerName(Class<?> cls) {
        Assert.notNull(cls, "domainType should not be null");
        return CosmosEntityInformation.getInstance(cls).getContainerName();
    }

    /** Marks the object as audited when an auditing handler was supplied. */
    private void markAuditedIfConfigured(Object entity) {
        if (this.cosmosAuditingHandler != null) {
            this.cosmosAuditingHandler.markAudited(entity);
        }
    }

    /** Builds query options carrying the configured query-metrics flag and degree of parallelism. */
    private CosmosQueryRequestOptions newQueryOptions() {
        final CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
        options.setQueryMetricsEnabled(this.queryMetricsEnabled);
        options.setMaxDegreeOfParallelism(this.maxDegreeOfParallelism);
        return options;
    }

    /** Forwards the diagnostics of a result page and emits the page's items one by one. */
    private Flux<JsonNode> flattenPage(FeedResponse<JsonNode> page) {
        CosmosUtils.fillAndProcessResponseDiagnostics(
            this.responseDiagnosticsProcessor, page.getCosmosDiagnostics(), page);
        return Flux.fromIterable(page.getResults());
    }

    /**
     * Runs the find query generated from the given {@link CosmosQuery} and emits the raw items.
     * When the query carries a partition key value for the domain type, it is set on the request.
     */
    private <T> Flux<JsonNode> findItems(@NonNull CosmosQuery query, @NonNull String containerName, @NonNull Class<T> domainType) {
        final SqlQuerySpec querySpec = new FindQuerySpecGenerator().generateCosmos(query);
        final CosmosQueryRequestOptions options = newQueryOptions();
        query.getPartitionKeyValue(domainType).ifPresent(partitionKeyValue -> {
            LOGGER.debug("Setting partition key {}", partitionKeyValue);
            options.setPartitionKey(new PartitionKey(partitionKeyValue));
        });
        return this.cosmosAsyncClient.getDatabase(this.databaseName)
            .getContainer(containerName)
            .queryItems(querySpec, options, JsonNode.class)
            .byPage()
            .publishOn(Schedulers.parallel())
            .flatMap(this::flattenPage)
            .onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler(
                "Failed to query items", throwable, this.responseDiagnosticsProcessor));
    }

    /**
     * Deletes the given document and emits it converted to the domain type. For versioned entity
     * types the document's ETag is sent as an If-Match condition.
     */
    private <T> Mono<T> deleteItem(@NonNull JsonNode document, String containerName, @NonNull Class<T> domainType) {
        final CosmosItemRequestOptions options = new CosmosItemRequestOptions();
        applyVersioning(domainType, document, options);
        return this.cosmosAsyncClient.getDatabase(this.databaseName)
            .getContainer(containerName)
            .deleteItem(document, options)
            .publishOn(Schedulers.parallel())
            .doOnNext(itemResponse -> CosmosUtils.fillAndProcessResponseDiagnostics(
                this.responseDiagnosticsProcessor, itemResponse.getDiagnostics(), null))
            .map(itemResponse -> toDomainObject(domainType, document))
            .onErrorResume(throwable -> CosmosExceptionUtils.exceptionHandler(
                "Failed to delete item", throwable, this.responseDiagnosticsProcessor));
    }

    /** Publishes an {@link AfterLoadEvent} for the document (when possible) and converts it to the domain type. */
    private <T> T emitOnLoadEventAndConvertToDomainObject(@NonNull Class<T> domainType, JsonNode document) {
        final String containerName = CosmosEntityInformation.getInstance(domainType).getContainerName();
        maybeEmitEvent(new AfterLoadEvent(document, domainType, containerName));
        return toDomainObject(domainType, document);
    }

    /** Converts a JSON document to the domain type through the mapping converter. */
    private <T> T toDomainObject(@NonNull Class<T> domainType, JsonNode document) {
        return this.mappingCosmosConverter.read(domainType, document);
    }

    /** For versioned entity types, copies the document's ETag property into the If-Match condition of the options. */
    private void applyVersioning(Class<?> domainType, JsonNode document, CosmosItemRequestOptions options) {
        if (CosmosEntityInformation.getInstance(domainType).isVersioned()) {
            options.setIfMatchETag(document.get(Constants.ETAG_PROPERTY_DEFAULT_NAME).asText());
        }
    }

    /** Publishes the event only when an application context has been set. */
    private void maybeEmitEvent(CosmosMappingEvent<?> event) {
        if (canPublishEvent()) {
            this.applicationContext.publishEvent(event);
        }
    }

    /** @return {@code true} when an application context is available for publishing events */
    private boolean canPublishEvent() {
        return this.applicationContext != null;
    }
}
