package com.couchbase.client.core.service.kv;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.CoreKeyspace;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.api.kv.CoreKvResponseMetadata;
import com.couchbase.client.core.api.kv.CoreReadPreference;
import com.couchbase.client.core.api.kv.CoreSubdocGetCommand;
import com.couchbase.client.core.api.kv.CoreSubdocGetResult;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.cnc.TracingIdentifiers;
import com.couchbase.client.core.cnc.events.request.IndividualReplicaGetFailedEvent;
import com.couchbase.client.core.config.BucketCapabilities;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.error.CommonExceptions;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.DefaultErrorUtil;
import com.couchbase.client.core.error.DocumentUnretrievableException;
import com.couchbase.client.core.error.FeatureNotAvailableException;
import com.couchbase.client.core.error.context.AggregateErrorContext;
import com.couchbase.client.core.error.context.ErrorContext;
import com.couchbase.client.core.error.context.ReducedKeyValueErrorContext;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.msg.kv.GetRequest;
import com.couchbase.client.core.msg.kv.GetResponse;
import com.couchbase.client.core.msg.kv.ReplicaGetRequest;
import com.couchbase.client.core.msg.kv.ReplicaSubdocGetRequest;
import com.couchbase.client.core.msg.kv.SubdocGetRequest;
import com.couchbase.client.core.msg.kv.SubdocGetResponse;
import com.couchbase.client.core.retry.RetryStrategy;
import com.couchbase.client.core.util.Validators;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Stability.Internal
/* loaded from: input_file:com/couchbase/client/core/service/kv/ReplicaHelper.class */
public class ReplicaHelper {

    /**
     * Couples a {@link GetResponse} with a flag telling whether it was produced
     * by a replica request.
     */
    @Deprecated
    public static class GetReplicaResponse {
        private final boolean fromReplica;
        private final GetResponse response;

        /**
         * @param getResponse the wrapped response; must not be {@code null}.
         * @param z {@code true} if the response originates from a replica request.
         * @throws NullPointerException if {@code getResponse} is {@code null}.
         */
        public GetReplicaResponse(GetResponse getResponse, boolean z) {
            Objects.requireNonNull(getResponse);
            this.fromReplica = z;
            this.response = getResponse;
        }

        /** @return the wrapped response, never {@code null}. */
        public GetResponse getResponse() {
            return response;
        }

        /** @return the replica flag supplied at construction time. */
        public boolean isFromReplica() {
            return fromReplica;
        }
    }

    private ReplicaHelper() {
        throw new AssertionError("not instantiable");
    }

    /**
     * Reactive variant of the "get from all replicas" operation.
     *
     * <p>Builds one request per eligible node (see {@code getAllReplicasRequests}),
     * dispatches each of them and emits a {@link GetReplicaResponse} for every
     * request that succeeds. A request that fails is not propagated as an error:
     * an {@link IndividualReplicaGetFailedEvent} is published on the event bus and
     * the failed request simply contributes no element.
     *
     * @param core the core used to build and send the requests.
     * @param collectionIdentifier the collection the document lives in.
     * @param str the document id; must be neither {@code null} nor empty.
     * @param duration handed to every created request.
     * @param retryStrategy handed to every created request.
     * @param map client context attached to every created request.
     * @param requestSpan parent of the span opened for the whole operation.
     * @param coreReadPreference forwarded to the request factory.
     * @return a flux of the successful responses; the operation span is ended
     *         when the flux terminates for any reason.
     */
    public static Flux<GetReplicaResponse> getAllReplicasReactive(Core core, CollectionIdentifier collectionIdentifier, String str, Duration duration, RetryStrategy retryStrategy, Map<String, Object> map, RequestSpan requestSpan, CoreReadPreference coreReadPreference) {
        Validators.notNullOrEmpty(str, "Id", (Supplier<ErrorContext>) () -> ReducedKeyValueErrorContext.create(str, collectionIdentifier));

        RequestSpan operationSpan = core.context().coreResources().requestTracer().requestSpan(TracingIdentifiers.SPAN_GET_ALL_REPLICAS, requestSpan);

        Flux<GetRequest> requests = Reactor
            .toMono(() -> getAllReplicasRequests(core, collectionIdentifier, str, map, retryStrategy, duration, operationSpan, coreReadPreference))
            .flux()
            .flatMap(Flux::fromStream);

        return requests
            .flatMap(request -> {
                boolean viaReplica = request instanceof ReplicaGetRequest;
                return Reactor.wrap(request, get(core, request), true)
                    .onErrorResume(failure -> {
                        // A single failing node must not fail the whole stream.
                        core.environment().eventBus().publish(new IndividualReplicaGetFailedEvent(request.context()));
                        return Mono.empty();
                    })
                    .map(response -> new GetReplicaResponse(response, viaReplica));
            })
            .doFinally(signal -> operationSpan.end());
    }

