/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.documentapi.messagebus.protocol;

import com.yahoo.concurrent.CopyOnWriteHashMap;
import com.yahoo.document.BucketId;
import com.yahoo.document.BucketIdFactory;
import com.yahoo.documentapi.messagebus.protocol.CreateVisitorMessage;
import com.yahoo.documentapi.messagebus.protocol.GetBucketListMessage;
import com.yahoo.documentapi.messagebus.protocol.GetDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.RemoveLocationMessage;
import com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy;
import com.yahoo.documentapi.messagebus.protocol.StatBucketMessage;
import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.WriteDocumentReply;
import com.yahoo.documentapi.messagebus.protocol.WrongDistributionReply;
import com.yahoo.jrt.slobrok.api.IMirror;
import com.yahoo.jrt.slobrok.api.Mirror;
import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.routing.Hop;
import com.yahoo.messagebus.routing.HopDirective;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingContext;
import com.yahoo.messagebus.routing.RoutingNodeIterator;
import com.yahoo.messagebus.routing.VerbatimDirective;
import com.yahoo.vdslib.distribution.Distribution;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.config.content.DistributionConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ContentPolicy
extends SlobrokPolicy {
    private static final Logger log = Logger.getLogger(ContentPolicy.class.getName());
    public static final String owningBucketStates = "uim";
    private static final String upStates = "ui";
    private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator();
    private final DistributorSelectionLogic distributorSelectionLogic;
    private final Parameters parameters;

    public ContentPolicy(String param, DistributionConfig config) {
        this(new Parameters(ContentPolicy.parse(param), config));
    }

    public ContentPolicy(Parameters p) {
        this.parameters = p;
        this.distributorSelectionLogic = new DistributorSelectionLogic(this.parameters, this);
    }

    public void select(RoutingContext context) {
        if (context.shouldTrace(1)) {
            context.trace(1, "Selecting route");
        }
        BucketId bucketId = this.bucketIdCalculator.handleBucketIdCalculation(context);
        if (context.hasReply()) {
            return;
        }
        String targetSpec = this.distributorSelectionLogic.getTargetSpec(context, bucketId);
        if (context.hasReply()) {
            return;
        }
        if (targetSpec != null) {
            Route route = new Route(context.getRoute());
            route.setHop(0, new Hop().addDirective((HopDirective)new VerbatimDirective(targetSpec)));
            context.addChild(route);
        } else {
            context.setError(100002, "Could not resolve any distributors to send to in cluster " + this.parameters.clusterName);
        }
    }

    public void merge(RoutingContext context) {
        Reply reply;
        RoutingNodeIterator it = context.getChildIterator();
        Reply reply2 = reply = it.hasReply() ? it.removeReply() : context.getReply();
        if (reply == null) {
            reply = new EmptyReply();
            reply.addError(new Error(100002, "No reply in any children, nor in the routing context: " + String.valueOf(context)));
        }
        if (reply instanceof WrongDistributionReply) {
            this.distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply)reply, context);
        } else if (reply.hasErrors()) {
            this.distributorSelectionLogic.handleErrorReply(reply, context.getContext());
        } else if (reply instanceof WriteDocumentReply && context.shouldTrace(9)) {
            context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp());
        }
        context.setReply(reply);
    }

    public void destroy() {
        this.distributorSelectionLogic.destroy();
    }

    public static class Parameters {
        protected final String clusterName;
        protected final String distributionConfigId;
        protected final DistributionConfig distributionConfig;
        protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator;

        public Parameters(Map<String, String> params) {
            this(params, null);
        }

        private Parameters(Map<String, String> params, DistributionConfig config) {
            this.clusterName = params.get("cluster");
            if (this.clusterName == null) {
                throw new IllegalArgumentException("Required parameter 'cluster', the name of the content cluster, not set");
            }
            this.distributionConfig = config;
            if (this.distributionConfig != null && this.distributionConfig.cluster(this.clusterName) == null) {
                throw new IllegalArgumentException("Distribution config for cluster '" + this.clusterName + "' not found");
            }
            this.distributionConfigId = params.get("clusterconfigid");
            this.slobrokHostPatternGenerator = this.createPatternGenerator();
        }

        private String getDistributionConfigId() {
            return this.distributionConfigId == null ? this.clusterName : this.distributionConfigId;
        }

        public final String getClusterName() {
            return this.clusterName;
        }

        private SlobrokHostPatternGenerator createPatternGenerator() {
            return new SlobrokHostPatternGenerator(this.getClusterName());
        }

        public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) {
            return new TargetCachingSlobrokHostFetcher(this.slobrokHostPatternGenerator, policy, percent);
        }

        public Distribution createDistribution(SlobrokPolicy policy) {
            return this.distributionConfig == null ? new Distribution(this.getDistributionConfigId()) : new Distribution(this.distributionConfig.cluster(this.clusterName));
        }

        public InstabilityChecker createInstabilityChecker() {
            return new PerNodeCountingInstabilityChecker(this.getAttemptRandomOnFailuresLimit());
        }

        int getAttemptRandomOnFailuresLimit() {
            return 5;
        }

        int maxOldClusterStatesSeenBeforeThrowingCachedState() {
            return 20;
        }

        int getRequiredUpPercentageToSendToKnownGoodNodes() {
            return 60;
        }
    }

    public static class BucketIdCalculator {
        private static final BucketIdFactory factory = new BucketIdFactory();

        private BucketId getBucketId(Message msg) {
            return switch (msg.getType()) {
                case 100004 -> factory.getBucketId(((PutDocumentMessage)msg).getDocumentPut().getDocument().getId());
                case 100003 -> factory.getBucketId(((GetDocumentMessage)msg).getDocumentId());
                case 100005 -> factory.getBucketId(((RemoveDocumentMessage)msg).getDocumentId());
                case 100006 -> factory.getBucketId(((UpdateDocumentMessage)msg).getDocumentUpdate().getId());
                case 100020 -> ((GetBucketListMessage)msg).getBucketId();
                case 100019 -> ((StatBucketMessage)msg).getBucketId();
                case 100007 -> ((CreateVisitorMessage)msg).getBuckets().get(0);
                case 100024 -> ((RemoveLocationMessage)msg).getBucketId();
                default -> {
                    log.log(Level.SEVERE, "Message type '" + msg.getType() + "' not supported.");
                    yield null;
                }
            };
        }

        BucketId handleBucketIdCalculation(RoutingContext context) {
            BucketId id = this.getBucketId(context.getMessage());
            if (id == null || id.getRawId() == 0L) {
                EmptyReply reply = new EmptyReply();
                reply.addError(new Error(250000, "No bucket id available in message."));
                context.setReply((Reply)reply);
            }
            return id;
        }
    }

    public static final class DistributorSelectionLogic {
        private final HostFetcher hostFetcher;
        private final Distribution distribution;
        private final InstabilityChecker persistentFailureChecker;
        private final AtomicReference<ClusterState> safeCachedClusterState = new AtomicReference<Object>(null);
        private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0);
        private final int maxOldClusterVersionBeforeSendingRandom;

        DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) {
            try {
                this.hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes());
                this.distribution = params.createDistribution(policy);
                this.persistentFailureChecker = params.createInstabilityChecker();
                this.maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState();
            }
            catch (Throwable e) {
                this.destroy();
                throw e;
            }
        }

        public void destroy() {
            if (this.hostFetcher != null) {
                this.hostFetcher.close();
            }
            if (this.distribution != null) {
                this.distribution.close();
            }
        }

        String getTargetSpec(RoutingContext context, BucketId bucketId) {
            Object sendRandomReason = null;
            ClusterState cachedClusterState = this.safeCachedClusterState.get();
            if (cachedClusterState != null) {
                try {
                    Integer target = this.distribution.getIdealDistributorNode(cachedClusterState, bucketId, ContentPolicy.owningBucketStates);
                    if (this.persistentFailureChecker.tooManyFailures(target)) {
                        sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state.";
                        target = null;
                    }
                    if (target != null) {
                        context.setContext((Object)new MessageContext(cachedClusterState, target));
                        String targetSpec = this.hostFetcher.getTargetSpec(target, context);
                        if (targetSpec != null) {
                            if (context.shouldTrace(1)) {
                                context.trace(1, "Using distributor " + target + " for " + String.valueOf(bucketId) + " as our state version is " + cachedClusterState.getVersion());
                            }
                            return targetSpec;
                        }
                        sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. Sending to random.";
                        log.log(Level.FINE, "Target distributor is not in slobrok");
                    }
                    context.setContext((Object)new MessageContext(cachedClusterState));
                }
                catch (Distribution.TooFewBucketBitsInUseException e) {
                    WrongDistributionReply reply = new WrongDistributionReply(cachedClusterState.toString(true));
                    reply.addError(new Error(151002, "Too few distribution bits used for given cluster state"));
                    context.setReply((Reply)reply);
                    return null;
                }
                catch (Distribution.NoDistributorsAvailableException e) {
                    log.log(Level.FINE, "No distributors available; clearing cluster state");
                    this.safeCachedClusterState.set(null);
                    sendRandomReason = "No distributors available. Sending to random distributor.";
                    context.setContext((Object)DistributorSelectionLogic.createRandomDistributorTargetContext());
                }
            } else {
                context.setContext((Object)DistributorSelectionLogic.createRandomDistributorTargetContext());
                sendRandomReason = "No cluster state cached. Sending to random distributor.";
            }
            if (context.shouldTrace(1)) {
                context.trace(1, (String)(sendRandomReason != null ? sendRandomReason : "Sending to random distributor for unknown reason"));
            }
            return this.hostFetcher.getRandomTargetSpec(context);
        }

        private static MessageContext createRandomDistributorTargetContext() {
            return new MessageContext(null);
        }

        private static Optional<ClusterState> clusterStateFromReply(WrongDistributionReply reply) {
            try {
                return Optional.of(new ClusterState(reply.getSystemState()));
            }
            catch (Exception e) {
                reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState());
                return Optional.empty();
            }
        }

        void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) {
            MessageContext context = (MessageContext)routingContext.getContext();
            Optional<ClusterState> replyState = DistributorSelectionLogic.clusterStateFromReply(reply);
            if (replyState.isEmpty()) {
                return;
            }
            ClusterState newState = replyState.get();
            this.resetCachedStateIfClusterStateVersionLikelyRolledBack(newState);
            this.markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState);
            if (context.calculatedDistributor == null) {
                this.traceReplyFromRandomDistributor(reply, newState);
            } else {
                this.traceReplyFromSpecificDistributor(reply, context, newState);
            }
            this.updateCachedRoutingStateFromWrongDistribution(context, newState);
        }

        private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) {
            ClusterState cachedClusterState = this.safeCachedClusterState.get();
            if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) {
                this.safeCachedClusterState.set(newState);
                if (newState.getClusterState().equals((Object)State.UP)) {
                    this.hostFetcher.updateValidTargets(newState);
                }
            } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) {
                this.safeCachedClusterState.set(null);
            } else if (context.calculatedDistributor != null) {
                this.persistentFailureChecker.addFailure(context.calculatedDistributor);
            }
        }

        private void traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
            if (context.usedState == null) {
                String msg = "Used state must be set as distributor is calculated. Bug.";
                reply.getTrace().trace(1, msg);
                log.log(Level.SEVERE, msg);
            } else if (newState.getVersion() == context.usedState.getVersion()) {
                String msg = "Message sent to distributor " + context.calculatedDistributor + " retrieved cluster state version " + newState.getVersion() + " which was the state we used to calculate distributor as target last time.";
                reply.getTrace().trace(1, msg);
                log.log(Level.FINE, msg);
            } else if (newState.getVersion() > context.usedState.getVersion()) {
                if (reply.getTrace().shouldTrace(1)) {
                    reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + " updated cluster state from version " + context.usedState.getVersion() + " to " + newState.getVersion());
                }
            } else if (reply.getTrace().shouldTrace(1)) {
                reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + " returned older cluster state version " + newState.getVersion());
            }
        }

        private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) {
            ClusterState cachedClusterState = this.safeCachedClusterState.get();
            if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion() && this.oldClusterVersionGottenCount.incrementAndGet() >= this.maxOldClusterVersionBeforeSendingRandom) {
                this.oldClusterVersionGottenCount.set(0);
                this.safeCachedClusterState.set(null);
            }
        }

        private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
            if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) {
                if (reply.getRetryDelay() <= 0.0) {
                    reply.setRetryDelay(-1.0);
                }
            } else if (reply.getRetryDelay() <= 0.0) {
                reply.setRetryDelay(0.0);
            }
        }

        private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) {
            if (!reply.getTrace().shouldTrace(1)) {
                return;
            }
            ClusterState cachedClusterState = this.safeCachedClusterState.get();
            if (cachedClusterState == null) {
                reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion());
            } else if (newState.getVersion() == cachedClusterState.getVersion()) {
                reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct.");
            } else if (newState.getVersion() > cachedClusterState.getVersion()) {
                reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion());
            } else {
                reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion());
            }
        }

        private static boolean shouldCountAsErrorForRandomSendTrigger(Reply reply) {
            if (reply.getNumErrors() != 1) {
                return !reply.hasErrors();
            }
            Error error = reply.getError(0);
            return switch (error.getCode()) {
                case 151005, 251013 -> false;
                default -> true;
            };
        }

        void handleErrorReply(Reply reply, Object untypedContext) {
            MessageContext messageContext = (MessageContext)untypedContext;
            if (messageContext.calculatedDistributor != null) {
                if (DistributorSelectionLogic.shouldCountAsErrorForRandomSendTrigger(reply)) {
                    this.persistentFailureChecker.addFailure(messageContext.calculatedDistributor);
                }
                if (reply.getTrace().shouldTrace(1)) {
                    reply.getTrace().trace(1, "Failed with " + messageContext.toString());
                }
            }
        }

        private static class MessageContext {
            final Integer calculatedDistributor;
            final ClusterState usedState;

            MessageContext(ClusterState usedState) {
                this(usedState, null);
            }

            MessageContext(ClusterState usedState, Integer calculatedDistributor) {
                this.calculatedDistributor = calculatedDistributor;
                this.usedState = usedState;
            }

            public String toString() {
                return "Context(Distributor " + this.calculatedDistributor + ", state version " + this.usedState.getVersion() + ")";
            }
        }
    }

    public static class PerNodeCountingInstabilityChecker
    implements InstabilityChecker {
        private final List<Integer> nodeFailures = new CopyOnWriteArrayList<Integer>();
        private final int failureLimit;

        public PerNodeCountingInstabilityChecker(int failureLimit) {
            this.failureLimit = failureLimit;
        }

        @Override
        public boolean tooManyFailures(int nodeIndex) {
            if (this.nodeFailures.size() > nodeIndex && this.nodeFailures.get(nodeIndex) > this.failureLimit) {
                this.nodeFailures.set(nodeIndex, 0);
                return true;
            }
            return false;
        }

        @Override
        public void addFailure(Integer calculatedDistributor) {
            while (this.nodeFailures.size() <= calculatedDistributor) {
                this.nodeFailures.add(0);
            }
            this.nodeFailures.set(calculatedDistributor, this.nodeFailures.get(calculatedDistributor) + 1);
        }
    }

    public static interface InstabilityChecker {
        public boolean tooManyFailures(int var1);

        public void addFailure(Integer var1);
    }

    static class TargetCachingSlobrokHostFetcher
    extends SlobrokHostFetcher {
        private final AtomicReference<GenerationCache> generationCache = new AtomicReference<Object>(null);

        TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
            super(patternGenerator, policy, percent);
        }

        @Override
        public String getTargetSpec(Integer distributor, RoutingContext context) {
            GenerationCache cache = this.generationCache.get();
            int currentGeneration = this.getMirror(context).updates();
            if (cache == null || currentGeneration != cache.generation()) {
                cache = new GenerationCache(currentGeneration);
                this.generationCache.set(cache);
            }
            if (distributor != null) {
                return this.cachingGetTargetSpec(distributor, context, cache);
            }
            return super.getTargetSpec(null, context);
        }

        private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) {
            String cachedTarget = cache.get(distributor);
            if (cachedTarget != null) {
                return cachedTarget;
            }
            String resolvedTarget = super.getTargetSpec(distributor, context);
            cache.put(distributor, resolvedTarget);
            return resolvedTarget;
        }

        private static class GenerationCache {
            private final int generation;
            private final CopyOnWriteHashMap<Integer, String> targets = new CopyOnWriteHashMap();

            GenerationCache(int generation) {
                this.generation = generation;
            }

            public int generation() {
                return this.generation;
            }

            public String get(Integer index) {
                return (String)this.targets.get((Object)index);
            }

            public void put(Integer index, String target) {
                this.targets.put((Object)index, (Object)target);
            }
        }
    }

    public static class SlobrokHostFetcher
    extends HostFetcher {
        private final SlobrokHostPatternGenerator patternGenerator;
        private final SlobrokPolicy policy;

        SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
            super(percent);
            this.patternGenerator = patternGenerator;
            this.policy = policy;
        }

        private List<Mirror.Entry> getEntries(String hostPattern, RoutingContext context) {
            return this.policy.lookup(context, hostPattern);
        }

        private String convertSlobrokNameToSessionName(String slobrokName) {
            return slobrokName + "/default";
        }

        public IMirror getMirror(RoutingContext context) {
            return context.getMirror();
        }

        @Override
        public String getTargetSpec(Integer distributor, RoutingContext context) {
            List<Mirror.Entry> arr = this.getEntries(this.patternGenerator.getDistributorHostPattern(distributor), context);
            if (arr.isEmpty()) {
                return null;
            }
            if (distributor != null) {
                if (arr.size() == 1) {
                    return this.convertSlobrokNameToSessionName(arr.get(0).getSpecString());
                }
            } else {
                return this.convertSlobrokNameToSessionName(arr.get(this.randomizer.nextInt(arr.size())).getSpecString());
            }
            log.log(Level.WARNING, "Got " + arr.size() + " matches for a distributor.");
            return null;
        }
    }

    public static abstract class HostFetcher {
        private final int requiredUpPercentageToSendToKnownGoodNodes;
        private final AtomicReference<Targets> validTargets = new AtomicReference<Targets>(new Targets());
        protected final Random randomizer = new Random(12345L);

        protected HostFetcher(int percent) {
            this.requiredUpPercentageToSendToKnownGoodNodes = percent;
        }

        void updateValidTargets(ClusterState state) {
            ArrayList<Integer> validRandomTargets = new ArrayList<Integer>();
            for (int i = 0; i < state.getNodeCount(NodeType.DISTRIBUTOR); ++i) {
                if (!state.getNodeState(new Node(NodeType.DISTRIBUTOR, i)).getState().oneOf(ContentPolicy.upStates)) continue;
                validRandomTargets.add(i);
            }
            this.validTargets.set(new Targets(validRandomTargets, state.getNodeCount(NodeType.DISTRIBUTOR)));
        }

        public abstract String getTargetSpec(Integer var1, RoutingContext var2);

        String getRandomTargetSpec(RoutingContext context) {
            Targets targets = this.validTargets.get();
            while (100 * targets.size() >= this.requiredUpPercentageToSendToKnownGoodNodes * targets.total) {
                Integer distributor = targets.get(this.randomizer);
                String targetSpec = this.getTargetSpec(distributor, context);
                if (targetSpec != null) {
                    context.trace(3, "Sending to random node seen up in cluster state");
                    return targetSpec;
                }
                targets.remove(distributor);
            }
            context.trace(3, "Too few nodes seen up in state. Sending totally random.");
            return this.getTargetSpec(null, context);
        }

        public void close() {
        }

        private static class Targets {
            private final AtomicReference<List<Integer>> list = new AtomicReference();
            final int total;

            Targets() {
                this(List.of(), 0);
            }

            Targets(List<Integer> list, int total) {
                this.list.set(List.copyOf(list));
                this.total = Math.max(1, total);
            }

            Integer get(Random randomizer) {
                List<Integer> snapshot = this.list.get();
                return snapshot.get(randomizer.nextInt(snapshot.size()));
            }

            synchronized void remove(Integer v) {
                List<Integer> snapshot = this.list.get();
                if (snapshot.contains(v)) {
                    this.list.set(snapshot.stream().filter(item -> !v.equals(item)).toList());
                }
            }

            int size() {
                return this.list.get().size();
            }
        }
    }

    static class SlobrokHostPatternGenerator {
        private final String base;

        SlobrokHostPatternGenerator(String clusterName) {
            this.base = "storage/cluster." + clusterName + "/distributor/";
        }

        String getDistributorHostPattern(Integer distributor) {
            return this.base + String.valueOf(distributor == null ? "*" : distributor) + "/default";
        }
    }
}

