/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.config.refresher;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.events.config.BucketConfigRefreshFailedEvent;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.ConfigRefreshFailure;
import com.couchbase.client.core.config.ConfigVersion;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.config.ProposedBucketConfigContext;
import com.couchbase.client.core.config.refresher.BucketRefresher;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.msg.kv.CarrierBucketConfigRequest;
import com.couchbase.client.core.retry.FailFastRetryStrategy;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.util.NanoTimestamp;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

@Stability.Internal
public class KeyValueBucketRefresher
implements BucketRefresher {
    static final Duration POLLER_INTERVAL = Duration.ofSeconds(1L);
    static final int MAX_PARALLEL_FETCH = 1;
    private final Core core;
    private final Disposable pollRegistration;
    private final ConfigurationProvider provider;
    private final AtomicLong nodeOffset = new AtomicLong(0L);
    private final Map<String, NanoTimestamp> registrations = new ConcurrentHashMap<String, NanoTimestamp>();
    private final Map<String, AtomicInteger> numFailedRefreshes = new ConcurrentHashMap<String, AtomicInteger>();
    private final Set<String> tainted = ConcurrentHashMap.newKeySet();
    private final Duration configPollInterval;
    private final Duration configRequestTimeout;
    private final EventBus eventBus;

    public KeyValueBucketRefresher(ConfigurationProvider provider, Core core) {
        this.core = core;
        this.eventBus = core.context().environment().eventBus();
        this.provider = provider;
        this.configPollInterval = core.context().environment().ioConfig().configPollInterval();
        this.configRequestTimeout = KeyValueBucketRefresher.clampConfigRequestTimeout(this.configPollInterval);
        this.pollRegistration = Flux.merge((Publisher[])new Publisher[]{Flux.interval((Duration)this.pollerInterval(), (Scheduler)core.context().environment().scheduler()), provider.configChangeNotifications()}).onBackpressureDrop().filter(v -> !this.registrations.isEmpty()).flatMap(tick -> Flux.fromIterable(this.registrations.keySet()).flatMap(bucketName -> this.maybeUpdateBucket((String)bucketName, tick == -1L))).subscribe(provider::proposeBucketConfig);
    }

    protected Duration pollerInterval() {
        return POLLER_INTERVAL;
    }

    static Duration clampConfigRequestTimeout(Duration configPollInterval) {
        return KeyValueBucketRefresher.constrainToRange(configPollInterval, Duration.ofSeconds(1L), Duration.ofSeconds(5L));
    }

    private static Duration constrainToRange(Duration d, Duration min, Duration max) {
        if (min.compareTo(max) > 0) {
            throw new IllegalArgumentException("min duration " + min + " must be <= max duration " + max);
        }
        if (d.compareTo(min) < 0) {
            return min;
        }
        if (d.compareTo(max) > 0) {
            return max;
        }
        return d;
    }

    private Mono<ProposedBucketConfigContext> maybeUpdateBucket(String name, boolean triggeredByConfigChangeNotification) {
        boolean allowed;
        NanoTimestamp last = this.registrations.get(name);
        boolean overInterval = last != null && last.hasElapsed(this.configPollInterval);
        boolean bl = allowed = triggeredByConfigChangeNotification || this.tainted.contains(name) || overInterval;
        if (allowed) {
            List<NodeInfo> nodes = this.filterEligibleNodes(name);
            if (this.numFailedRefreshes.get(name).get() >= nodes.size()) {
                this.provider.signalConfigRefreshFailed(ConfigRefreshFailure.ALL_NODES_TRIED_ONCE_WITHOUT_SUCCESS);
                this.numFailedRefreshes.get(name).set(0);
            }
            return this.fetchConfigPerNode(name, (Flux<NodeInfo>)Flux.fromIterable(nodes).take(1L)).next().doOnSuccess(ctx -> this.registrations.replace(name, NanoTimestamp.now()));
        }
        return Mono.empty();
    }

    private List<NodeInfo> filterEligibleNodes(String name) {
        BucketConfig config = this.provider.config().bucketConfig(name);
        if (config == null) {
            this.eventBus.publish(new BucketConfigRefreshFailedEvent(this.core.context(), BucketConfigRefreshFailedEvent.RefresherType.KV, BucketConfigRefreshFailedEvent.Reason.NO_BUCKET_FOUND, Optional.empty()));
            return Collections.emptyList();
        }
        ArrayList<NodeInfo> nodes = new ArrayList<NodeInfo>(config.nodes());
        this.shiftNodeList(nodes);
        return nodes.stream().filter(n -> n.services().containsKey((Object)ServiceType.KV) || n.sslServices().containsKey((Object)ServiceType.KV)).collect(Collectors.toList());
    }

    private Flux<ProposedBucketConfigContext> fetchConfigPerNode(String name, Flux<NodeInfo> nodes) {
        return nodes.flatMap(nodeInfo -> {
            CoreContext ctx = this.core.context();
            CarrierBucketConfigRequest request = new CarrierBucketConfigRequest(this.configRequestTimeout, ctx, new CollectionIdentifier(name, Optional.empty(), Optional.empty()), FailFastRetryStrategy.INSTANCE, nodeInfo.id(), this.currentVersion(name));
            this.core.send(request);
            return Reactor.wrap(request, request.response(), true).filter(response -> {
                if (!response.status().success()) {
                    this.numFailedRefreshes.get(name).incrementAndGet();
                    this.eventBus.publish(new BucketConfigRefreshFailedEvent(this.core.context(), BucketConfigRefreshFailedEvent.RefresherType.KV, BucketConfigRefreshFailedEvent.Reason.INDIVIDUAL_REQUEST_FAILED, Optional.of(response)));
                }
                return response.status().success();
            }).map(response -> new ProposedBucketConfigContext(name, new String(response.content(), StandardCharsets.UTF_8), nodeInfo.hostname())).doOnSuccess(r -> this.numFailedRefreshes.get(name).set(0)).onErrorResume(t -> {
                this.numFailedRefreshes.get(name).incrementAndGet();
                this.eventBus.publish(new BucketConfigRefreshFailedEvent(this.core.context(), BucketConfigRefreshFailedEvent.RefresherType.KV, BucketConfigRefreshFailedEvent.Reason.INDIVIDUAL_REQUEST_FAILED, Optional.of(t)));
                return Mono.empty();
            });
        });
    }

    private ConfigVersion currentVersion(String bucketName) {
        BucketConfig config = this.provider.config().bucketConfig(bucketName);
        return config == null ? ConfigVersion.ZERO : config.version();
    }

    private <T> void shiftNodeList(List<T> nodeList) {
        int shiftBy = (int)(this.nodeOffset.getAndIncrement() % (long)nodeList.size());
        Collections.rotate(nodeList, -shiftBy);
    }

    @Override
    public Mono<Void> register(String name) {
        return Mono.defer(() -> {
            this.registrations.put(name, NanoTimestamp.never());
            this.numFailedRefreshes.put(name, new AtomicInteger(0));
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> deregister(String name) {
        return Mono.defer(() -> {
            this.registrations.remove(name);
            this.numFailedRefreshes.remove(name);
            return Mono.empty();
        });
    }

    @Override
    public void markTainted(String name) {
        this.tainted.add(name);
    }

    @Override
    public void markUntainted(String name) {
        this.tainted.remove(name);
    }

    @Override
    public Mono<Void> shutdown() {
        return Mono.defer(() -> {
            if (!this.pollRegistration.isDisposed()) {
                this.pollRegistration.dispose();
            }
            this.numFailedRefreshes.clear();
            this.registrations.clear();
            this.tainted.clear();
            return Mono.empty();
        });
    }

    @Override
    public Set<String> registered() {
        return this.registrations.keySet();
    }
}

