package com.yahoo.search.dispatch.rpc;

import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol;
import com.google.protobuf.InvalidProtocolBufferException;
import com.yahoo.collections.ListMap;
import com.yahoo.compress.Compressor;
import com.yahoo.container.protect.Error;
import com.yahoo.data.access.slime.SlimeAdapter;
import com.yahoo.prelude.fastsearch.DocumentDatabase;
import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.prelude.fastsearch.PartialSummaryHandler;
import com.yahoo.prelude.fastsearch.TimeoutException;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.Dispatcher;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.rpc.Client;
import com.yahoo.search.dispatch.rpc.TimeoutHelper;
import com.yahoo.search.query.Model;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.search.result.HitGroup;
import com.yahoo.slime.BinaryFormat;
import com.yahoo.slime.BinaryView;
import com.yahoo.slime.Inspector;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.class */
public class RpcProtobufFillInvoker extends FillInvoker {
    private static final String RPC_METHOD = "vespa.searchprotocol.getDocsums";
    private final DocumentDatabase documentDb;
    private final RpcConnectionPool resourcePool;
    private boolean summaryNeedsQuery;
    private final PartialSummaryHandler partialSummaryHandler;
    private final String serverId;
    private final CompressPayload compressor;
    private final DecodePolicy decodePolicy;
    private BlockingQueue<ResponseAndHits> responses;
    private int outstandingResponses;
    private static final Logger log = Logger.getLogger(RpcProtobufFillInvoker.class.getName());
    private static final AtomicInteger retryCounter = new AtomicInteger();
    private static final AtomicInteger noRetryCounter = new AtomicInteger();
    private static final AtomicInteger retryTimeoutCounter = new AtomicInteger();
    private boolean hasReportedError = false;
    private int numOkFilledHits = 0;
    private int numHitsToFill = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker$DecodePolicy.class */
    public enum DecodePolicy {
        EAGER,
        ONDEMAND
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker$ResponseAndHits.class */
    public static final class ResponseAndHits extends Record {
        private final Client.ResponseOrError<Client.ProtobufResponse> response;
        private final List<FastHit> hits;

        private ResponseAndHits(Client.ResponseOrError<Client.ProtobufResponse> responseOrError, List<FastHit> list) {
            this.response = responseOrError;
            this.hits = list;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, ResponseAndHits.class), ResponseAndHits.class, "response;hits", "FIELD:Lcom/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker$ResponseAndHits;->response:Lcom/yahoo/search/dispatch/rpc/Client$ResponseOrError;", "FIELD:Lcom/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker$ResponseAndHits;->hits:Ljava/util/List;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, ResponseAndHits.class), ResponseAndHits.class, "response;hits", "FIELD:Lcom/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker$ResponseAndHits;->response:Lcom/yahoo/search/dispatch/rpc/Client$ResponseOrError;", "FIELD:Lcom/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker$ResponseAndHits;->hits:Ljava/util/List;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, ResponseAndHits.class, Object.class), ResponseAndHits.class, "response;hits", "FIELD:Lcom/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker$ResponseAndHits;->response:Lcom/yahoo/search/dispatch/rpc/Client$ResponseOrError;", "FIELD:Lcom/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker$ResponseAndHits;->hits:Ljava/util/List;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public Client.ResponseOrError<Client.ProtobufResponse> response() {
            return this.response;
        }

