/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.search.dispatch;

import com.yahoo.component.AbstractComponent;
import com.yahoo.component.ComponentId;
import com.yahoo.component.annotation.Inject;
import com.yahoo.compress.Compressor;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.prelude.fastsearch.VespaBackend;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.CloseableInvoker;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.InvokerFactory;
import com.yahoo.search.dispatch.LoadBalancer;
import com.yahoo.search.dispatch.RequestDuration;
import com.yahoo.search.dispatch.SearchErrorInvoker;
import com.yahoo.search.dispatch.SearchInvoker;
import com.yahoo.search.dispatch.SearchPath;
import com.yahoo.search.dispatch.rpc.RpcConnectionPool;
import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
import com.yahoo.search.dispatch.rpc.RpcPingFactory;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
import com.yahoo.search.dispatch.searchcluster.AvailabilityPolicy;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.dispatch.searchcluster.SearchGroups;
import com.yahoo.search.query.Model;
import com.yahoo.search.query.profile.types.FieldDescription;
import com.yahoo.search.query.profile.types.FieldType;
import com.yahoo.search.query.profile.types.QueryProfileType;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class Dispatcher
extends AbstractComponent {
    public static final String DISPATCH = "dispatch";
    private static final String TOP_K_PROBABILITY = "topKProbability";
    private static final String DOCSUM_RETRY_LIMIT = "docsumRetryLimit";
    private static final String DOCSUM_RETRY_FACTOR = "docsumRetryFactor";
    private static final int MAX_GROUP_SELECTION_ATTEMPTS = 3;
    public static final CompoundName topKProbability = CompoundName.from((String)"dispatch.topKProbability");
    public static final CompoundName docsumRetryLimit = CompoundName.from((String)"dispatch.docsumRetryLimit");
    public static final CompoundName docsumRetryFactor = CompoundName.from((String)"dispatch.docsumRetryFactor");
    private final InvokerFactoryFactory invokerFactories;
    private final DispatchConfig dispatchConfig;
    private final RpcConnectionPool rpcResourcePool;
    private final SearchCluster searchCluster;
    private final ClusterMonitor<Node> clusterMonitor;
    private volatile VolatileItems volatileItems;
    private static final QueryProfileType argumentType = new QueryProfileType("dispatch");

    public static QueryProfileType getArgumentType() {
        return argumentType;
    }

    @Inject
    public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, VipStatus vipStatus) {
        this(clusterId, dispatchConfig, new RpcResourcePool(dispatchConfig, nodesConfig), nodesConfig, vipStatus, RpcInvokerFactory::new);
        this.initialWarmup(dispatchConfig.warmuptime());
    }

    Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, DispatchNodesConfig nodesConfig, VipStatus vipStatus, InvokerFactoryFactory invokerFactories) {
        this(dispatchConfig, rpcConnectionPool, new SearchCluster(clusterId.stringValue(), AvailabilityPolicy.from(dispatchConfig), Dispatcher.toNodes(clusterId.stringValue(), nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)), invokerFactories);
    }

    Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) {
        this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<Node>(searchCluster, false), invokerFactories);
        this.clusterMonitor.start();
    }

    Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, SearchCluster searchCluster, ClusterMonitor<Node> clusterMonitor, InvokerFactoryFactory invokerFactories) {
        this.dispatchConfig = dispatchConfig;
        this.rpcResourcePool = rpcConnectionPool;
        this.searchCluster = searchCluster;
        this.invokerFactories = invokerFactories;
        this.clusterMonitor = clusterMonitor;
        this.volatileItems = this.update();
        searchCluster.addMonitoring(clusterMonitor);
    }

    Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster, DispatchConfig dispatchConfig, InvokerFactory invokerFactory) {
        this(dispatchConfig, null, searchCluster, clusterMonitor, (__, ___, ____) -> invokerFactory);
    }

    private VolatileItems.Ref volatileItems() {
        return this.volatileItems.new VolatileItems.Ref();
    }

    void updateWithNewConfig(DispatchNodesConfig nodesConfig) {
        try (VolatileItems.Ref items = this.volatileItems();){
            items.get().countDown();
            Collection<? extends AutoCloseable> connectionPoolsToClose = this.rpcResourcePool.updateNodes(nodesConfig);
            items.get().cleanup = () -> {
                for (AutoCloseable pool : connectionPoolsToClose) {
                    try {
                        pool.close();
                    }
                    catch (Exception exception) {}
                }
            };
            this.searchCluster.updateNodes(AvailabilityPolicy.from(this.dispatchConfig), Dispatcher.toNodes(this.searchCluster.name(), nodesConfig), this.clusterMonitor);
            this.volatileItems = this.update();
        }
    }

    private VolatileItems update() {
        return new VolatileItems(new LoadBalancer(this.searchCluster.groupList().groups(), Dispatcher.toLoadBalancerPolicy(this.dispatchConfig.distributionPolicy())), this.invokerFactories.create(this.rpcResourcePool, this.searchCluster.groupList(), this.dispatchConfig));
    }

    private void initialWarmup(double warmupTime) {
        Thread warmup = new Thread(() -> Dispatcher.warmup(warmupTime));
        warmup.start();
        try {
            while (!this.searchCluster.hasInformationAboutAllNodes()) {
                Thread.sleep(1L);
            }
            warmup.join();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        this.searchCluster.pingIterationCompleted();
    }

    private static LoadBalancer.Policy toLoadBalancerPolicy(DispatchConfig.DistributionPolicy.Enum policy) {
        return switch (policy) {
            default -> throw new IncompatibleClassChangeError();
            case DispatchConfig.DistributionPolicy.Enum.ROUNDROBIN -> LoadBalancer.Policy.ROUNDROBIN;
            case DispatchConfig.DistributionPolicy.Enum.BEST_OF_RANDOM_2 -> LoadBalancer.Policy.BEST_OF_RANDOM_2;
            case DispatchConfig.DistributionPolicy.Enum.ADAPTIVE, DispatchConfig.DistributionPolicy.Enum.LATENCY_AMORTIZED_OVER_REQUESTS -> LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_REQUESTS;
            case DispatchConfig.DistributionPolicy.Enum.LATENCY_AMORTIZED_OVER_TIME -> LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME;
        };
    }

    private static List<Node> toNodes(String clusterName, DispatchNodesConfig nodesConfig) {
        return nodesConfig.node().stream().map(n -> new Node(clusterName, n.key(), n.host(), n.group())).toList();
    }

    private static void warmup(double seconds) {
        new Compressor().warmup(seconds);
    }

    public boolean allGroupsHaveSize1() {
        return this.searchCluster.groupList().groups().stream().allMatch(g -> g.nodes().size() == 1);
    }

    public void deconstruct() {
        this.clusterMonitor.shutdown();
        if (this.rpcResourcePool != null) {
            this.rpcResourcePool.close();
        }
    }

    public FillInvoker getFillInvoker(Result result, VespaBackend searcher) {
        try (VolatileItems.Ref items = this.volatileItems();){
            FillInvoker fillInvoker = items.register(items.get().invokerFactory.createFillInvoker(searcher, result));
            return fillInvoker;
        }
    }

    public SearchInvoker getSearchInvoker(Query query, VespaBackend searcher) {
        try (VolatileItems.Ref items = this.volatileItems();){
            int maxHitsPerNode = this.dispatchConfig.maxHitsPerNode();
            SearchInvoker invoker = Dispatcher.getSearchPathInvoker(query, searcher, this.searchCluster.groupList(), items.get().invokerFactory, maxHitsPerNode).orElseGet(() -> Dispatcher.getInternalInvoker(query, searcher, this.searchCluster, items.get().loadBalancer, items.get().invokerFactory, maxHitsPerNode));
            if (query.properties().getBoolean(Model.ESTIMATE)) {
                query.setHits(0);
                query.setOffset(0);
            }
            SearchInvoker searchInvoker = items.register(invoker);
            return searchInvoker;
        }
    }

    private static Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackend searcher, SearchGroups cluster, InvokerFactory invokerFactory, int maxHitsPerNode) {
        String searchPath = query.getModel().getSearchPath();
        if (searchPath == null) {
            return Optional.empty();
        }
        try {
            List<Node> nodes = SearchPath.selectNodes(searchPath, cluster);
            if (nodes.isEmpty()) {
                return Optional.empty();
            }
            query.trace(false, 2, "Dispatching with search path ", searchPath);
            return invokerFactory.createSearchInvoker(searcher, query, nodes, true, maxHitsPerNode);
        }
        catch (SearchPath.InvalidSearchPathException e) {
            return Optional.of(new SearchErrorInvoker(ErrorMessage.createIllegalQuery(e.getMessage())));
        }
    }

    private static SearchInvoker getInternalInvoker(Query query, VespaBackend searcher, SearchCluster cluster, LoadBalancer loadBalancer, InvokerFactory invokerFactory, int maxHitsPerNode) {
        Optional<Group> groupInCluster;
        Optional<Node> directNode = cluster.localCorpusDispatchTarget();
        if (directNode.isPresent()) {
            Node node = directNode.get();
            query.trace(false, 2, "Dispatching to ", node);
            return invokerFactory.createSearchInvoker(searcher, query, List.of(node), true, maxHitsPerNode).orElseThrow(() -> new IllegalStateException("Could not dispatch directly to " + String.valueOf(node)));
        }
        int covered = cluster.groupsWithSufficientCoverage();
        int groups = cluster.groupList().size();
        int max = Integer.min(Integer.min(covered + 1, groups), 3);
        HashSet<Integer> rejected = new HashSet<Integer>();
        for (int i = 0; i < max && !(groupInCluster = loadBalancer.takeGroup(rejected)).isEmpty(); ++i) {
            Group group = groupInCluster.get();
            boolean acceptIncompleteCoverage = i == max - 1;
            Optional<SearchInvoker> invoker = invokerFactory.createSearchInvoker(searcher, query, group.nodes(), acceptIncompleteCoverage, maxHitsPerNode);
            if (invoker.isPresent()) {
                query.trace(false, 2, "Dispatching to group ", group.id(), " after retries = ", i);
                query.getModel().setSearchPath("/" + group.id());
                invoker.get().teardown((success, time) -> loadBalancer.releaseGroup(group, (boolean)success, (RequestDuration)time));
                return invoker.get();
            }
            loadBalancer.releaseGroup(group, false, RequestDuration.of(Duration.ZERO));
            rejected.add(group.id());
        }
        throw new IllegalStateException("No suitable groups to dispatch query. Rejected: " + String.valueOf(rejected));
    }

    static {
        argumentType.setStrict(true);
        argumentType.setBuiltin(true);
        argumentType.addField(new FieldDescription(TOP_K_PROBABILITY, FieldType.doubleType));
        argumentType.addField(new FieldDescription(DOCSUM_RETRY_LIMIT, FieldType.integerType));
        argumentType.addField(new FieldDescription(DOCSUM_RETRY_FACTOR, FieldType.doubleType));
        argumentType.freeze();
    }

    static interface InvokerFactoryFactory {
        public InvokerFactory create(RpcConnectionPool var1, SearchGroups var2, DispatchConfig var3);
    }

    private static class VolatileItems {
        final LoadBalancer loadBalancer;
        final InvokerFactory invokerFactory;
        final AtomicInteger inflight = new AtomicInteger(1);
        Runnable cleanup = () -> {};

        VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) {
            this.loadBalancer = loadBalancer;
            this.invokerFactory = invokerFactory;
        }

        private void countDown() {
            if (this.inflight.decrementAndGet() == 0) {
                this.cleanup.run();
            }
        }

        private class Ref
        implements AutoCloseable {
            boolean handedOff = false;

            private Ref() {
                VolatileItems.this.inflight.incrementAndGet();
            }

            VolatileItems get() {
                return VolatileItems.this;
            }

            <T extends CloseableInvoker> T register(T invoker) {
                invoker.teardown((__, ___) -> VolatileItems.this.countDown());
                this.handedOff = true;
                return invoker;
            }

            @Override
            public void close() {
                if (!this.handedOff) {
                    VolatileItems.this.countDown();
                }
            }
        }
    }
}

