/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.config.refresher;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.cnc.events.config.IndividualGlobalConfigRefreshFailedEvent;
import com.couchbase.client.core.config.ConfigRefreshFailure;
import com.couchbase.client.core.config.ConfigVersion;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.config.GlobalConfig;
import com.couchbase.client.core.config.PortInfo;
import com.couchbase.client.core.config.ProposedGlobalConfigContext;
import com.couchbase.client.core.config.refresher.KeyValueBucketRefresher;
import com.couchbase.client.core.msg.kv.CarrierGlobalConfigRequest;
import com.couchbase.client.core.msg.kv.CarrierGlobalConfigResponse;
import com.couchbase.client.core.retry.FailFastRetryStrategy;
import com.couchbase.client.core.retry.RetryStrategy;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.util.NanoTimestamp;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/**
 * Periodically refreshes the global (cluster-level) configuration by sending
 * {@link CarrierGlobalConfigRequest}s to one KV-capable node at a time and
 * handing every successful response to the {@link ConfigurationProvider}.
 *
 * <p>Polling is wired up in the constructor but only becomes active once
 * {@link #start()} has been subscribed to; {@link #stop()} pauses it again and
 * {@link #shutdown()} additionally disposes the underlying subscription.
 */
public class GlobalRefresher {
    /** Supplies the current config and change notifications, and receives proposed configs. */
    private final ConfigurationProvider provider;
    /** Core used to dispatch the config requests and to reach the environment. */
    private final Core core;
    /** Minimum time that must elapse after a successful poll before the next timed poll. */
    private final Duration configPollInterval;
    /** Timeout applied to each individual config request (derived from the poll interval). */
    private final Duration configRequestTimeout;
    /** Handle on the polling subscription so that it can be disposed on shutdown. */
    private final Disposable pollRegistration;
    /** Monotonic counter used to rotate the node list so polls move across nodes. */
    private final AtomicLong nodeOffset = new AtomicLong(0L);
    /** Whether poll signals are currently acted upon. */
    private volatile boolean started = false;
    /** Timestamp of the last poll that produced a config. */
    private volatile NanoTimestamp lastPoll = NanoTimestamp.never();
    /** Number of failed refresh attempts since the last success (or since start/stop). */
    private final AtomicInteger numFailedRefreshes = new AtomicInteger(0);

    /**
     * Creates the refresher and subscribes the polling pipeline.
     *
     * <p>The pipeline merges a fixed-rate ticker with the provider's config change
     * notifications. Signals are ignored until {@link #start()} has run.
     *
     * @param provider the configuration provider to read from and propose configs to.
     * @param core the core used to send requests and to access the environment.
     */
    public GlobalRefresher(ConfigurationProvider provider, Core core) {
        this.provider = provider;
        this.core = core;
        this.configPollInterval = core.context().environment().ioConfig().configPollInterval();
        this.configRequestTimeout = KeyValueBucketRefresher.clampConfigRequestTimeout(this.configPollInterval);
        this.pollRegistration = Flux
            .merge(
                Flux.interval(this.pollerInterval(), core.context().environment().scheduler()),
                provider.configChangeNotifications())
            .onBackpressureDrop()
            .filter(v -> this.started)
            // A signal value of -1 bypasses the elapsed-time check; every other signal
            // is dropped until the poll interval has passed since the last successful poll.
            // NOTE(review): -1 is presumably the marker emitted by config change
            // notifications - confirm against the provider implementation.
            .filter(v -> v == -1L || this.lastPoll.hasElapsed(this.configPollInterval))
            .flatMap(ign -> {
                List<PortInfo> nodes = this.filterEligibleNodes();
                // Once as many attempts have failed as there are eligible nodes, tell the
                // provider and start counting from scratch.
                if (this.numFailedRefreshes.get() >= nodes.size()) {
                    provider.signalConfigRefreshFailed(ConfigRefreshFailure.ALL_NODES_TRIED_ONCE_WITHOUT_SUCCESS);
                    this.numFailedRefreshes.set(0);
                }
                // Only the first node of the (rotated) list is asked per poll signal.
                return this.attemptUpdateGlobalConfig(Flux.fromIterable(nodes).take(1L));
            })
            .subscribe(provider::proposeGlobalConfig);
    }

    /**
     * Returns the tick rate of the internal poller.
     *
     * @return the interval between two poller ticks.
     */
    protected Duration pollerInterval() {
        return KeyValueBucketRefresher.POLLER_INTERVAL;
    }