    /**
     * Reactive variant of the "lookup-in on all replicas" operation.
     *
     * <p>Builds one sub-document request per eligible node (see
     * {@code lookupInAllReplicasRequests}), dispatches each of them and emits a
     * {@link CoreSubdocGetResult} for every request that succeeds. A failing
     * request publishes an {@link IndividualReplicaGetFailedEvent} and contributes
     * no element instead of failing the flux.
     *
     * @param core the core used to build and send the requests.
     * @param collectionIdentifier the collection the document lives in.
     * @param str the document id; must be neither {@code null} nor empty.
     * @param list the sub-document commands handed to every created request.
     * @param duration handed to every created request.
     * @param retryStrategy handed to every created request.
     * @param map client context attached to every created request.
     * @param requestSpan parent of the span opened for the whole operation.
     * @param coreReadPreference forwarded to the request factory.
     * @return a flux of the successful results; the operation span is ended
     *         when the flux terminates for any reason.
     */
    public static Flux<CoreSubdocGetResult> lookupInAllReplicasReactive(Core core, CollectionIdentifier collectionIdentifier, String str, List<CoreSubdocGetCommand> list, Duration duration, RetryStrategy retryStrategy, Map<String, Object> map, RequestSpan requestSpan, CoreReadPreference coreReadPreference) {
        Validators.notNullOrEmpty(str, "Id", (Supplier<ErrorContext>) () -> ReducedKeyValueErrorContext.create(str, collectionIdentifier));

        CoreEnvironment env = core.context().environment();
        RequestSpan operationSpan = core.context().coreResources().requestTracer().requestSpan(TracingIdentifiers.SPAN_LOOKUP_IN_ALL_REPLICAS, requestSpan);

        Flux<SubdocGetRequest> requests = Reactor
            .toMono(() -> lookupInAllReplicasRequests(core, collectionIdentifier, str, list, map, retryStrategy, duration, operationSpan, coreReadPreference))
            .flux()
            .flatMap(Flux::fromStream);

        return requests
            .flatMap(request -> {
                boolean viaReplica = request instanceof ReplicaSubdocGetRequest;
                return Reactor.wrap(request, get(core, request), true)
                    .onErrorResume(failure -> {
                        // A single failing node must not fail the whole stream.
                        env.eventBus().publish(new IndividualReplicaGetFailedEvent(request.context()));
                        return Mono.empty();
                    })
                    .map(response -> new CoreSubdocGetResult(
                        CoreKeyspace.from(collectionIdentifier),
                        str,
                        CoreKvResponseMetadata.from(response.flexibleExtras()),
                        Arrays.asList(response.values()),
                        response.cas(),
                        response.isDeleted(),
                        viaReplica));
            })
            .doFinally(signal -> operationSpan.end());
    }

