/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.registry.xds.util;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.registry.xds.util.AdsObserver;
import org.apache.dubbo.registry.xds.util.NodeBuilder;
import org.apache.dubbo.registry.xds.util.XdsChannel;
import org.apache.dubbo.registry.xds.util.protocol.impl.EdsProtocol;
import org.apache.dubbo.registry.xds.util.protocol.impl.LdsProtocol;
import org.apache.dubbo.registry.xds.util.protocol.impl.RdsProtocol;
import org.apache.dubbo.registry.xds.util.protocol.message.Endpoint;
import org.apache.dubbo.registry.xds.util.protocol.message.EndpointResult;
import org.apache.dubbo.registry.xds.util.protocol.message.ListenerResult;
import org.apache.dubbo.registry.xds.util.protocol.message.RouteResult;
import org.apache.dubbo.rpc.cluster.router.xds.RdsVirtualHostListener;
import org.apache.dubbo.rpc.model.ApplicationModel;

/**
 * Facade over the LDS, RDS and EDS protocol clients.
 *
 * <p>On construction the exchanger fetches the current listeners and routes, keeps them cached in
 * {@link #listenerResult} and {@link #routeResult}, and registers observers that refresh those
 * caches. Callers use it to list known domains, look up the endpoints of a domain, and subscribe
 * to endpoint changes per domain.
 *
 * <p>A single shared instance is created through {@link #initialize(URL)} and read through
 * {@link #getInstance()}.
 */
public class PilotExchanger {
    /** Resource name used for every LDS observation registered by this class. */
    private static final String EMPTY_RESOURCE_NAME = "emptyResourcesName";

    // volatile: isEnabled() reads this field without holding the class lock.
    private static volatile PilotExchanger GLOBAL_PILOT_EXCHANGER = null;

    protected final XdsChannel xdsChannel;
    protected final LdsProtocol ldsProtocol;
    protected final RdsProtocol rdsProtocol;
    protected final EdsProtocol edsProtocol;

    // Both caches are replaced wholesale from observer callbacks and read by other threads
    // (public getters, the shared executor), hence volatile.
    protected volatile Map<String, ListenerResult> listenerResult;
    protected volatile Map<String, RouteResult> routeResult;

    /** Set once RDS observation has been started by the constructor. */
    private final AtomicBoolean isRdsObserve = new AtomicBoolean(false);
    /** Domains for which an EDS observation has already been registered. */
    private final Set<String> domainObserveRequest = new ConcurrentHashSet<>();
    /** Endpoint consumers registered per domain. */
    private final Map<String, Set<Consumer<Set<Endpoint>>>> domainObserveConsumer = new ConcurrentHashMap<>();
    private final Map<String, Consumer<RdsVirtualHostListener>> rdsObserveConsumer = new ConcurrentHashMap<>();
    private final ApplicationModel applicationModel;

    /**
     * Creates the protocol clients, loads the initial listener and route snapshots and starts
     * observing LDS (and RDS, when the first listener names at least one route config).
     *
     * @param url source of the connection settings; the {@code pollingTimeout} parameter
     *            (default 10) is passed to each protocol client
     */
    protected PilotExchanger(URL url) {
        this.xdsChannel = new XdsChannel(url);
        int pollingTimeout = url.getParameter("pollingTimeout", 10);
        this.applicationModel = url.getOrDefaultApplicationModel();
        AdsObserver adsObserver = new AdsObserver(url, NodeBuilder.build());
        this.ldsProtocol = new LdsProtocol(adsObserver, NodeBuilder.build(), pollingTimeout);
        this.rdsProtocol = new RdsProtocol(adsObserver, NodeBuilder.build(), pollingTimeout);
        this.edsProtocol = new EdsProtocol(adsObserver, NodeBuilder.build(), pollingTimeout);

        this.listenerResult = this.ldsProtocol.getListeners();
        // An empty listener snapshot yields an empty name set instead of NoSuchElementException.
        Set<String> routeConfigNames = firstListenerRouteConfigNames();
        this.routeResult = this.rdsProtocol.getResource(routeConfigNames);

        if (CollectionUtils.isNotEmpty(routeConfigNames)) {
            createRouteObserve();
            this.isRdsObserve.set(true);
        }

        Set<String> ldsResourcesName = new HashSet<>();
        ldsResourcesName.add(EMPTY_RESOURCE_NAME);
        this.ldsProtocol.observeResource(ldsResourcesName, newListener -> {
            if (!newListener.equals(this.listenerResult)) {
                this.listenerResult = newListener;
                // NOTE(review): RDS is only re-observed when it was already observed at
                // construction time; a listener that gains route configs later is not picked
                // up here. Kept as-is - confirm whether this is intended.
                if (this.isRdsObserve.get()) {
                    createRouteObserve();
                }
            }
        }, false);
    }