        public List<FastHit> hits() {
            return this.hits;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RpcProtobufFillInvoker(RpcConnectionPool rpcConnectionPool, CompressPayload compressPayload, DocumentDatabase documentDatabase, String str, DecodePolicy decodePolicy, boolean z) {
        this.documentDb = documentDatabase;
        this.resourcePool = rpcConnectionPool;
        this.serverId = str;
        this.summaryNeedsQuery = z;
        this.compressor = compressPayload;
        this.decodePolicy = decodePolicy;
        this.partialSummaryHandler = new PartialSummaryHandler(documentDatabase);
    }

    @Override // com.yahoo.search.dispatch.FillInvoker
    protected void sendFillRequest(Result result, String str) {
        this.partialSummaryHandler.wantToFill(result, str);
        if (this.partialSummaryHandler.resultAlreadyFilled()) {
            result.getQuery().trace(false, 3, "Skipping fill of ", str, " as result is already filled");
            return;
        }
        ListMap<Integer, FastHit> hitsByNode = hitsByNode(result);
        this.responses = new LinkedBlockingQueue(Math.max(hitsByNode.size(), this.resourcePool.knownNodeIds().size()));
        sendFillRequestByNode(result, str, hitsByNode);
    }

    void sendFillRequestByNode(Result result, String str, ListMap<Integer, FastHit> listMap) {
        result.getQuery().trace(false, 5, "Sending ", Integer.valueOf(listMap.size()), " summary fetch requests with jrt/protobuf");
        this.outstandingResponses = listMap.size();
        TimeoutHelper.Timeout calculateTimeout = TimeoutHelper.calculateTimeout(result.getQuery());
        if (calculateTimeout.timedOut()) {
            listMap.forEach((num, list) -> {
                receive(Client.ResponseOrError.fromTimeoutError("Timed out prior to sending docsum request to " + num), list);
            });
        } else {
            SearchProtocol.DocsumRequest.Builder createDocsumRequestBuilder = ProtobufSerialization.createDocsumRequestBuilder(result.getQuery(), this.serverId, this.partialSummaryHandler.askForSummary(), this.partialSummaryHandler.askForFields(), this.summaryNeedsQuery, calculateTimeout.request());
            listMap.forEach((num2, list2) -> {
                sendDocsumsRequest(num2.intValue(), list2, ProtobufSerialization.serializeDocsumRequest(createDocsumRequestBuilder, list2), result, calculateTimeout.client());
            });
        }
    }

    @Override // com.yahoo.search.dispatch.FillInvoker
    protected void getFillResults(Result result, String str) {
        if (this.partialSummaryHandler.resultAlreadyFilled()) {
            if (this.outstandingResponses == 0) {
                HitGroup hits = result.hits();
                Objects.requireNonNull(hits);
                Iterable<Hit> iterable = hits::unorderedDeepIterator;
                for (Hit hit : iterable) {
                    if (hit.isFillable() && (hit instanceof FastHit)) {
                        this.partialSummaryHandler.markFilled(hit);
                    }
                }
                return;
            }
            log.log(Level.SEVERE, "result was already filled, but there are " + this.outstandingResponses + " outstanding responses");
        }
        try {
            processResponses(result, str);
            result.hits().setSorted(false);
            result.analyzeHits();
        } catch (TimeoutException e) {
            result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.yahoo.search.dispatch.CloseableInvoker
    public void release() {
    }

    public void receive(Client.ResponseOrError<Client.ProtobufResponse> responseOrError, List<FastHit> list) {
        this.responses.add(new ResponseAndHits(responseOrError, list));
    }

    private final ListMap<Integer, FastHit> hitsByNode(Result result) {
        ListMap<Integer, FastHit> listMap = new ListMap<>();
        HitGroup hits = result.hits();
        Objects.requireNonNull(hits);
        Iterable<Hit> iterable = hits::unorderedDeepIterator;
        for (Hit hit : iterable) {
            if (hit instanceof FastHit) {
                FastHit fastHit = (FastHit) hit;
                this.numHitsToFill++;
                listMap.put(Integer.valueOf(fastHit.getDistributionKey()), fastHit);
            }
        }
        return listMap;
    }

    private void sendDocsumsRequest(int i, List<FastHit> list, byte[] bArr, Result result, double d) {
        Client.NodeConnection connection = this.resourcePool.getConnection(i);
        if (connection != null) {
            Compressor.Compression compress = this.compressor.compress(result.getQuery(), bArr);
            connection.request(RPC_METHOD, compress.type(), bArr.length, compress.data(), responseOrError -> {
                receive(responseOrError, list);
            }, d);
        } else {
            String str = "Could not fill hits from unknown node " + i;
            receive(Client.ResponseOrError.fromError(str), list);
            result.hits().addError(ErrorMessage.createEmptyDocsums(str));
            log.warning("Got hits with node id " + i + ", which is not included in the current dispatch config");
        }
    }

    private ResponseAndHits getNextResponse(long j) throws InterruptedException {
        ResponseAndHits poll;
        if (j <= 0 || (poll = this.responses.poll(j, TimeUnit.MILLISECONDS)) == null || poll.response().timeout()) {
            return null;
        }
        return poll;
    }

    private void processResponses(Result result, String str) throws TimeoutException {
        try {
            ArrayList arrayList = new ArrayList();
            while (this.outstandingResponses > 0) {
                ResponseAndHits nextResponse = getNextResponse(result.getQuery().getTimeLeft());
                if (nextResponse == null) {
                    throwTimeout();
                }
                arrayList.addAll(processOneResponse(result, nextResponse, str, false));
                this.outstandingResponses--;
            }
            if (arrayList.isEmpty()) {
                return;
            }
            maybeRetry(arrayList, result, str);
            if (!arrayList.isEmpty()) {
                result.hits().addError(ErrorMessage.createEmptyDocsums("Missing hit summary data for summary " + str + " for " + String.valueOf(arrayList) + " hits"));
            }
        } catch (InterruptedException e) {
        }
    }

    private List<FastHit> processOneResponse(Result result, ResponseAndHits responseAndHits, String str, boolean z) {
        Client.ResponseOrError<Client.ProtobufResponse> response = responseAndHits.response();
        if (!response.error().isPresent()) {
            return fill(result, responseAndHits.hits(), str, this.compressor.decompress(response.response().get()), z);
        }
        if (this.hasReportedError || z) {
            return List.of();
        }
        String str2 = response.error().get();
        result.hits().addError(ErrorMessage.createBackendCommunicationError(str2));
        log.log(Level.WARNING, "Error fetching summary data: " + str2);
        this.hasReportedError = true;
        return List.of();
    }

    private void addErrors(Result result, Inspector inspector) {
        inspector.traverse((i, inspector2) -> {
            result.hits().addError(new ErrorMessage("timeout".equalsIgnoreCase(inspector2.field(Model.TYPE).asString()) ? Error.TIMEOUT.code : Error.UNSPECIFIED.code, inspector2.field("message").asString(), inspector2.field("details").asString()));
        });
    }

    private void convertErrorsFromDocsumReply(Result result, List<SearchProtocol.Error> list) {
        Iterator<SearchProtocol.Error> it = list.iterator();
        while (it.hasNext()) {
            result.hits().addError(ErrorMessage.createDocsumReplyError(it.next().getMessage()));
        }
    }

    private List<FastHit> fill(Result result, List<FastHit> list, String str, byte[] bArr, boolean z) {
        try {
            SearchProtocol.DocsumReply parseFrom = SearchProtocol.DocsumReply.parseFrom(bArr);
            Inspector inspect = this.decodePolicy == DecodePolicy.ONDEMAND ? BinaryView.inspect(parseFrom.getSlimeSummaries().toByteArray()) : BinaryFormat.decode(parseFrom.getSlimeSummaries().toByteArray()).get();
            if (!z) {
                Inspector field = inspect.field("errors");
                if (field.valid() && field.entries() > 0) {
                    addErrors(result, field);
                }
                convertErrorsFromDocsumReply(result, parseFrom.getErrorsList());
            }
            SlimeAdapter slimeAdapter = new SlimeAdapter(inspect.field("docsums"));
            if (!slimeAdapter.valid()) {
                return List.of();
            }
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < list.size(); i++) {
                com.yahoo.data.access.Inspector field2 = slimeAdapter.entry(i).field("docsum");
                FastHit fastHit = list.get(i);
                boolean needFill = z ? this.partialSummaryHandler.needFill(fastHit) : true;
                if (field2.valid() && needFill) {
                    fastHit.setField(Hit.SDDOCNAME_FIELD, this.documentDb.schema().name());
                    fastHit.addSummary(this.partialSummaryHandler.effectiveDocsumDef(), field2);
                    this.partialSummaryHandler.markFilled(fastHit);
                    this.numOkFilledHits++;
                } else if (needFill) {
                    arrayList.add(fastHit);
                }
            }
            return arrayList;
        } catch (InvalidProtocolBufferException e) {
            if (!z) {
                log.log(Level.WARNING, "Invalid response to docsum request", e);
                result.hits().addError(ErrorMessage.createInternalServerError("Invalid response to docsum request from backend"));
            }
            return List.of();
        }
    }

    private void throwTimeout() throws TimeoutException {
        throw new TimeoutException("Timed out waiting for summary data. " + this.outstandingResponses + " responses outstanding.");
    }

    private void maybeRetry(List<FastHit> list, Result result, String str) throws InterruptedException {
        int size = list.size();
        Query query = result.getQuery();
        if (size >= Math.min(query.m62properties().getInteger(Dispatcher.docsumRetryLimit, 10).intValue(), (query.m62properties().getDouble(Dispatcher.docsumRetryFactor, Double.valueOf(0.5d)).doubleValue() * this.numHitsToFill) + 1.0d)) {
            result.getQuery().trace(false, 1, "Summary fetching got " + size + " empty docsums (of " + this.numHitsToFill + " hits), no retry");
            if (shouldLogNoRetry()) {
                log.log(Level.WARNING, "Docsum fetch failed for " + size + " hits (" + this.numOkFilledHits + " ok hits), no retry");
                return;
            }
            return;
        }
        result.getQuery().trace(false, 1, "Retry summary fetching for " + size + " empty docsums (of " + this.numHitsToFill + " hits)");
        ListMap<Integer, FastHit> listMap = new ListMap<>();
        for (Integer num : this.resourcePool.knownNodeIds()) {
            for (FastHit fastHit : list) {
                if (fastHit.getDistributionKey() != num.intValue()) {
                    listMap.put(num, fastHit);
                }
            }
        }
        if (listMap.size() > 0) {
            if (shouldLogRetry()) {
                log.log(Level.WARNING, "Retry docsum fetch for " + size + " hits (" + this.numOkFilledHits + " ok hits)");
            }
            this.summaryNeedsQuery = true;
            sendFillRequestByNode(result, str, listMap);
            while (true) {
                if (this.outstandingResponses <= 0 || this.numOkFilledHits >= this.numHitsToFill) {
                    break;
                }
                ResponseAndHits nextResponse = getNextResponse(query.getTimeLeft());
                if (nextResponse != null) {
                    processOneResponse(result, nextResponse, str, true);
                    this.outstandingResponses--;
                } else if (shouldLogRetryTimeout()) {
                    log.log(Level.WARNING, "Timed out waiting for summary data. " + this.outstandingResponses + " responses outstanding.");
                }
            }
            list.removeIf(fastHit2 -> {
                return !this.partialSummaryHandler.needFill(fastHit2);
            });
        }
    }

    private static boolean shouldLogForCount(int i) {
        if (i < 100) {
            return true;
        }
        return i < 1000 ? i % 100 == 0 : i < 100000 ? i % 1000 == 0 : i % 10000 == 0;
    }

    private static boolean shouldLogRetry() {
        return shouldLogForCount(retryCounter.getAndAdd(1));
    }

    private static boolean shouldLogNoRetry() {
        return shouldLogForCount(noRetryCounter.getAndAdd(1));
    }

    private static boolean shouldLogRetryTimeout() {
        return shouldLogForCount(retryTimeoutCounter.getAndAdd(1));
    }
}