    /**
     * Asynchronous variant of the "get from all replicas" operation.
     *
     * <p>Creates one request per eligible node, sends each of them and returns
     * the list of per-request futures, each mapped through {@code function}.
     * The span opened for the whole operation is ended once the last individual
     * future has completed, or immediately if the requests could not be created
     * at all.
     *
     * @param core the core used to build and send the requests.
     * @param collectionIdentifier the collection the document lives in.
     * @param str the document id.
     * @param duration handed to every created request.
     * @param retryStrategy handed to every created request.
     * @param map client context attached to every created request.
     * @param requestSpan parent of the span opened for the whole operation.
     * @param coreReadPreference forwarded to the request factory.
     * @param function converts each {@link GetReplicaResponse} into the caller's result type.
     * @param <R> the caller's result type.
     * @return a future holding one future per dispatched request.
     */
    public static <R> CompletableFuture<List<CompletableFuture<R>>> getAllReplicasAsync(Core core, CollectionIdentifier collectionIdentifier, String str, Duration duration, RetryStrategy retryStrategy, Map<String, Object> map, RequestSpan requestSpan, CoreReadPreference coreReadPreference, Function<GetReplicaResponse, R> function) {
        // Use the "get all replicas" identifier, matching getAllReplicasReactive.
        RequestSpan getAllSpan = core.context().coreResources().requestTracer().requestSpan(TracingIdentifiers.SPAN_GET_ALL_REPLICAS, requestSpan);

        return getAllReplicasRequests(core, collectionIdentifier, str, map, retryStrategy, duration, getAllSpan, coreReadPreference)
            .thenApply(requests -> requests
                .<CompletableFuture<R>>map(request -> get(core, request)
                    .thenApply(response -> new GetReplicaResponse(response, request instanceof ReplicaGetRequest))
                    .thenApply(function))
                .collect(Collectors.toList()))
            .whenComplete((futures, failure) -> {
                if (failure != null || futures.isEmpty()) {
                    // Nothing was dispatched, so no individual completion will
                    // ever close the span; close it here to avoid leaking it.
                    getAllSpan.end();
                    return;
                }
                AtomicInteger pending = new AtomicInteger(futures.size());
                for (CompletableFuture<R> future : futures) {
                    future.whenComplete((ignoredResult, ignoredError) -> {
                        if (pending.decrementAndGet() == 0) {
                            getAllSpan.end();
                        }
                    });
                }
            });
    }

    /**
     * Asynchronous variant of the "lookup-in on all replicas" operation.
     *
     * <p>Creates one sub-document request per eligible node, sends each of them
     * and returns the list of per-request futures, each converted into a
     * {@link CoreSubdocGetResult} and then mapped through {@code function}.
     * The span opened for the whole operation is ended once the last individual
     * future has completed, or immediately if the requests could not be created
     * at all.
     *
     * @param core the core used to build and send the requests.
     * @param collectionIdentifier the collection the document lives in.
     * @param str the document id.
     * @param list the sub-document commands handed to every created request.
     * @param duration handed to every created request.
     * @param retryStrategy handed to every created request.
     * @param map client context attached to every created request.
     * @param requestSpan parent of the span opened for the whole operation.
     * @param coreReadPreference forwarded to the request factory.
     * @param function converts each {@link CoreSubdocGetResult} into the caller's result type.
     * @param <R> the caller's result type.
     * @return a future holding one future per dispatched request.
     */
    public static <R> CompletableFuture<List<CompletableFuture<R>>> lookupInAllReplicasAsync(Core core, CollectionIdentifier collectionIdentifier, String str, List<CoreSubdocGetCommand> list, Duration duration, RetryStrategy retryStrategy, Map<String, Object> map, RequestSpan requestSpan, CoreReadPreference coreReadPreference, Function<CoreSubdocGetResult, R> function) {
        // Use the "lookup-in all replicas" identifier, matching lookupInAllReplicasReactive.
        RequestSpan lookupAllSpan = core.context().coreResources().requestTracer().requestSpan(TracingIdentifiers.SPAN_LOOKUP_IN_ALL_REPLICAS, requestSpan);

        return lookupInAllReplicasRequests(core, collectionIdentifier, str, list, map, retryStrategy, duration, lookupAllSpan, coreReadPreference)
            .thenApply(requests -> requests
                .<CompletableFuture<R>>map(request -> get(core, request)
                    .thenApply(response -> new CoreSubdocGetResult(
                        CoreKeyspace.from(collectionIdentifier),
                        str,
                        CoreKvResponseMetadata.from(response.flexibleExtras()),
                        Arrays.asList(response.values()),
                        response.cas(),
                        response.isDeleted(),
                        request instanceof ReplicaSubdocGetRequest))
                    .thenApply(function))
                .collect(Collectors.toList()))
            .whenComplete((futures, failure) -> {
                if (failure != null || futures.isEmpty()) {
                    // Nothing was dispatched, so no individual completion will
                    // ever close the span; close it here to avoid leaking it.
                    lookupAllSpan.end();
                    return;
                }
                AtomicInteger pending = new AtomicInteger(futures.size());
                for (CompletableFuture<R> future : futures) {
                    future.whenComplete((ignoredResult, ignoredError) -> {
                        if (pending.decrementAndGet() == 0) {
                            lookupAllSpan.end();
                        }
                    });
                }
            });
    }

