/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.diagnostics;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.events.core.WaitUntilReadyCompletedEvent;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ArrayNode;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ObjectNode;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.DefaultFullHttpRequest;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpMethod;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpVersion;
import com.couchbase.client.core.diagnostics.ClusterState;
import com.couchbase.client.core.diagnostics.DiagnosticsResult;
import com.couchbase.client.core.diagnostics.EndpointDiagnostics;
import com.couchbase.client.core.diagnostics.HealthPinger;
import com.couchbase.client.core.diagnostics.PingResult;
import com.couchbase.client.core.diagnostics.WaitUntilReadyContext;
import com.couchbase.client.core.error.UnambiguousTimeoutException;
import com.couchbase.client.core.error.context.CancellationErrorContext;
import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.msg.RequestTarget;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.manager.GenericManagerRequest;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.util.CbCollections;
import com.couchbase.client.core.util.Deadline;
import com.couchbase.client.core.util.NanoTimestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

@Stability.Internal
public class WaitUntilReadyHelper {
    @Stability.Internal
    public static CompletableFuture<Void> waitUntilReadyProtostellar(Core core, Duration timeout, ClusterState desiredState) {
        Deadline deadline = Deadline.of(timeout);
        ArrayList cfs = new ArrayList();
        core.protostellar().pool().endpoints().forEach(endpoint -> cfs.add(endpoint.waitUntilReady(deadline, desiredState != ClusterState.OFFLINE)));
        if (desiredState == ClusterState.DEGRADED) {
            return CompletableFuture.anyOf(cfs.toArray(new CompletableFuture[0])).thenRun(() -> {});
        }
        return CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0]));
    }

    @Stability.Internal
    public static CompletableFuture<Void> waitUntilReady(Core core, Set<ServiceType> serviceTypes, Duration timeout, ClusterState desiredState, Optional<String> bucketName) {
        if (core.isProtostellar()) {
            return WaitUntilReadyHelper.waitUntilReadyProtostellar(core, timeout, desiredState);
        }
        WaitUntilReadyState state = new WaitUntilReadyState();
        state.transition(WaitUntilReadyStage.CONFIG_LOAD);
        return Flux.interval((Duration)Duration.ofMillis(10L), (Scheduler)core.context().environment().scheduler()).onBackpressureDrop().filter(i -> !(core.configurationProvider().bucketConfigLoadInProgress() || core.configurationProvider().globalConfigLoadInProgress() || bucketName.isPresent() && core.configurationProvider().collectionRefreshInProgress() || bucketName.isPresent() && core.clusterConfig().bucketConfig((String)bucketName.get()) == null)).flatMap(i -> {
            if (bucketName.isPresent()) {
                state.transition(WaitUntilReadyStage.BUCKET_NODES_HEALTHY);
                GenericManagerRequest request = new GenericManagerRequest(core.context(), () -> new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/pools/default/buckets/" + (String)bucketName.get()), true, null);
                core.send(request);
                return Reactor.wrap(request, request.response(), true).filter(response -> {
                    if (response.status() != ResponseStatus.SUCCESS) {
                        return false;
                    }
                    ObjectNode root = (ObjectNode)Mapper.decodeIntoTree(response.content());
                    ArrayNode nodes = (ArrayNode)root.get("nodes");
                    long healthy = StreamSupport.stream(nodes.spliterator(), false).filter(node -> node.get("status").asText().equals("healthy")).count();
                    return (long)nodes.size() == healthy;
                }).map(ignored -> i);
            }
            return Flux.just((Object)i);
        }).take(1L).flatMap(aLong -> {
            if (!bucketName.isPresent() && !core.clusterConfig().hasClusterOrBucketConfig()) {
                state.transition(WaitUntilReadyStage.COMPLETE);
                WaitUntilReadyContext waitUntilReadyContext = new WaitUntilReadyContext(WaitUntilReadyHelper.servicesToCheck(core, serviceTypes, bucketName), timeout, desiredState, bucketName, core.diagnostics().collect(Collectors.groupingBy(EndpointDiagnostics::type)), state);
                core.context().environment().eventBus().publish(new WaitUntilReadyCompletedEvent(waitUntilReadyContext, WaitUntilReadyCompletedEvent.Reason.CLUSTER_LEVEL_NOT_SUPPORTED));
                return Flux.empty();
            }
            state.transition(WaitUntilReadyStage.PING);
            Flux diagnostics = Flux.interval((Duration)Duration.ofMillis(10L), (Scheduler)core.context().environment().scheduler()).onBackpressureDrop().map(i -> WaitUntilReadyHelper.diagnosticsCurrentState(core)).takeUntil(s -> s == desiredState);
            return Flux.concat((Publisher[])new Publisher[]{WaitUntilReadyHelper.ping(core, WaitUntilReadyHelper.servicesToCheck(core, serviceTypes, bucketName), timeout, bucketName), diagnostics});
        }).then().timeout(timeout, Mono.defer(() -> {
            WaitUntilReadyContext waitUntilReadyContext = new WaitUntilReadyContext(WaitUntilReadyHelper.servicesToCheck(core, serviceTypes, bucketName), timeout, desiredState, bucketName, core.diagnostics().collect(Collectors.groupingBy(EndpointDiagnostics::type)), state);
            CancellationErrorContext errorContext = new CancellationErrorContext(waitUntilReadyContext);
            return Mono.error((Throwable)new UnambiguousTimeoutException("WaitUntilReady timed out", errorContext));
        }), core.context().environment().scheduler()).doOnSuccess(unused -> {
            state.transition(WaitUntilReadyStage.COMPLETE);
            WaitUntilReadyContext waitUntilReadyContext = new WaitUntilReadyContext(WaitUntilReadyHelper.servicesToCheck(core, serviceTypes, bucketName), timeout, desiredState, bucketName, core.diagnostics().collect(Collectors.groupingBy(EndpointDiagnostics::type)), state);
            core.context().environment().eventBus().publish(new WaitUntilReadyCompletedEvent(waitUntilReadyContext, WaitUntilReadyCompletedEvent.Reason.SUCCESS));
        }).toFuture();
    }

    private static Set<ServiceType> servicesToCheck(Core core, Set<ServiceType> serviceTypes, Optional<String> bucketName) {
        return !CbCollections.isNullOrEmpty(serviceTypes) ? serviceTypes : HealthPinger.extractPingTargets(core.clusterConfig(), bucketName).stream().map(RequestTarget::serviceType).collect(Collectors.toSet());
    }

    private static ClusterState diagnosticsCurrentState(Core core) {
        return DiagnosticsResult.aggregateClusterState(core.diagnostics().collect(Collectors.groupingBy(EndpointDiagnostics::type)).values());
    }

    private static Flux<PingResult> ping(Core core, Set<ServiceType> serviceTypes, Duration timeout, Optional<String> bucketName) {
        return HealthPinger.ping(core, Optional.of(timeout), core.context().environment().retryStrategy(), serviceTypes, Optional.empty(), bucketName).flux();
    }

    private static enum WaitUntilReadyStage {
        INITIAL,
        CONFIG_LOAD,
        BUCKET_NODES_HEALTHY,
        PING,
        COMPLETE;

    }

    @Stability.Internal
    public static class WaitUntilReadyState {
        private final Map<WaitUntilReadyStage, Long> timings = new ConcurrentHashMap<WaitUntilReadyStage, Long>();
        private final AtomicLong totalDuration = new AtomicLong();
        private volatile WaitUntilReadyStage currentStage = WaitUntilReadyStage.INITIAL;
        private volatile NanoTimestamp currentStart = NanoTimestamp.now();

        void transition(WaitUntilReadyStage next) {
            long timing = this.currentStart.elapsed().toMillis();
            if (this.currentStage != WaitUntilReadyStage.INITIAL) {
                this.timings.put(this.currentStage, timing);
            }
            this.totalDuration.addAndGet(timing);
            this.currentStage = next;
            this.currentStart = NanoTimestamp.now();
        }

        public Map<String, Object> export() {
            TreeMap<String, Object> toExport = new TreeMap<String, Object>();
            toExport.put("current_stage", (Object)this.currentStage);
            if (this.currentStage != WaitUntilReadyStage.COMPLETE) {
                long currentMs = this.currentStart.elapsed().toMillis();
                toExport.put("current_stage_since_ms", currentMs);
                toExport.put("total_ms", this.totalDuration.get() + currentMs);
            } else {
                toExport.put("total_ms", this.totalDuration.get());
            }
            toExport.put("timings_ms", this.timings);
            return toExport;
        }

        public long totalDuration() {
            return this.totalDuration.get();
        }
    }
}

