/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.search.dispatch.rpc;

import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol;
import com.google.protobuf.InvalidProtocolBufferException;
import com.yahoo.collections.ListMap;
import com.yahoo.compress.Compressor;
import com.yahoo.container.protect.Error;
import com.yahoo.data.access.slime.SlimeAdapter;
import com.yahoo.prelude.fastsearch.DocumentDatabase;
import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.prelude.fastsearch.PartialSummaryHandler;
import com.yahoo.prelude.fastsearch.TimeoutException;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.Dispatcher;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.rpc.Client;
import com.yahoo.search.dispatch.rpc.CompressPayload;
import com.yahoo.search.dispatch.rpc.ProtobufSerialization;
import com.yahoo.search.dispatch.rpc.RpcConnectionPool;
import com.yahoo.search.dispatch.rpc.TimeoutHelper;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.slime.BinaryFormat;
import com.yahoo.slime.BinaryView;
import com.yahoo.slime.Inspector;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public class RpcProtobufFillInvoker
extends FillInvoker {
    private static final String RPC_METHOD = "vespa.searchprotocol.getDocsums";
    private static final Logger log = Logger.getLogger(RpcProtobufFillInvoker.class.getName());
    private final DocumentDatabase documentDb;
    private final RpcConnectionPool resourcePool;
    private boolean summaryNeedsQuery;
    private final PartialSummaryHandler partialSummaryHandler;
    private final String serverId;
    private final CompressPayload compressor;
    private final DecodePolicy decodePolicy;
    private BlockingQueue<ResponseAndHits> responses;
    private boolean hasReportedError = false;
    private int outstandingResponses;
    private int numOkFilledHits = 0;
    private int numHitsToFill = 0;
    private static final AtomicInteger retryCounter = new AtomicInteger();
    private static final AtomicInteger noRetryCounter = new AtomicInteger();
    private static final AtomicInteger retryTimeoutCounter = new AtomicInteger();

    RpcProtobufFillInvoker(RpcConnectionPool resourcePool, CompressPayload compressor, DocumentDatabase documentDb, String serverId, DecodePolicy decodePolicy, boolean summaryNeedsQuery) {
        this.documentDb = documentDb;
        this.resourcePool = resourcePool;
        this.serverId = serverId;
        this.summaryNeedsQuery = summaryNeedsQuery;
        this.compressor = compressor;
        this.decodePolicy = decodePolicy;
        this.partialSummaryHandler = new PartialSummaryHandler(documentDb);
    }

    @Override
    protected void sendFillRequest(Result result, String summaryClass) {
        this.partialSummaryHandler.wantToFill(result, summaryClass);
        if (this.partialSummaryHandler.resultAlreadyFilled()) {
            result.getQuery().trace(false, 3, "Skipping fill of ", summaryClass, " as result is already filled");
            return;
        }
        ListMap<Integer, FastHit> hitsByNode = this.hitsByNode(result);
        int queueSize = Math.max(hitsByNode.size(), this.resourcePool.knownNodeIds().size());
        this.responses = new LinkedBlockingQueue<ResponseAndHits>(queueSize);
        this.sendFillRequestByNode(result, summaryClass, hitsByNode);
    }

    void sendFillRequestByNode(Result result, String summaryClass, ListMap<Integer, FastHit> hitsByNode) {
        result.getQuery().trace(false, 5, "Sending ", hitsByNode.size(), " summary fetch requests with jrt/protobuf");
        this.outstandingResponses = hitsByNode.size();
        TimeoutHelper.Timeout timeout = TimeoutHelper.calculateTimeout(result.getQuery());
        if (timeout.timedOut()) {
            hitsByNode.forEach((nodeId, hits) -> this.receive(Client.ResponseOrError.fromTimeoutError("Timed out prior to sending docsum request to " + nodeId), (List<FastHit>)hits));
            return;
        }
        String askForSummary = this.partialSummaryHandler.askForSummary();
        Set<String> onlyFields = this.partialSummaryHandler.askForFields();
        SearchProtocol.DocsumRequest.Builder builder = ProtobufSerialization.createDocsumRequestBuilder(result.getQuery(), this.serverId, askForSummary, onlyFields, this.summaryNeedsQuery, timeout.request());
        hitsByNode.forEach((nodeId, hits) -> {
            byte[] payload = ProtobufSerialization.serializeDocsumRequest(builder, hits);
            this.sendDocsumsRequest((int)nodeId, (List<FastHit>)hits, payload, result, timeout.client());
        });
    }

    @Override
    protected void getFillResults(Result result, String summaryClass) {
        if (this.partialSummaryHandler.resultAlreadyFilled()) {
            if (this.outstandingResponses == 0) {
                for (Hit hit : result.hits()::unorderedDeepIterator) {
                    if (!hit.isFillable() || !(hit instanceof FastHit)) continue;
                    FastHit fastHit = (FastHit)hit;
                    this.partialSummaryHandler.markFilled(hit);
                }
                return;
            }
            log.log(Level.SEVERE, "result was already filled, but there are " + this.outstandingResponses + " outstanding responses");
        }
        try {
            this.processResponses(result, summaryClass);
            result.hits().setSorted(false);
            result.analyzeHits();
        }
        catch (TimeoutException e) {
            result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage()));
        }
    }

    @Override
    protected void release() {
    }

    public void receive(Client.ResponseOrError<Client.ProtobufResponse> response, List<FastHit> hitsContext) {
        this.responses.add(new ResponseAndHits(response, hitsContext));
    }

    private final ListMap<Integer, FastHit> hitsByNode(Result result) {
        ListMap hitsByNode = new ListMap();
        for (Hit hit : result.hits()::unorderedDeepIterator) {
            if (!(hit instanceof FastHit)) continue;
            FastHit fastHit = (FastHit)hit;
            ++this.numHitsToFill;
            hitsByNode.put((Object)fastHit.getDistributionKey(), (Object)fastHit);
        }
        return hitsByNode;
    }

    private void sendDocsumsRequest(int nodeId, List<FastHit> hits, byte[] payload, Result result, double clientTimeout) {
        Client.NodeConnection node = this.resourcePool.getConnection(nodeId);
        if (node == null) {
            String error = "Could not fill hits from unknown node " + nodeId;
            this.receive(Client.ResponseOrError.fromError(error), hits);
            result.hits().addError(ErrorMessage.createEmptyDocsums(error));
            log.warning("Got hits with node id " + nodeId + ", which is not included in the current dispatch config");
            return;
        }
        Query query = result.getQuery();
        Compressor.Compression compressionResult = this.compressor.compress(query, payload);
        node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), roe -> this.receive(roe, hits), clientTimeout);
    }

    private ResponseAndHits getNextResponse(long timeLeftMs) throws InterruptedException {
        if (timeLeftMs <= 0L) {
            return null;
        }
        ResponseAndHits responseAndHits = this.responses.poll(timeLeftMs, TimeUnit.MILLISECONDS);
        if (responseAndHits == null || responseAndHits.response().timeout()) {
            return null;
        }
        return responseAndHits;
    }

    private void processResponses(Result result, String summaryClass) throws TimeoutException {
        try {
            ArrayList<FastHit> skippedHits = new ArrayList<FastHit>();
            while (this.outstandingResponses > 0) {
                ResponseAndHits responseAndHits = this.getNextResponse(result.getQuery().getTimeLeft());
                if (responseAndHits == null) {
                    this.throwTimeout();
                }
                skippedHits.addAll(this.processOneResponse(result, responseAndHits, summaryClass, false));
                --this.outstandingResponses;
            }
            if (skippedHits.isEmpty()) {
                return;
            }
            this.maybeRetry(skippedHits, result, summaryClass);
            if (!skippedHits.isEmpty()) {
                result.hits().addError(ErrorMessage.createEmptyDocsums("Missing hit summary data for summary " + summaryClass + " for " + String.valueOf(skippedHits) + " hits"));
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private List<FastHit> processOneResponse(Result result, ResponseAndHits responseAndHits, String summaryClass, boolean isRetry) {
        Client.ResponseOrError<Client.ProtobufResponse> responseOrError = responseAndHits.response();
        if (responseOrError.error().isPresent()) {
            if (this.hasReportedError || isRetry) {
                return List.of();
            }
        } else {
            Client.ProtobufResponse response = responseOrError.response().get();
            byte[] responseBytes = this.compressor.decompress(response);
            return this.fill(result, responseAndHits.hits(), summaryClass, responseBytes, isRetry);
        }
        String error = responseOrError.error().get();
        result.hits().addError(ErrorMessage.createBackendCommunicationError(error));
        log.log(Level.WARNING, "Error fetching summary data: " + error);
        this.hasReportedError = true;
        return List.of();
    }

    private void addErrors(Result result, Inspector errors) {
        errors.traverse((index, value) -> {
            int errorCode = "timeout".equalsIgnoreCase(value.field("type").asString()) ? Error.TIMEOUT.code : Error.UNSPECIFIED.code;
            result.hits().addError(new ErrorMessage(errorCode, value.field("message").asString(), value.field("details").asString()));
        });
    }

    private void convertErrorsFromDocsumReply(Result target, List<SearchProtocol.Error> errors) {
        for (SearchProtocol.Error error : errors) {
            target.hits().addError(ErrorMessage.createDocsumReplyError(error.getMessage()));
        }
    }

    private List<FastHit> fill(Result result, List<FastHit> hits, String summaryClass, byte[] payload, boolean isRetry) {
        try {
            SlimeAdapter summaries;
            Inspector root;
            SearchProtocol.DocsumReply protobuf = SearchProtocol.DocsumReply.parseFrom((byte[])payload);
            Object object = root = this.decodePolicy == DecodePolicy.ONDEMAND ? BinaryView.inspect((byte[])protobuf.getSlimeSummaries().toByteArray()) : BinaryFormat.decode((byte[])protobuf.getSlimeSummaries().toByteArray()).get();
            if (!isRetry) {
                boolean hasErrors;
                Inspector errors = root.field("errors");
                boolean bl = hasErrors = errors.valid() && errors.entries() > 0;
                if (hasErrors) {
                    this.addErrors(result, errors);
                }
                this.convertErrorsFromDocsumReply(result, protobuf.getErrorsList());
            }
            if (!(summaries = new SlimeAdapter(root.field("docsums"))).valid()) {
                return List.of();
            }
            ArrayList<FastHit> skippedHits = new ArrayList<FastHit>();
            for (int i = 0; i < hits.size(); ++i) {
                boolean needFill;
                com.yahoo.data.access.Inspector summary = summaries.entry(i).field("docsum");
                FastHit hit = hits.get(i);
                boolean bl = needFill = isRetry ? this.partialSummaryHandler.needFill(hit) : true;
                if (summary.valid() && needFill) {
                    hit.setField("sddocname", this.documentDb.schema().name());
                    hit.addSummary(this.partialSummaryHandler.effectiveDocsumDef(), summary);
                    this.partialSummaryHandler.markFilled(hit);
                    ++this.numOkFilledHits;
                    continue;
                }
                if (!needFill) continue;
                skippedHits.add(hit);
            }
            return skippedHits;
        }
        catch (InvalidProtocolBufferException ex) {
            if (!isRetry) {
                log.log(Level.WARNING, "Invalid response to docsum request", ex);
                result.hits().addError(ErrorMessage.createInternalServerError("Invalid response to docsum request from backend"));
            }
            return List.of();
        }
    }

    private void throwTimeout() throws TimeoutException {
        throw new TimeoutException("Timed out waiting for summary data. " + this.outstandingResponses + " responses outstanding.");
    }

    private void maybeRetry(List<FastHit> skippedHits, Result result, String summaryClass) throws InterruptedException {
        double retryLimitFactor;
        Query query;
        double absoluteRetryLimit;
        double retryLimit;
        int numSkipped = skippedHits.size();
        if ((double)numSkipped < (retryLimit = Math.min(absoluteRetryLimit = (double)(query = result.getQuery()).properties().getInteger(Dispatcher.docsumRetryLimit, 10).intValue(), (retryLimitFactor = query.properties().getDouble(Dispatcher.docsumRetryFactor, 0.5).doubleValue()) * (double)this.numHitsToFill + 1.0))) {
            result.getQuery().trace(false, 1, "Retry summary fetching for " + numSkipped + " empty docsums (of " + this.numHitsToFill + " hits)");
            ListMap retryMap = new ListMap();
            for (Integer nodeId : this.resourcePool.knownNodeIds()) {
                for (FastHit hit2 : skippedHits) {
                    if (hit2.getDistributionKey() == nodeId.intValue()) continue;
                    retryMap.put((Object)nodeId, (Object)hit2);
                }
            }
            if (retryMap.size() > 0) {
                if (RpcProtobufFillInvoker.shouldLogRetry()) {
                    log.log(Level.WARNING, "Retry docsum fetch for " + numSkipped + " hits (" + this.numOkFilledHits + " ok hits)");
                }
                this.summaryNeedsQuery = true;
                this.sendFillRequestByNode(result, summaryClass, (ListMap<Integer, FastHit>)retryMap);
                while (this.outstandingResponses > 0 && this.numOkFilledHits < this.numHitsToFill) {
                    ResponseAndHits responseAndHits = this.getNextResponse(query.getTimeLeft());
                    if (responseAndHits == null) {
                        if (!RpcProtobufFillInvoker.shouldLogRetryTimeout()) break;
                        log.log(Level.WARNING, "Timed out waiting for summary data. " + this.outstandingResponses + " responses outstanding.");
                        break;
                    }
                    this.processOneResponse(result, responseAndHits, summaryClass, true);
                    --this.outstandingResponses;
                }
                skippedHits.removeIf(hit -> !this.partialSummaryHandler.needFill((Hit)hit));
            }
        } else {
            result.getQuery().trace(false, 1, "Summary fetching got " + numSkipped + " empty docsums (of " + this.numHitsToFill + " hits), no retry");
            if (RpcProtobufFillInvoker.shouldLogNoRetry()) {
                log.log(Level.WARNING, "Docsum fetch failed for " + numSkipped + " hits (" + this.numOkFilledHits + " ok hits), no retry");
            }
        }
    }

    private static boolean shouldLogForCount(int count) {
        if (count < 100) {
            return true;
        }
        if (count < 1000) {
            return count % 100 == 0;
        }
        if (count < 100000) {
            return count % 1000 == 0;
        }
        return count % 10000 == 0;
    }

    private static boolean shouldLogRetry() {
        int count = retryCounter.getAndAdd(1);
        return RpcProtobufFillInvoker.shouldLogForCount(count);
    }

    private static boolean shouldLogNoRetry() {
        int count = noRetryCounter.getAndAdd(1);
        return RpcProtobufFillInvoker.shouldLogForCount(count);
    }

    private static boolean shouldLogRetryTimeout() {
        int count = retryTimeoutCounter.getAndAdd(1);
        return RpcProtobufFillInvoker.shouldLogForCount(count);
    }

    static enum DecodePolicy {
        EAGER,
        ONDEMAND;

    }

    private record ResponseAndHits(Client.ResponseOrError<Client.ProtobufResponse> response, List<FastHit> hits) {
    }
}