    /**
     * Returns the route config names of the first listener in the cached LDS snapshot.
     *
     * @return the names reported by the first listener, or an empty set when the snapshot is
     *         {@code null} or contains no listener
     */
    private Set<String> firstListenerRouteConfigNames() {
        Map<String, ListenerResult> listeners = this.listenerResult;
        if (listeners != null) {
            // Loop form avoids a separate isEmpty()/next() pair; only the first value is used.
            for (ListenerResult listener : listeners.values()) {
                return listener.getRouteConfigNames();
            }
        }
        return Collections.emptySet();
    }

    /**
     * Registers an RDS observer for the route configs of the first cached listener. On every
     * update the observer works out which observed domains now resolve differently, replaces
     * the cached routes, and re-registers endpoint observation for those domains on the shared
     * executor.
     *
     * <p>NOTE(review): earlier RDS observers registered by previous calls are not unobserved
     * here - confirm whether the protocol layer de-duplicates them.
     */
    private void createRouteObserve() {
        this.rdsProtocol.observeResource(firstListenerRouteConfigNames(), newResult -> {
            Map<String, RouteResult> previousResult = this.routeResult;
            // A set, so that each affected domain is re-observed once per update rather than
            // once per (new route, old route) mismatch.
            Set<String> domainsToUpdate = new HashSet<>();
            for (String domain : this.domainObserveConsumer.keySet()) {
                if (isDomainRouteChanged(domain, previousResult, newResult)) {
                    domainsToUpdate.add(domain);
                }
            }
            this.routeResult = newResult;
            ExecutorService executorService = this.applicationModel.getFrameworkModel().getBeanFactory()
                    .getBean(FrameworkExecutorRepository.class).getSharedExecutor();
            executorService.submit(() -> domainsToUpdate.forEach(this::doObserveEndpoints));
        }, false);
    }

