/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shenyu.sync.data.consul;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.kv.model.GetValue;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.constant.ConsulConstants;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
import org.apache.shenyu.sync.data.api.SyncDataService;
import org.apache.shenyu.sync.data.consul.config.ConsulConfig;
import org.apache.shenyu.sync.data.consul.handler.ConsulCacheHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps the gateway's cached data in sync with the Consul key/value store.
 *
 * <p>A single-threaded scheduled executor periodically queries every watched
 * key prefix. When the index reported by Consul for a prefix changes, each
 * returned entry is handed to the update handler registered for its key
 * (the handlers are the {@code update*} methods inherited from
 * {@link ConsulCacheHandler}).
 *
 * <p>The watcher is started from the constructor and stopped with {@link #close()}.
 */
public class ConsulSyncDataService extends ConsulCacheHandler implements SyncDataService {

    private static final Logger LOG = LoggerFactory.getLogger(ConsulSyncDataService.class);

    /** Consul key -> handler that applies the decoded value stored under that key. */
    private final Map<String, ConsulCacheHandler.OnChange> groupMap = new HashMap<>();

    /** Watched key prefix -> last Consul index observed for that prefix. */
    private final Map<String, Long> consulIndexes = new HashMap<>();

    private final ScheduledThreadPoolExecutor executor;

    /** Handle of the currently scheduled watch task; {@code null} until {@link #start()} has run. */
    private ScheduledFuture<?> watchFuture;

    private final ConsulConfig consulConfig;

    private final ConsulClient consulClient;

    /** {@code true} while the watch task is scheduled and allowed to poll. */
    private final AtomicBoolean running = new AtomicBoolean(false);

    /**
     * Creates the service, registers the key handlers and starts watching immediately.
     *
     * @param consulClient                     client used to query the Consul key/value store
     * @param consulConfig                     supplies the query wait time and the delay between polls
     * @param pluginDataSubscriber             forwarded to {@link ConsulCacheHandler}
     * @param metaDataSubscribers              forwarded to {@link ConsulCacheHandler}
     * @param authDataSubscribers              forwarded to {@link ConsulCacheHandler}
     * @param proxySelectorDataSubscribers     forwarded to {@link ConsulCacheHandler}
     * @param discoveryUpstreamDataSubscribers forwarded to {@link ConsulCacheHandler}
     */
    public ConsulSyncDataService(ConsulClient consulClient, ConsulConfig consulConfig, PluginDataSubscriber pluginDataSubscriber, List<MetaDataSubscriber> metaDataSubscribers, List<AuthDataSubscriber> authDataSubscribers, List<ProxySelectorDataSubscriber> proxySelectorDataSubscribers, List<DiscoveryUpstreamDataSubscriber> discoveryUpstreamDataSubscribers) {
        super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers, proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);
        this.consulClient = consulClient;
        this.consulConfig = consulConfig;
        this.executor = new ScheduledThreadPoolExecutor(1, ShenyuThreadFactory.create("consul-config-watch", true));
        // Seed the only watched prefix with index 0 before the watch task is scheduled.
        this.consulIndexes.put("shenyu/sync", 0L);
        this.initUpdateMap();
        this.start();
    }

    /**
     * Registers, for every supported Consul key, the inherited update method
     * that consumes the value stored under it.
     */
    private void initUpdateMap() {
        this.groupMap.put("shenyu/sync/plugin", this::updatePluginData);
        this.groupMap.put("shenyu/sync/selector", this::updateSelectorMap);
        this.groupMap.put("shenyu/sync/rule", this::updateRuleMap);
        this.groupMap.put("shenyu/sync/meta", this::updateMetaDataMap);
        this.groupMap.put("shenyu/sync/auth", this::updateAuthMap);
        this.groupMap.put("shenyu/sync/proxySelector", this::updateSelectorDataMap);
        this.groupMap.put("shenyu/sync/discoveryUpstream", this::updateDiscoveryUpstreamMap);
    }

    /**
     * Scheduled task body: polls every watched prefix once.
     *
     * <p>A failure while polling one prefix is logged and does not prevent the
     * remaining prefixes from being polled. The stored index of a failed prefix
     * is left untouched, so the same query is repeated on the next run.
     */
    private void watchConfigKeyValues() {
        if (!this.running.get()) {
            return;
        }
        for (String context : this.consulIndexes.keySet()) {
            try {
                this.pollContext(context);
            } catch (Exception e) {
                // Pass the exception itself so the stack trace is kept; getMessage() alone is null for e.g. an NPE.
                LOG.warn("Error querying consul Key/Values for context '{}'. Message: {}", context, e.getMessage(), e);
            }
        }
    }

    /**
     * Queries Consul for one key prefix and dispatches changed entries.
     *
     * <p>The configured wait time and the last index seen for the prefix are
     * passed as query parameters. Nothing is dispatched when the response is
     * empty or its index equals the stored one. Otherwise the new index is
     * recorded, and the entries are dispatched unless that index is already
     * stored for some prefix or the previous index was the initial marker.
     *
     * @param context key prefix to query; also the key into {@link #consulIndexes}
     */
    private void pollContext(final String context) {
        Long currentIndex = this.consulIndexes.get(context);
        if (Objects.isNull(currentIndex)) {
            currentIndex = ConsulConstants.INIT_CONFIG_VERSION_INDEX;
        }
        final Response<List<GetValue>> response = this.consulClient.getKVValues(context, null,
                new QueryParams((long) this.consulConfig.getWaitTime(), currentIndex.longValue()));
        final List<GetValue> values = response.getValue();
        if (Objects.isNull(values) || values.isEmpty()) {
            LOG.trace("No value for context {}", context);
            return;
        }
        final Long newIndex = response.getConsulIndex();
        if (Objects.isNull(newIndex) || Objects.equals(newIndex, currentIndex)) {
            LOG.trace("Same index for context {}", context);
            return;
        }
        if (!this.consulIndexes.containsValue(newIndex) && !currentIndex.equals(ConsulConstants.INIT_CONFIG_VERSION_INDEX)) {
            LOG.trace("Context {} has new index {}", context, newIndex);
            this.dispatchChanges(values, currentIndex.longValue());
        } else {
            LOG.trace("Event for index already published for context {}", context);
        }
        // Replacing the value of an existing key is not a structural change, so it is safe
        // while the caller iterates over the key set.
        this.consulIndexes.put(context, newIndex);
    }

    /**
     * Hands every changed entry to the handler registered for its key.
     *
     * <p>Entries whose modify index equals {@code lastIndex} are skipped.
     * Entries whose key has no registered handler are logged and skipped;
     * without that guard a single unknown key would abort the whole batch and
     * keep the stored index from advancing.
     *
     * @param values    entries returned by Consul for the polled prefix
     * @param lastIndex index stored for the prefix before this poll
     */
    private void dispatchChanges(final List<GetValue> values, final long lastIndex) {
        for (GetValue data : values) {
            if (data.getModifyIndex() == lastIndex) {
                continue;
            }
            final ConsulCacheHandler.OnChange handler = this.groupMap.get(data.getKey());
            if (Objects.isNull(handler)) {
                LOG.warn("No handler registered for consul key '{}', ignoring it", data.getKey());
                continue;
            }
            handler.change(data.getDecodedValue());
        }
    }

    /**
     * Schedules the watch task if it is not already running. The first run
     * happens after 5 ms; subsequent runs are separated by the configured
     * watch delay (milliseconds). Calling this while running is a no-op.
     */
    public void start() {
        if (this.running.compareAndSet(false, true)) {
            this.watchFuture = this.executor.scheduleWithFixedDelay(this::watchConfigKeyValues, 5L, this.consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Stops watching: clears the running flag and cancels the scheduled task,
     * interrupting it if it is currently executing. The executor itself is not
     * shut down, so {@link #start()} can schedule the task again afterwards.
     * Calling this while not running is a no-op.
     */
    public void close() {
        if (this.running.compareAndSet(true, false) && Objects.nonNull(this.watchFuture)) {
            this.watchFuture.cancel(true);
        }
    }
}