    /**
     * Asynchronous "get from any replica": dispatches the same requests as
     * {@code getAllReplicasAsync} and reduces them to a single future through
     * {@code aggregate}.
     *
     * @param core the core used to build and send the requests.
     * @param collectionIdentifier the collection the document lives in.
     * @param str the document id.
     * @param duration handed to every created request.
     * @param retryStrategy handed to every created request.
     * @param map client context attached to every created request.
     * @param requestSpan parent of the span opened for this operation.
     * @param coreReadPreference forwarded to the request factory.
     * @param function converts a {@link GetReplicaResponse} into the caller's result type.
     * @param <R> the caller's result type.
     * @return the aggregated future; the operation span is ended when it completes.
     */
    public static <R> CompletableFuture<R> getAnyReplicaAsync(Core core, CollectionIdentifier collectionIdentifier, String str, Duration duration, RetryStrategy retryStrategy, Map<String, Object> map, RequestSpan requestSpan, CoreReadPreference coreReadPreference, Function<GetReplicaResponse, R> function) {
        RequestSpan anySpan = core.context().coreResources().requestTracer().requestSpan(TracingIdentifiers.SPAN_GET_ANY_REPLICA, requestSpan);
        CompletableFuture<List<CompletableFuture<R>>> perRequest = getAllReplicasAsync(core, collectionIdentifier, str, duration, retryStrategy, map, anySpan, coreReadPreference, function);
        CompletableFuture<R> reduced = aggregate(perRequest, function);
        return reduced.whenComplete((ignoredResult, ignoredError) -> anySpan.end());
    }

    /**
     * Asynchronous "lookup-in on any replica": dispatches the same requests as
     * {@code lookupInAllReplicasAsync} and reduces them to a single future
     * through {@code aggregate}.
     *
     * @param core the core used to build and send the requests.
     * @param collectionIdentifier the collection the document lives in.
     * @param str the document id.
     * @param list the sub-document commands handed to every created request.
     * @param duration handed to every created request.
     * @param retryStrategy handed to every created request.
     * @param map client context attached to every created request.
     * @param requestSpan parent of the span opened for this operation.
     * @param coreReadPreference forwarded to the request factory.
     * @param function converts a {@link CoreSubdocGetResult} into the caller's result type.
     * @param <R> the caller's result type.
     * @return the aggregated future; the operation span is ended when it completes.
     */
    public static <R> CompletableFuture<R> lookupInAnyReplicaAsync(Core core, CollectionIdentifier collectionIdentifier, String str, List<CoreSubdocGetCommand> list, Duration duration, RetryStrategy retryStrategy, Map<String, Object> map, RequestSpan requestSpan, CoreReadPreference coreReadPreference, Function<CoreSubdocGetResult, R> function) {
        RequestSpan anySpan = core.context().coreResources().requestTracer().requestSpan(TracingIdentifiers.SPAN_LOOKUP_IN_ANY_REPLICA, requestSpan);
        CompletableFuture<List<CompletableFuture<R>>> perRequest = lookupInAllReplicasAsync(core, collectionIdentifier, str, list, duration, retryStrategy, map, anySpan, coreReadPreference, function);
        CompletableFuture<R> reduced = aggregate(perRequest, function);
        return reduced.whenComplete((ignoredResult, ignoredError) -> anySpan.end());
    }