    /**
     * Requests the global config from each of the given nodes.
     *
     * <p>Responses with a non-successful status and failed requests are counted in
     * {@link #numFailedRefreshes}, reported on the event bus and otherwise swallowed,
     * so the returned flux only ever emits successfully fetched configs.
     *
     * @param nodes the nodes to ask.
     * @return a flux of proposed configs, one per successful response.
     */
    private Flux<ProposedGlobalConfigContext> attemptUpdateGlobalConfig(Flux<PortInfo> nodes) {
        return nodes.flatMap(nodeInfo -> {
            NanoTimestamp start = NanoTimestamp.now();
            CoreContext ctx = this.core.context();
            CarrierGlobalConfigRequest request = new CarrierGlobalConfigRequest(
                this.configRequestTimeout,
                ctx,
                FailFastRetryStrategy.INSTANCE,
                nodeInfo.identifier(),
                this.currentVersion());
            this.core.send(request);
            return Reactor.wrap(request, request.response(), true)
                .filter(response -> {
                    if (response.status().success()) {
                        return true;
                    }
                    this.numFailedRefreshes.incrementAndGet();
                    this.core.context().environment().eventBus().publish(new IndividualGlobalConfigRefreshFailedEvent(
                        start.elapsed(), this.core.context(), null, nodeInfo.hostname(), response));
                    return false;
                })
                .map(response -> new ProposedGlobalConfigContext(
                    new String(response.content(), StandardCharsets.UTF_8), nodeInfo.hostname()))
                // doOnNext rather than doOnSuccess: doOnSuccess is also invoked (with null)
                // when the filter above dropped an unsuccessful response, which would wipe
                // out the failure that was just counted and mark the poll as done.
                .doOnNext(proposed -> {
                    this.numFailedRefreshes.set(0);
                    this.lastPoll = NanoTimestamp.now();
                })
                .onErrorResume(t -> {
                    this.numFailedRefreshes.incrementAndGet();
                    this.core.context().environment().eventBus().publish(new IndividualGlobalConfigRefreshFailedEvent(
                        start.elapsed(), this.core.context(), t, nodeInfo.hostname(), null));
                    return Mono.empty();
                });
        });
    }

    /**
     * Returns the version of the global config currently held by the provider.
     *
     * @return the current version, or {@link ConfigVersion#ZERO} if there is no global config.
     */
    private ConfigVersion currentVersion() {
        GlobalConfig config = this.provider.config().globalConfig();
        return config == null ? ConfigVersion.ZERO : config.version();
    }

    /**
     * Builds the list of nodes that can be asked for a global config.
     *
     * <p>The node list of the current global config is copied, rotated by the running
     * node offset and then reduced to the nodes that expose the KV service on either
     * a plain or a TLS port.
     *
     * @return the eligible nodes in polling order; empty if there is no global config.
     */
    private List<PortInfo> filterEligibleNodes() {
        GlobalConfig config = this.provider.config().globalConfig();
        if (config == null) {
            return Collections.emptyList();
        }
        List<PortInfo> nodes = new ArrayList<>(config.portInfos());
        this.shiftNodeList(nodes);
        return nodes.stream()
            .filter(n -> n.ports().containsKey(ServiceType.KV) || n.sslPorts().containsKey(ServiceType.KV))
            .collect(Collectors.toList());
    }

    /**
     * Rotates the given list to the left in place, by the running node offset modulo
     * the list size, and advances the offset.
     *
     * <p>An empty list is left untouched; without this guard the modulo below would
     * divide by zero and the resulting exception would terminate the polling pipeline.
     *
     * @param nodeList the mutable list to rotate.
     * @param <T> the element type of the list.
     */
    private <T> void shiftNodeList(List<T> nodeList) {
        if (nodeList.isEmpty()) {
            return;
        }
        int shiftBy = (int) (this.nodeOffset.getAndIncrement() % (long) nodeList.size());
        Collections.rotate(nodeList, -shiftBy);
    }

    /**
     * Enables polling.
     *
     * @return a mono that, when subscribed, marks the refresher as started, resets the
     *         failure counter and completes empty.
     */
    public Mono<Void> start() {
        return Mono.defer(() -> {
            this.started = true;
            this.numFailedRefreshes.set(0);
            return Mono.empty();
        });
    }

    /**
     * Pauses polling without disposing the underlying subscription.
     *
     * @return a mono that, when subscribed, marks the refresher as stopped, resets the
     *         failure counter and completes empty.
     */
    public Mono<Void> stop() {
        return Mono.defer(() -> {
            this.started = false;
            this.numFailedRefreshes.set(0);
            return Mono.empty();
        });
    }

    /**
     * Stops polling and disposes the polling subscription if it is still active.
     *
     * @return a mono that completes empty once the refresher has been stopped and the
     *         subscription disposed.
     */
    public Mono<Void> shutdown() {
        return this.stop().then(Mono.defer(() -> {
            if (!this.pollRegistration.isDisposed()) {
                this.pollRegistration.dispose();
            }
            return Mono.empty();
        }));
    }
}