    /**
     * Tells whether any route in {@code newResult} resolves {@code domain} to a different set
     * than any route in {@code previousResult}.
     */
    private static boolean isDomainRouteChanged(String domain,
                                                Map<String, RouteResult> previousResult,
                                                Map<String, RouteResult> newResult) {
        for (RouteResult updated : newResult.values()) {
            Set<String> newRoute = updated.searchDomain(domain);
            for (RouteResult previous : previousResult.values()) {
                if (!previous.searchDomain(domain).equals(newRoute)) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Returns the shared exchanger, creating it from {@code url} on the first call.
     *
     * @param url used to construct the instance; ignored when an instance already exists
     * @return the shared instance
     */
    public static PilotExchanger initialize(URL url) {
        synchronized (PilotExchanger.class) {
            if (GLOBAL_PILOT_EXCHANGER == null) {
                GLOBAL_PILOT_EXCHANGER = new PilotExchanger(url);
            }
            return GLOBAL_PILOT_EXCHANGER;
        }
    }

    /**
     * Returns the shared exchanger.
     *
     * @return the shared instance, or {@code null} when {@link #initialize(URL)} has not
     *         completed yet
     */
    public static PilotExchanger getInstance() {
        // Same lock as initialize(), so a caller waits for an in-flight construction.
        synchronized (PilotExchanger.class) {
            return GLOBAL_PILOT_EXCHANGER;
        }
    }

    /**
     * @return {@code true} once a shared instance has been created
     */
    public static boolean isEnabled() {
        return GLOBAL_PILOT_EXCHANGER != null;
    }

    /**
     * Destroys the underlying xDS channel. The shared instance reference is left untouched.
     */
    public void destroy() {
        this.xdsChannel.destroy();
    }

    /**
     * Collects the domains of every cached route.
     *
     * @return a new mutable set with all known domains
     */
    public Set<String> getServices() {
        Set<String> domains = new HashSet<>();
        for (RouteResult route : this.routeResult.values()) {
            domains.addAll(route.getDomains());
        }
        return domains;
    }

    /**
     * Resolves {@code domain} through every cached route and gathers the endpoints of the
     * matching clusters.
     *
     * <p>Routes that do not know the domain are skipped, matching the behaviour of endpoint
     * observation; they no longer discard endpoints found through other routes.
     *
     * @param domain the domain to look up
     * @return the union of all endpoints found; empty when no route matches
     */
    public Set<Endpoint> getEndpoints(String domain) {
        Set<Endpoint> endpoints = new HashSet<>();
        for (RouteResult route : this.routeResult.values()) {
            Set<String> clusters = route.searchDomain(domain);
            if (!CollectionUtils.isNotEmpty(clusters)) {
                continue;
            }
            Map<String, EndpointResult> endpointResults = this.edsProtocol.getResource(clusters);
            endpointResults.values().forEach(result -> endpoints.addAll(result.getEndpoints()));
        }
        return endpoints;
    }

    /**
     * Registers {@code consumer} for endpoint updates of {@code domain} and starts the EDS
     * observation for that domain if none has been requested yet.
     *
     * @param domain   the domain to watch
     * @param consumer callback receiving the full endpoint set on every update
     */
    public void observeEndpoints(String domain, Consumer<Set<Endpoint>> consumer) {
        this.domainObserveConsumer.computeIfAbsent(domain, key -> new ConcurrentHashSet<>()).add(consumer);
        if (!this.domainObserveRequest.contains(domain)) {
            doObserveEndpoints(domain);
        }
    }

    /**
     * Registers an EDS observation for each cached route that resolves {@code domain} to a
     * non-empty set, and marks the domain as requested. Updates are fanned out to the consumers
     * currently registered for the domain.
     */
    private void doObserveEndpoints(String domain) {
        for (RouteResult route : this.routeResult.values()) {
            Set<String> router = route.searchDomain(domain);
            if (!CollectionUtils.isNotEmpty(router)) {
                continue;
            }
            this.edsProtocol.observeResource(router, endpointResultMap -> {
                Set<Endpoint> endpoints = endpointResultMap.values().stream()
                        .map(EndpointResult::getEndpoints)
                        .flatMap(Collection::stream)
                        .collect(Collectors.toSet());
                Set<Consumer<Set<Endpoint>>> consumers = this.domainObserveConsumer.get(domain);
                if (consumers == null) {
                    return;
                }
                for (Consumer<Set<Endpoint>> consumer : consumers) {
                    consumer.accept(endpoints);
                }
            }, false);
            this.domainObserveRequest.add(domain);
        }
    }

    /**
     * Removes {@code consumer} from the consumers of {@code domain}. Does nothing when the
     * domain was never observed. The "requested" marker is cleared only when no consumer is
     * left, so remaining consumers do not cause a duplicate EDS observation on the next
     * {@link #observeEndpoints(String, Consumer)} call.
     *
     * @param domain   the domain the consumer was registered for
     * @param consumer the callback to remove
     */
    public void unObserveEndpoints(String domain, Consumer<Set<Endpoint>> consumer) {
        Set<Consumer<Set<Endpoint>>> consumers = this.domainObserveConsumer.get(domain);
        if (consumers == null) {
            return;
        }
        consumers.remove(consumer);
        if (consumers.isEmpty()) {
            this.domainObserveRequest.remove(domain);
        }
    }

    /**
     * Forwards an observation request for the given names to the EDS protocol client.
     */
    public void observeEds(Set<String> clusterNames, Consumer<Map<String, EndpointResult>> consumer) {
        this.edsProtocol.observeResource(clusterNames, consumer, false);
    }

    /**
     * Forwards an un-observe request for the given names to the EDS protocol client.
     */
    public void unObserveEds(Set<String> clusterNames, Consumer<Map<String, EndpointResult>> consumer) {
        this.edsProtocol.unobserveResource(clusterNames, consumer);
    }

    /**
     * Forwards an observation request for the given names to the RDS protocol client.
     */
    public void observeRds(Set<String> clusterNames, Consumer<Map<String, RouteResult>> consumer) {
        this.rdsProtocol.observeResource(clusterNames, consumer, false);
    }

    /**
     * Forwards an un-observe request for the given names to the RDS protocol client.
     */
    public void unObserveRds(Set<String> clusterNames, Consumer<Map<String, RouteResult>> consumer) {
        this.rdsProtocol.unobserveResource(clusterNames, consumer);
    }

    /**
     * Registers {@code consumer} with the LDS protocol client under the fixed resource name
     * used by this class.
     */
    public void observeLds(Consumer<Map<String, ListenerResult>> consumer) {
        this.ldsProtocol.observeResource(Collections.singleton(EMPTY_RESOURCE_NAME), consumer, false);
    }

    /**
     * Removes {@code consumer} from the LDS protocol client under the fixed resource name
     * used by this class.
     */
    public void unObserveLds(Consumer<Map<String, ListenerResult>> consumer) {
        this.ldsProtocol.unobserveResource(Collections.singleton(EMPTY_RESOURCE_NAME), consumer);
    }
}