    /**
     * Reduces a list of per-request futures to a single future.
     *
     * <p>The returned future completes with the first non-{@code null} result
     * delivered by any of the individual futures. If every individual future
     * finishes without delivering one, it fails with a
     * {@link DocumentUnretrievableException} whose {@link AggregateErrorContext}
     * carries the contexts of all {@link CouchbaseException}s seen. If the outer
     * future itself failed, that failure is passed on unchanged.
     *
     * @param completableFuture the future holding the individual futures.
     * @param function not used by the aggregation; kept for signature compatibility.
     * @param <R> the result type.
     * @return the aggregated future.
     */
    private static <R> CompletableFuture<R> aggregate(CompletableFuture<List<CompletableFuture<R>>> completableFuture, Function<?, R> function) {
        CompletableFuture<R> aggregated = new CompletableFuture<>();
        completableFuture.whenComplete((futures, failure) -> {
            if (failure != null) {
                aggregated.completeExceptionally(failure);
                // No list is available on failure; stop here rather than dereference null.
                return;
            }

            AtomicBoolean succeeded = new AtomicBoolean(false);
            AtomicInteger finishedCount = new AtomicInteger(0);
            List<ErrorContext> nestedContexts = Collections.synchronizedList(new ArrayList<>());

            if (futures.isEmpty()) {
                // With nothing to wait for, no callback would ever complete the result.
                aggregated.completeExceptionally(new DocumentUnretrievableException(new AggregateErrorContext(nestedContexts)));
                return;
            }

            futures.forEach(individual -> individual.whenComplete((value, error) -> {
                try {
                    int finished = finishedCount.incrementAndGet();
                    if (error instanceof CompletionException && error.getCause() instanceof CouchbaseException) {
                        nestedContexts.add(((CouchbaseException) error.getCause()).context());
                    }
                    if (value != null && succeeded.compareAndSet(false, true)) {
                        aggregated.complete(value);
                    }
                    if (!succeeded.get() && finished == futures.size()) {
                        aggregated.completeExceptionally(new DocumentUnretrievableException(new AggregateErrorContext(nestedContexts)));
                    }
                } catch (Throwable unexpected) {
                    // Same message as before, but keep the cause for diagnosis.
                    aggregated.completeExceptionally(new RuntimeException(unexpected.getClass().toString(), unexpected));
                }
            }));
        });
        return aggregated;
    }

    /**
     * Creates the requests needed to read a document from the active node and
     * from every configured replica.
     *
     * <p>Outcomes, depending on the bucket configuration currently known:
     * <ul>
     *   <li>no configuration for the bucket: the call is re-scheduled on the
     *       environment timer after 100 ms with {@code duration} reduced by
     *       that delay;</li>
     *   <li>a configuration that is not a {@link CouchbaseBucketConfig}: the
     *       returned future fails;</li>
     *   <li>otherwise: one {@link GetRequest} if the active node may be used,
     *       plus one {@link ReplicaGetRequest} per replica the
     *       {@code NodeIndexCalculator} accepts. If none qualifies the future
     *       fails with {@link DocumentUnretrievableException}.</li>
     * </ul>
     *
     * @param core the core providing context and cluster configuration.
     * @param collectionIdentifier the collection the document lives in.
     * @param str the document id; must be neither {@code null} nor empty.
     * @param map client context attached to every created request.
     * @param retryStrategy handed to every created request.
     * @param duration handed to every created request.
     * @param requestSpan parent of the per-request spans.
     * @param coreReadPreference used to decide which nodes may be targeted.
     * @return a future holding the stream of requests to send.
     */
    public static CompletableFuture<Stream<GetRequest>> getAllReplicasRequests(Core core, CollectionIdentifier collectionIdentifier, String str, Map<String, Object> map, RetryStrategy retryStrategy, Duration duration, RequestSpan requestSpan, CoreReadPreference coreReadPreference) {
        Validators.notNullOrEmpty(str, "Id");
        CoreContext context = core.context();
        BucketConfig bucketConfig = core.clusterConfig().bucketConfig(collectionIdentifier.bucket());

        if (bucketConfig == null) {
            // No configuration yet: try again shortly.
            // NOTE(review): nothing here stops the rescheduling once the reduced
            // duration reaches zero or below - confirm a bound exists elsewhere.
            Duration retryDelay = Duration.ofMillis(100L);
            CompletableFuture<Stream<GetRequest>> deferred = new CompletableFuture<>();
            context.environment().timer().schedule(() -> {
                getAllReplicasRequests(core, collectionIdentifier, str, map, retryStrategy, duration.minus(retryDelay), requestSpan, coreReadPreference).whenComplete((requests, failure) -> {
                    if (failure != null) {
                        deferred.completeExceptionally(failure);
                    } else {
                        deferred.complete(requests);
                    }
                });
            }, retryDelay);
            return deferred;
        }

        if (!(bucketConfig instanceof CouchbaseBucketConfig)) {
            return failedFuture(CommonExceptions.getFromReplicaNotCouchbaseBucket());
        }

        CouchbaseBucketConfig couchbaseBucketConfig = (CouchbaseBucketConfig) bucketConfig;
        int numberOfReplicas = couchbaseBucketConfig.numberOfReplicas();
        List<GetRequest> requests = new ArrayList<>(numberOfReplicas + 1);
        NodeIndexCalculator nodeIndexCalculator = new NodeIndexCalculator(coreReadPreference, couchbaseBucketConfig, context);

        if (nodeIndexCalculator.canUseNodeForActive(str)) {
            GetRequest activeRequest = new GetRequest(str, duration, context, collectionIdentifier, retryStrategy, context.coreResources().requestTracer().requestSpan("get", requestSpan));
            activeRequest.context().clientContext(map);
            requests.add(activeRequest);
        }

        // Replica numbers on the request are 1-based; the calculator takes a 0-based index.
        for (short replica = 1; replica <= numberOfReplicas; replica++) {
            if (nodeIndexCalculator.canUseNodeForReplica(str, replica - 1)) {
                ReplicaGetRequest replicaRequest = new ReplicaGetRequest(str, duration, context, collectionIdentifier, retryStrategy, replica, context.coreResources().requestTracer().requestSpan(TracingIdentifiers.SPAN_REQUEST_KV_GET_REPLICA, requestSpan));
                replicaRequest.context().clientContext(map);
                requests.add(replicaRequest);
            }
        }

        if (requests.isEmpty()) {
            return failedFuture(DocumentUnretrievableException.noReplicasSuitable());
        }
        return CompletableFuture.completedFuture(requests.stream());
    }

