/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.classic.kv;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.CoreKeyspace;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.api.kv.CoreAsyncResponse;
import com.couchbase.client.core.api.kv.CoreDurability;
import com.couchbase.client.core.api.kv.CoreEncodedContent;
import com.couchbase.client.core.api.kv.CoreExistsResult;
import com.couchbase.client.core.api.kv.CoreExpiry;
import com.couchbase.client.core.api.kv.CoreGetResult;
import com.couchbase.client.core.api.kv.CoreKvOps;
import com.couchbase.client.core.api.kv.CoreKvParamValidators;
import com.couchbase.client.core.api.kv.CoreKvResponseMetadata;
import com.couchbase.client.core.api.kv.CoreMutationResult;
import com.couchbase.client.core.api.kv.CoreReadPreference;
import com.couchbase.client.core.api.kv.CoreStoreSemantics;
import com.couchbase.client.core.api.kv.CoreSubdocGetCommand;
import com.couchbase.client.core.api.kv.CoreSubdocGetResult;
import com.couchbase.client.core.api.kv.CoreSubdocMutateCommand;
import com.couchbase.client.core.api.kv.CoreSubdocMutateResult;
import com.couchbase.client.core.classic.ClassicExpiryHelper;
import com.couchbase.client.core.classic.ClassicHelper;
import com.couchbase.client.core.cnc.CbTracing;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.endpoint.http.CoreCommonOptions;
import com.couchbase.client.core.error.CasMismatchException;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.DefaultErrorUtil;
import com.couchbase.client.core.error.DocumentExistsException;
import com.couchbase.client.core.error.DocumentNotFoundException;
import com.couchbase.client.core.error.DocumentUnretrievableException;
import com.couchbase.client.core.error.InvalidArgumentException;
import com.couchbase.client.core.error.context.KeyValueErrorContext;
import com.couchbase.client.core.error.context.ReducedKeyValueErrorContext;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.kv.CoreRangeScan;
import com.couchbase.client.core.kv.CoreRangeScanItem;
import com.couchbase.client.core.kv.CoreSamplingScan;
import com.couchbase.client.core.kv.CoreScanOptions;
import com.couchbase.client.core.kv.CoreScanType;
import com.couchbase.client.core.kv.RangeScanOrchestrator;
import com.couchbase.client.core.msg.BaseResponse;
import com.couchbase.client.core.msg.CancellationReason;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.kv.CodecFlags;
import com.couchbase.client.core.msg.kv.GetAndLockRequest;
import com.couchbase.client.core.msg.kv.GetAndTouchRequest;
import com.couchbase.client.core.msg.kv.GetMetaRequest;
import com.couchbase.client.core.msg.kv.GetRequest;
import com.couchbase.client.core.msg.kv.InsertRequest;
import com.couchbase.client.core.msg.kv.KeyValueRequest;
import com.couchbase.client.core.msg.kv.RemoveRequest;
import com.couchbase.client.core.msg.kv.ReplaceRequest;
import com.couchbase.client.core.msg.kv.SubDocumentField;
import com.couchbase.client.core.msg.kv.SubdocCommandType;
import com.couchbase.client.core.msg.kv.SubdocGetRequest;
import com.couchbase.client.core.msg.kv.SubdocGetResponse;
import com.couchbase.client.core.msg.kv.SubdocMutateRequest;
import com.couchbase.client.core.msg.kv.TouchRequest;
import com.couchbase.client.core.msg.kv.UnlockRequest;
import com.couchbase.client.core.msg.kv.UpsertRequest;
import com.couchbase.client.core.projections.ProjectionsApplier;
import com.couchbase.client.core.retry.RetryStrategy;
import com.couchbase.client.core.service.kv.ReplicaHelper;
import com.couchbase.client.core.util.BucketConfigUtil;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Stability.Internal
public final class ClassicCoreKvOps
implements CoreKvOps {
    private final Core core;
    private final CoreContext ctx;
    private final Duration defaultKvTimeout;
    private final Duration defaultKvDurableTimeout;
    private final RetryStrategy defaultRetryStrategy;
    private final CollectionIdentifier collectionIdentifier;
    private final CoreKeyspace keyspace;
    private final RequestTracer requestTracer;
    private final RangeScanOrchestrator rangeScanOrchestrator;

    public ClassicCoreKvOps(Core core, CoreKeyspace keyspace) {
        this.core = Objects.requireNonNull(core);
        this.ctx = core.context();
        this.defaultKvTimeout = this.ctx.environment().timeoutConfig().kvTimeout();
        this.defaultKvDurableTimeout = this.ctx.environment().timeoutConfig().kvDurableTimeout();
        this.defaultRetryStrategy = this.ctx.environment().retryStrategy();
        this.requestTracer = this.ctx.environment().requestTracer();
        this.keyspace = Objects.requireNonNull(keyspace);
        this.collectionIdentifier = keyspace.toCollectionIdentifier();
        this.rangeScanOrchestrator = new RangeScanOrchestrator(core, this.collectionIdentifier);
    }

    @Override
    public CoreAsyncResponse<CoreGetResult> getAsync(CoreCommonOptions common, String key, List<String> projections, boolean withExpiry) {
        CoreKvParamValidators.validateGetParams(common, key, projections, withExpiry);
        Duration timeout = this.timeout(common);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        if (!withExpiry && projections.isEmpty()) {
            RequestSpan span = this.span(common, "get");
            GetRequest request = new GetRequest(key, timeout, this.ctx, this.collectionIdentifier, retryStrategy, span);
            ClassicHelper.setClientContext(request, common);
            return this.newAsyncResponse(request, it -> new CoreGetResult(CoreKvResponseMetadata.from(it.flexibleExtras()), this.keyspace, key, it.content(), it.flags(), it.cas(), null, false));
        }
        SubdocGetRequest request = this.getWithProjectionsOrExpiryRequest(common, key, projections, withExpiry);
        return this.newAsyncResponse(request, (req, res) -> {
            if (res.status() != ResponseStatus.SUBDOC_FAILURE) {
                throw DefaultErrorUtil.keyValueStatusToException(request, res);
            }
        }, it -> this.parseGetWithProjectionsOrExpiry(key, (SubdocGetResponse)it));
    }

    private SubdocGetRequest getWithProjectionsOrExpiryRequest(CoreCommonOptions common, String key, List<String> projections, boolean withExpiry) {
        CoreKvParamValidators.validateGetParams(common, key, projections, withExpiry);
        this.checkProjectionLimits(projections, withExpiry);
        Duration timeout = this.timeout(common);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        RequestSpan span = this.span(common, "lookup_in");
        ArrayList<SubdocGetRequest.Command> commands = new ArrayList<SubdocGetRequest.Command>(16);
        if (!projections.isEmpty()) {
            for (String projection : projections) {
                commands.add(new SubdocGetRequest.Command(SubdocCommandType.GET, projection, false, commands.size()));
            }
        } else {
            commands.add(new SubdocGetRequest.Command(SubdocCommandType.GET_DOC, "", false, commands.size()));
        }
        if (withExpiry) {
            commands.add(0, new SubdocGetRequest.Command(SubdocCommandType.GET, "$document.exptime", true, commands.size()));
            if (projections.isEmpty()) {
                commands.add(1, new SubdocGetRequest.Command(SubdocCommandType.GET, "$document.flags", true, commands.size()));
            }
        }
        return new SubdocGetRequest(timeout, this.ctx, this.collectionIdentifier, retryStrategy, key, 0, commands, span);
    }

    private CoreGetResult parseGetWithProjectionsOrExpiry(String key, SubdocGetResponse response) {
        long parsed;
        int convertedFlags;
        if (response.error().isPresent()) {
            throw response.error().get();
        }
        if (response.values().length == 1 && response.values()[0].error().isPresent()) {
            throw response.values()[0].error().get();
        }
        long cas = response.cas();
        byte[] exptime = null;
        byte[] content = null;
        byte[] flags = null;
        for (SubDocumentField value : response.values()) {
            if (value == null) continue;
            if ("$document.exptime".equals(value.path())) {
                exptime = value.value();
                continue;
            }
            if ("$document.flags".equals(value.path())) {
                flags = value.value();
                continue;
            }
            if (!value.path().isEmpty()) continue;
            content = value.value();
        }
        int n = convertedFlags = flags == null || flags.length == 0 ? CodecFlags.JSON_COMPAT_FLAGS : Integer.parseInt(new String(flags, StandardCharsets.UTF_8));
        if (content == null) {
            try {
                content = ProjectionsApplier.reconstructDocument(response);
            }
            catch (Exception e) {
                throw new CouchbaseException("Unexpected Exception while decoding Sub-Document get", e);
            }
        }
        Optional<Object> expiration = Optional.empty();
        if (exptime != null && exptime.length > 0 && (parsed = Long.parseLong(new String(exptime, StandardCharsets.UTF_8))) > 0L) {
            expiration = Optional.of(Instant.ofEpochSecond(parsed));
        }
        return new CoreGetResult(CoreKvResponseMetadata.from(response.flexibleExtras()), this.keyspace, key, content, convertedFlags, cas, expiration.orElse(null), false);
    }

    @Override
    public CoreAsyncResponse<CoreGetResult> getAndLockAsync(CoreCommonOptions common, String key, Duration lockTime) {
        CoreKvParamValidators.validateGetAndLockParams(common, key, lockTime);
        Duration timeout = this.timeout(common);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        RequestSpan span = this.span(common, "get_and_lock");
        GetAndLockRequest request = new GetAndLockRequest(key, timeout, this.ctx, this.collectionIdentifier, retryStrategy, lockTime, span);
        ClassicHelper.setClientContext(request, common);
        return this.newAsyncResponse(request, it -> new CoreGetResult(CoreKvResponseMetadata.from(it.flexibleExtras()), this.keyspace, key, it.content(), it.flags(), it.cas(), null, false));
    }

    @Override
    public CoreAsyncResponse<CoreGetResult> getAndTouchAsync(CoreCommonOptions common, String key, CoreExpiry expiry) {
        CoreKvParamValidators.validateGetAndTouchParams(common, key, expiry);
        Duration timeout = this.timeout(common);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        RequestSpan span = this.span(common, "get_and_touch");
        GetAndTouchRequest request = new GetAndTouchRequest(key, timeout, this.ctx, this.collectionIdentifier, retryStrategy, ClassicExpiryHelper.encode(expiry), span);
        ClassicHelper.setClientContext(request, common);
        return this.newAsyncResponse(request, it -> new CoreGetResult(CoreKvResponseMetadata.from(it.flexibleExtras()), this.keyspace, key, it.content(), it.flags(), it.cas(), null, false));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CoreAsyncResponse<CoreMutationResult> insertAsync(CoreCommonOptions common, String key, Supplier<CoreEncodedContent> content, CoreDurability durability, CoreExpiry expiry) {
        CoreEncodedContent coreContent;
        CoreKvParamValidators.validateInsertParams(common, key, content, durability, expiry);
        Duration timeout = this.timeout(common, durability);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        RequestSpan span = this.span(common, "insert");
        RequestSpan encodingSpan = this.span(span, "request_encoding");
        long encodingStartNanos = System.nanoTime();
        try {
            coreContent = content.get();
        }
        finally {
            encodingSpan.end();
        }
        long encodingNanos = System.nanoTime() - encodingStartNanos;
        InsertRequest request = new InsertRequest(key, coreContent.encoded(), ClassicExpiryHelper.encode(expiry), coreContent.flags(), timeout, this.ctx, this.collectionIdentifier, retryStrategy, durability.levelIfSynchronous(), span);
        request.context().clientContext(common.clientContext()).encodeLatency(encodingNanos);
        CompletionStage<CoreMutationResult> future = this.executeWithoutMarkingComplete(request, (req, res) -> {
            if (res.status() == ResponseStatus.EXISTS || res.status() == ResponseStatus.NOT_STORED) {
                throw new DocumentExistsException(KeyValueErrorContext.completedRequest(req, res));
            }
            throw res.errorIfNeeded(request);
        }, it -> new CoreMutationResult(CoreKvResponseMetadata.from(it.flexibleExtras()), this.keyspace, key, it.cas(), it.mutationToken()));
        future = ClassicHelper.maybeWrapWithLegacyDurability(future, key, durability, this.core, request).whenComplete((response, failure) -> ClassicCoreKvOps.markComplete(request, failure));
        return ClassicHelper.newAsyncResponse(request, future);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CoreAsyncResponse<CoreMutationResult> upsertAsync(CoreCommonOptions common, String key, Supplier<CoreEncodedContent> content, CoreDurability durability, CoreExpiry expiry, boolean preserveExpiry) {
        CoreEncodedContent coreContent;
        CoreKvParamValidators.validateUpsertParams(common, key, content, durability, expiry, preserveExpiry);
        Duration timeout = this.timeout(common, durability);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        RequestSpan span = this.span(common, "upsert");
        RequestSpan encodingSpan = this.span(span, "request_encoding");
        long encodingStartNanos = System.nanoTime();
        try {
            coreContent = content.get();
        }
        finally {
            encodingSpan.end();
        }
        long encodingNanos = System.nanoTime() - encodingStartNanos;
        UpsertRequest request = new UpsertRequest(key, coreContent.encoded(), ClassicExpiryHelper.encode(expiry), preserveExpiry, coreContent.flags(), timeout, this.ctx, this.collectionIdentifier, retryStrategy, durability.levelIfSynchronous(), span);
        request.context().clientContext(common.clientContext()).encodeLatency(encodingNanos);
        CompletionStage<CoreMutationResult> future = this.executeWithoutMarkingComplete(request, it -> new CoreMutationResult(CoreKvResponseMetadata.from(it.flexibleExtras()), this.keyspace, key, it.cas(), it.mutationToken()));
        future = ClassicHelper.maybeWrapWithLegacyDurability(future, key, durability, this.core, request).whenComplete((response, failure) -> ClassicCoreKvOps.markComplete(request, failure));
        return ClassicHelper.newAsyncResponse(request, future);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CoreAsyncResponse<CoreMutationResult> replaceAsync(CoreCommonOptions common, String key, Supplier<CoreEncodedContent> content, long cas, CoreDurability durability, CoreExpiry expiry, boolean preserveExpiry) {
        CoreEncodedContent coreContent;
        CoreKvParamValidators.validateReplaceParams(common, key, content, cas, durability, expiry, preserveExpiry);
        Duration timeout = this.timeout(common, durability);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        RequestSpan span = this.span(common, "replace");
        RequestSpan encodingSpan = this.span(span, "request_encoding");
        long encodingStartNanos = System.nanoTime();
        try {
            coreContent = content.get();
        }
        finally {
            encodingSpan.end();
        }
        long encodingNanos = System.nanoTime() - encodingStartNanos;
        ReplaceRequest request = new ReplaceRequest(key, coreContent.encoded(), ClassicExpiryHelper.encode(expiry), preserveExpiry, coreContent.flags(), timeout, cas, this.ctx, this.collectionIdentifier, retryStrategy, durability.levelIfSynchronous(), span);
        request.context().clientContext(common.clientContext()).encodeLatency(encodingNanos);
        CompletionStage<CoreMutationResult> future = this.executeWithoutMarkingComplete(request, it -> new CoreMutationResult(CoreKvResponseMetadata.from(it.flexibleExtras()), this.keyspace, key, it.cas(), it.mutationToken()));
        future = ClassicHelper.maybeWrapWithLegacyDurability(future, key, durability, this.core, request).whenComplete((response, failure) -> ClassicCoreKvOps.markComplete(request, failure));
        return ClassicHelper.newAsyncResponse(request, future);
    }

    @Override
    public CoreAsyncResponse<CoreMutationResult> removeAsync(CoreCommonOptions common, String key, long cas, CoreDurability durability) {
        CoreKvParamValidators.validateRemoveParams(common, key, cas, durability);
        Duration timeout = this.timeout(common, durability);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        RequestSpan span = this.span(common, "remove");
        RemoveRequest request = new RemoveRequest(key, cas, timeout, this.ctx, this.collectionIdentifier, retryStrategy, durability.levelIfSynchronous(), span);
        request.context().clientContext(common.clientContext());
        CompletionStage<CoreMutationResult> future = this.executeWithoutMarkingComplete(request, it -> new CoreMutationResult(CoreKvResponseMetadata.from(it.flexibleExtras()), this.keyspace, key, it.cas(), it.mutationToken()));
        future = ClassicHelper.maybeWrapWithLegacyDurability(future, key, durability, this.core, request).whenComplete((response, failure) -> ClassicCoreKvOps.markComplete(request, failure));
        return ClassicHelper.newAsyncResponse(request, future);
    }

    @Override
    public CoreAsyncResponse<CoreExistsResult> existsAsync(CoreCommonOptions common, String key) {
        CoreKvParamValidators.validateExistsParams(common, key);
        Duration timeout = this.timeout(common);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        RequestSpan span = this.span(common, "exists");
        GetMetaRequest request = new GetMetaRequest(key, timeout, this.ctx, this.collectionIdentifier, retryStrategy, span);
        ClassicHelper.setClientContext(request, common);
        return this.newAsyncResponse(request, (req, res) -> {
            if (res.status() != ResponseStatus.NOT_FOUND) {
                throw DefaultErrorUtil.keyValueStatusToException(req, res);
            }
        }, it -> new CoreExistsResult(CoreKvResponseMetadata.from(it.flexibleExtras()), this.keyspace, key, it.cas(), it.status().success() && !it.deleted()));
    }

    @Override
    public CoreAsyncResponse<CoreMutationResult> touchAsync(CoreCommonOptions common, String key, CoreExpiry expiry) {
        CoreKvParamValidators.validateTouchParams(common, key, expiry);
        Duration timeout = this.timeout(common);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        RequestSpan span = this.span(common, "touch");
        TouchRequest request = new TouchRequest(timeout, this.ctx, this.collectionIdentifier, retryStrategy, key, ClassicExpiryHelper.encode(expiry), span);
        ClassicHelper.setClientContext(request, common);
        return this.newAsyncResponse(request, it -> new CoreMutationResult(CoreKvResponseMetadata.from(it.flexibleExtras()), this.keyspace, key, it.cas(), it.mutationToken()));
    }

    @Override
    public CoreAsyncResponse<Void> unlockAsync(CoreCommonOptions common, String key, long cas) {
        CoreKvParamValidators.validateUnlockParams(common, key, cas, this.collectionIdentifier);
        Duration timeout = this.timeout(common);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        RequestSpan span = this.span(common, "unlock");
        UnlockRequest request = new UnlockRequest(timeout, this.ctx, this.collectionIdentifier, retryStrategy, key, cas, span);
        ClassicHelper.setClientContext(request, common);
        return this.newAsyncResponse(request, (req, res) -> {
            if (res.status() == ResponseStatus.LOCKED) {
                throw new CasMismatchException(KeyValueErrorContext.completedRequest(req, res));
            }
            throw DefaultErrorUtil.keyValueStatusToException(req, res);
        }, it -> null);
    }

    @Override
    public CoreAsyncResponse<CoreSubdocGetResult> subdocGetAsync(CoreCommonOptions common, String key, List<CoreSubdocGetCommand> commands, boolean accessDeleted) {
        CoreKvParamValidators.validateSubdocGetParams(common, key, commands);
        Duration timeout = this.timeout(common);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        RequestSpan span = this.span(common, "lookup_in");
        byte flags = accessDeleted ? (byte)4 : 0;
        SubdocGetRequest request = SubdocGetRequest.create(timeout, this.ctx, this.collectionIdentifier, retryStrategy, key, flags, commands, span);
        request.context().clientContext(common.clientContext());
        return this.newAsyncResponse(request, (req, res) -> {
            if (res.error().isPresent()) {
                throw res.error().get();
            }
            if (res.status() == ResponseStatus.SUBDOC_FAILURE) {
                return;
            }
            ClassicCoreKvOps.commonKvResponseCheck(req, res);
        }, it -> it.toCore(this.keyspace, key));
    }

    @Override
    public Flux<CoreGetResult> getAllReplicasReactive(CoreCommonOptions common, String key, CoreReadPreference readPreference) {
        CoreKvParamValidators.validateGetAllReplicasParams(common, key);
        Duration timeout = this.timeout(common);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        return ReplicaHelper.getAllReplicasReactive(this.core, this.collectionIdentifier, key, timeout, retryStrategy, common.clientContext(), common.parentSpan().orElse(null), readPreference).map(it -> new CoreGetResult(CoreKvResponseMetadata.from(it.getResponse().flexibleExtras()), this.keyspace, key, it.getResponse().content(), it.getResponse().flags(), it.getResponse().cas(), null, it.isFromReplica()));
    }

    @Override
    public Mono<CoreGetResult> getAnyReplicaReactive(CoreCommonOptions common, String key, CoreReadPreference readPreference) {
        CoreKvParamValidators.validateGetAnyReplicaParams(common, key);
        RequestSpan getAnySpan = this.span(common, "get_any_replica");
        return this.getAllReplicasReactive(common.withParentSpan(getAnySpan), key, readPreference).next().doFinally(signalType -> getAnySpan.end());
    }

    @Override
    public Flux<CoreSubdocGetResult> subdocGetAllReplicasReactive(CoreCommonOptions common, String key, List<CoreSubdocGetCommand> commands, CoreReadPreference readPreference) {
        CoreKvParamValidators.validateSubdocGetAllParams(common, key, commands);
        Duration timeout = this.timeout(common);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        return ReplicaHelper.lookupInAllReplicasReactive(this.core, this.collectionIdentifier, key, commands, timeout, retryStrategy, common.clientContext(), common.parentSpan().orElse(null), readPreference);
    }

    @Override
    public Mono<CoreSubdocGetResult> subdocGetAnyReplicaReactive(CoreCommonOptions common, String key, List<CoreSubdocGetCommand> commands, CoreReadPreference readPreference) {
        CoreKvParamValidators.validateSubdocGetAnyParams(common, key, commands);
        RequestSpan getAnySpan = this.span(common, "get_any_replica");
        return this.subdocGetAllReplicasReactive(common.withParentSpan(getAnySpan), key, commands, readPreference).next().switchIfEmpty(Mono.error((Throwable)new DocumentUnretrievableException(ReducedKeyValueErrorContext.create(key, this.collectionIdentifier)))).doFinally(signalType -> getAnySpan.end());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CoreAsyncResponse<CoreSubdocMutateResult> subdocMutateAsync(CoreCommonOptions common, String key, Supplier<List<CoreSubdocMutateCommand>> commands, CoreStoreSemantics storeSemantics, long cas, CoreDurability durability, CoreExpiry expiry, boolean preserveExpiry, boolean accessDeleted, boolean createAsDeleted) {
        List<CoreSubdocMutateCommand> encodedCommands;
        CoreKvParamValidators.validateSubdocMutateParams(common, key, storeSemantics, cas);
        Duration timeout = this.timeout(common, durability);
        RetryStrategy retryStrategy = this.retryStrategy(common);
        RequestSpan span = this.span(common, "mutate_in");
        RequestSpan encodeSpan = this.span(span, "request_encoding");
        long encodingStartNanos = System.nanoTime();
        try {
            encodedCommands = commands.get();
        }
        finally {
            encodeSpan.end();
        }
        long encodingEndNanos = System.nanoTime();
        if (encodedCommands.isEmpty()) {
            throw SubdocMutateRequest.errIfNoCommands(ReducedKeyValueErrorContext.create(key, this.collectionIdentifier));
        }
        if (encodedCommands.size() > 16) {
            throw SubdocMutateRequest.errIfTooManyCommands(ReducedKeyValueErrorContext.create(key, this.collectionIdentifier));
        }
        boolean needsBucketConfig = createAsDeleted || storeSemantics == CoreStoreSemantics.REVIVE;
        CompletableFuture bucketConfigFuture = needsBucketConfig ? BucketConfigUtil.waitForBucketConfig(this.core, this.keyspace.bucket(), timeout).toFuture() : CompletableFuture.completedFuture(null);
        AtomicReference requestHolder = new AtomicReference();
        CompletionStage finalResultFuture = bucketConfigFuture.thenCompose(bucketConfig -> {
            SubdocMutateRequest request = new SubdocMutateRequest(timeout, this.ctx, this.collectionIdentifier, (BucketConfig)bucketConfig, retryStrategy, key, storeSemantics, accessDeleted, createAsDeleted, encodedCommands, ClassicExpiryHelper.encode(expiry), preserveExpiry, cas, durability.levelIfSynchronous(), span);
            request.context().clientContext(common.clientContext()).encodeLatency(encodingEndNanos - encodingStartNanos);
            requestHolder.set(request);
            CompletableFuture<CoreSubdocMutateResult> subdocRequestFuture = this.executeWithoutMarkingComplete(request, (req, res) -> {
                throw res.throwError(request, storeSemantics == CoreStoreSemantics.INSERT);
            }, it -> new CoreSubdocMutateResult(this.keyspace, key, CoreKvResponseMetadata.from(it.flexibleExtras()), it.cas(), it.mutationToken(), Arrays.asList(it.values())));
            return ClassicHelper.maybeWrapWithLegacyDurability(subdocRequestFuture, key, durability, this.core, request).whenComplete((response, failure) -> ClassicCoreKvOps.markComplete(request, failure));
        });
        return new CoreAsyncResponse<CoreSubdocMutateResult>((CompletableFuture<CoreSubdocMutateResult>)finalResultFuture, () -> Optional.ofNullable((SubdocMutateRequest)requestHolder.get()).ifPresent(it -> it.cancel(CancellationReason.STOPPED_LISTENING)));
    }

    @Override
    public Flux<CoreRangeScanItem> scanRequestReactive(CoreScanType scanType, CoreScanOptions options) {
        Flux<CoreRangeScanItem> coreScanStream;
        if (scanType instanceof CoreRangeScan) {
            coreScanStream = this.rangeScanOrchestrator.rangeScan((CoreRangeScan)scanType, options);
        } else if (scanType instanceof CoreSamplingScan) {
            coreScanStream = this.rangeScanOrchestrator.samplingScan((CoreSamplingScan)scanType, options);
        } else {
            return Flux.error((Throwable)InvalidArgumentException.fromMessage("Unsupported ScanType: " + scanType));
        }
        if (options.idsOnly()) {
            return coreScanStream.map(item -> CoreRangeScanItem.keyOnly(item.keyBytes()));
        }
        return coreScanStream.map(item -> CoreRangeScanItem.keyAndBody(item.flags(), item.expiry(), item.seqno(), item.cas(), item.keyBytes(), item.value()));
    }

    private <T extends BaseResponse, R> CompletableFuture<R> execute(KeyValueRequest<T> request, Function<T, R> responseTransformer) {
        return this.execute(request, ClassicCoreKvOps::commonKvResponseCheck, responseTransformer);
    }

    private <T extends BaseResponse, R> CompletableFuture<R> execute(KeyValueRequest<T> request, BiConsumer<KeyValueRequest<T>, T> responseChecker, Function<T, R> responseTransformer) {
        return this.executeWithoutMarkingComplete(request, responseChecker, responseTransformer).whenComplete((response, failure) -> ClassicCoreKvOps.markComplete(request, failure));
    }

    private <T extends BaseResponse, R> CompletableFuture<R> executeWithoutMarkingComplete(KeyValueRequest<T> request, Function<T, R> responseTransformer) {
        return this.executeWithoutMarkingComplete(request, ClassicCoreKvOps::commonKvResponseCheck, responseTransformer);
    }

    private <T extends BaseResponse, R> CompletableFuture<R> executeWithoutMarkingComplete(KeyValueRequest<T> request, BiConsumer<KeyValueRequest<T>, T> responseChecker, Function<T, R> responseTransformer) {
        this.core.send(request);
        return request.response().thenApply(response -> {
            if (!response.status().success()) {
                responseChecker.accept(request, response);
            }
            return responseTransformer.apply(response);
        });
    }

    private static <T extends BaseResponse> void commonKvResponseCheck(KeyValueRequest<T> request, T response) {
        throw DefaultErrorUtil.keyValueStatusToException(request, response);
    }

    private <T extends BaseResponse, R> CoreAsyncResponse<R> newAsyncResponse(KeyValueRequest<T> request, Function<T, R> responseTransformer) {
        return this.newAsyncResponse(request, ClassicCoreKvOps::commonKvResponseCheck, responseTransformer);
    }

    private <T extends BaseResponse, R> CoreAsyncResponse<R> newAsyncResponse(KeyValueRequest<T> request, BiConsumer<KeyValueRequest<T>, T> responseChecker, Function<T, R> responseTransformer) {
        CompletableFuture<R> response = this.execute(request, responseChecker, responseTransformer);
        return ClassicHelper.newAsyncResponse(request, response);
    }

    private static void markComplete(KeyValueRequest<?> request, Throwable failure) {
        if (failure == null || failure instanceof DocumentNotFoundException) {
            request.context().logicallyComplete();
        } else {
            request.context().logicallyComplete(failure);
        }
    }

    private Duration timeout(CoreCommonOptions common) {
        return common.timeout().orElse(this.defaultKvTimeout);
    }

    private Duration timeout(CoreCommonOptions common, CoreDurability durability) {
        return common.timeout().orElse(durability.isPersistent() ? this.defaultKvDurableTimeout : this.defaultKvTimeout);
    }

    private RetryStrategy retryStrategy(CoreCommonOptions common) {
        return common.retryStrategy().orElse(this.defaultRetryStrategy);
    }

    private RequestSpan span(CoreCommonOptions common, String spanName) {
        return this.span((RequestSpan)common.parentSpan().orElse(null), spanName);
    }

    private RequestSpan span(RequestSpan parent, String spanName) {
        return CbTracing.newSpan(this.requestTracer, spanName, parent);
    }
}