    /**
     * Creates the sub-document requests needed to run a lookup-in against the
     * active node and every configured replica.
     *
     * <p>Outcomes, depending on the bucket configuration currently known:
     * <ul>
     *   <li>no configuration for the bucket: the call is re-scheduled on the
     *       environment timer after 100 ms with {@code duration} reduced by
     *       that delay;</li>
     *   <li>a configuration that is not a {@link CouchbaseBucketConfig}: the
     *       returned future fails;</li>
     *   <li>the bucket lacks {@link BucketCapabilities#SUBDOC_READ_REPLICA}:
     *       the future fails with {@link FeatureNotAvailableException};</li>
     *   <li>otherwise: one {@link SubdocGetRequest} if the active node may be
     *       used, plus one {@link ReplicaSubdocGetRequest} per replica the
     *       {@code NodeIndexCalculator} accepts. If none qualifies the future
     *       fails with {@link DocumentUnretrievableException}.</li>
     * </ul>
     *
     * @param core the core providing context and cluster configuration.
     * @param collectionIdentifier the collection the document lives in.
     * @param str the document id; must be neither {@code null} nor empty.
     * @param list the sub-document commands handed to every created request.
     * @param map client context attached to every created request.
     * @param retryStrategy handed to every created request.
     * @param duration handed to every created request.
     * @param requestSpan parent of the per-request spans.
     * @param coreReadPreference used to decide which nodes may be targeted.
     * @return a future holding the stream of requests to send.
     */
    public static CompletableFuture<Stream<SubdocGetRequest>> lookupInAllReplicasRequests(Core core, CollectionIdentifier collectionIdentifier, String str, List<CoreSubdocGetCommand> list, Map<String, Object> map, RetryStrategy retryStrategy, Duration duration, RequestSpan requestSpan, CoreReadPreference coreReadPreference) {
        Validators.notNullOrEmpty(str, "Id");
        CoreContext context = core.context();
        BucketConfig bucketConfig = core.clusterConfig().bucketConfig(collectionIdentifier.bucket());

        if (bucketConfig == null) {
            // No configuration yet: try again shortly.
            // NOTE(review): nothing here stops the rescheduling once the reduced
            // duration reaches zero or below - confirm a bound exists elsewhere.
            Duration retryDelay = Duration.ofMillis(100L);
            CompletableFuture<Stream<SubdocGetRequest>> deferred = new CompletableFuture<>();
            context.environment().timer().schedule(() -> {
                lookupInAllReplicasRequests(core, collectionIdentifier, str, list, map, retryStrategy, duration.minus(retryDelay), requestSpan, coreReadPreference).whenComplete((requests, failure) -> {
                    if (failure != null) {
                        deferred.completeExceptionally(failure);
                    } else {
                        deferred.complete(requests);
                    }
                });
            }, retryDelay);
            return deferred;
        }

        if (!(bucketConfig instanceof CouchbaseBucketConfig)) {
            return failedFuture(CommonExceptions.getFromReplicaNotCouchbaseBucket());
        }

        CouchbaseBucketConfig couchbaseBucketConfig = (CouchbaseBucketConfig) bucketConfig;
        if (!bucketConfig.bucketCapabilities().contains(BucketCapabilities.SUBDOC_READ_REPLICA)) {
            return failedFuture(FeatureNotAvailableException.subdocReadReplica());
        }

        int numberOfReplicas = couchbaseBucketConfig.numberOfReplicas();
        List<SubdocGetRequest> requests = new ArrayList<>(numberOfReplicas + 1);
        NodeIndexCalculator nodeIndexCalculator = new NodeIndexCalculator(coreReadPreference, couchbaseBucketConfig, context);

        if (nodeIndexCalculator.canUseNodeForActive(str)) {
            SubdocGetRequest activeRequest = SubdocGetRequest.create(duration, context, collectionIdentifier, retryStrategy, str, (byte) 0, list, context.coreResources().requestTracer().requestSpan(TracingIdentifiers.SPAN_REQUEST_KV_LOOKUP_IN, requestSpan));
            activeRequest.context().clientContext(map);
            requests.add(activeRequest);
        }

        // Replica numbers on the request are 1-based; the calculator takes a 0-based index.
        for (short replica = 1; replica <= numberOfReplicas; replica++) {
            if (nodeIndexCalculator.canUseNodeForReplica(str, replica - 1)) {
                // NOTE(review): the per-replica span reuses the operation-level
                // identifier SPAN_LOOKUP_IN_ALL_REPLICAS - confirm this is intended.
                ReplicaSubdocGetRequest replicaRequest = ReplicaSubdocGetRequest.create(duration, context, collectionIdentifier, retryStrategy, str, (byte) 0, list, replica, context.coreResources().requestTracer().requestSpan(TracingIdentifiers.SPAN_LOOKUP_IN_ALL_REPLICAS, requestSpan));
                replicaRequest.context().clientContext(map);
                requests.add(replicaRequest);
            }
        }

        if (requests.isEmpty()) {
            return failedFuture(DocumentUnretrievableException.noReplicasSuitable());
        }
        return CompletableFuture.completedFuture(requests.stream());
    }

    /**
     * Builds a future that is already completed exceptionally.
     *
     * @param th the throwable the future is completed with.
     * @param <T> the nominal result type of the future.
     * @return a new, already failed future.
     */
    private static <T> CompletableFuture<T> failedFuture(Throwable th) {
        final CompletableFuture<T> failed = new CompletableFuture<>();
        failed.completeExceptionally(th);
        return failed;
    }

    /**
     * Sends a get request and adapts its response future.
     *
     * <p>A response with a non-successful status is turned into the exception
     * produced by {@link DefaultErrorUtil#keyValueStatusToException}. In every
     * case the request context is marked as logically complete, with the
     * failure if there was one.
     *
     * @param core the core the request is sent through.
     * @param getRequest the request to send.
     * @return a future that yields only successful responses.
     */
    private static CompletableFuture<GetResponse> get(Core core, GetRequest getRequest) {
        core.send(getRequest);
        return getRequest.response()
            .thenApply(response -> {
                if (!response.status().success()) {
                    throw DefaultErrorUtil.keyValueStatusToException(getRequest, response);
                }
                return response;
            })
            .whenComplete((ignoredResponse, failure) -> getRequest.context().logicallyComplete(failure));
    }

    /**
     * Sends a sub-document get request and adapts its response future.
     *
     * <p>A response with a non-successful status is turned into the exception
     * produced by {@link DefaultErrorUtil#keyValueStatusToException}. In every
     * case the request context is marked as logically complete, with the
     * failure if there was one.
     *
     * @param core the core the request is sent through.
     * @param subdocGetRequest the request to send.
     * @return a future that yields only successful responses.
     */
    private static CompletableFuture<SubdocGetResponse> get(Core core, SubdocGetRequest subdocGetRequest) {
        core.send(subdocGetRequest);
        return subdocGetRequest.response()
            .thenApply(response -> {
                if (!response.status().success()) {
                    throw DefaultErrorUtil.keyValueStatusToException(subdocGetRequest, response);
                }
                return response;
            })
            .whenComplete((ignoredResponse, failure) -> subdocGetRequest.context().logicallyComplete(failure));
    }
}
