/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.streamingvisitors;

import com.yahoo.container.core.documentapi.VespaDocumentAccess;
import com.yahoo.data.access.Inspector;
import com.yahoo.document.DocumentId;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.document.select.parser.TokenMgrException;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.fs4.DocsumPacket;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.prelude.Ping;
import com.yahoo.prelude.Pong;
import com.yahoo.prelude.fastsearch.ClusterParams;
import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.prelude.fastsearch.GroupingListHit;
import com.yahoo.prelude.fastsearch.TimeoutException;
import com.yahoo.prelude.fastsearch.VespaBackend;
import com.yahoo.processing.IllegalInputException;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.result.Coverage;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.FeatureData;
import com.yahoo.search.result.Relevance;
import com.yahoo.search.searchchain.Execution;
import com.yahoo.searchlib.aggregation.Grouping;
import com.yahoo.vdslib.DocumentSummary;
import com.yahoo.vdslib.SearchResult;
import com.yahoo.vdslib.VisitorStatistics;
import com.yahoo.vespa.streamingvisitors.StreamingVisitor;
import com.yahoo.vespa.streamingvisitors.TracingOptions;
import com.yahoo.vespa.streamingvisitors.Visitor;
import com.yahoo.vespa.streamingvisitors.VisitorFactory;
import com.yahoo.vespa.streamingvisitors.tracing.TraceDescription;
import com.yahoo.yolean.Exceptions;
import java.math.BigInteger;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class StreamingBackend
extends VespaBackend {
    private static final Logger log = Logger.getLogger(StreamingBackend.class.getName());
    private static final CompoundName streamingUserid = CompoundName.from((String)"streaming.userid");
    private static final CompoundName streamingGroupname = CompoundName.from((String)"streaming.groupname");
    private static final CompoundName streamingSelection = CompoundName.from((String)"streaming.selection");
    static final String STREAMING_STATISTICS = "streaming.statistics";
    private final VisitorFactory visitorFactory;
    private final TracingOptions tracingOptions;
    private final Route route;
    private final String searchClusterName;
    private final String storageClusterRouteSpec;

    StreamingBackend(ClusterParams clusterParams, String searchClusterName, VisitorFactory visitorFactory, String storageClusterRouteSpec) {
        this(clusterParams, searchClusterName, visitorFactory, storageClusterRouteSpec, TracingOptions.DEFAULT);
    }

    StreamingBackend(ClusterParams clusterParams, String searchClusterName, VisitorFactory visitorFactory, String storageClusterRouteSpec, TracingOptions tracingOptions) {
        super(clusterParams);
        this.visitorFactory = visitorFactory;
        this.tracingOptions = tracingOptions;
        this.searchClusterName = searchClusterName;
        this.storageClusterRouteSpec = storageClusterRouteSpec;
        this.route = Route.parse((String)storageClusterRouteSpec);
    }

    public StreamingBackend(ClusterParams clusterParams, String searchClusterName, VespaDocumentAccess access, String storageClusterRouteSpec) {
        this(clusterParams, searchClusterName, new VespaVisitorFactory(access), storageClusterRouteSpec);
    }

    private String getSearchClusterName() {
        return this.searchClusterName;
    }

    @Override
    protected void doPartialFill(Result result, String summaryClass) {
    }

    private double durationInMillisFromNanoTime(long startTimeNanos) {
        return (double)(this.tracingOptions.getClock().nanoTimeNow() - startTimeNanos) / (double)TimeUnit.MILLISECONDS.toNanos(1L);
    }

    private boolean timeoutBadEnoughToBeReported(Query query, double durationMillis) {
        return durationMillis > (double)query.getTimeout() * this.tracingOptions.getTraceTimeoutMultiplierThreshold();
    }

    private static boolean queryIsLocationConstrained(Query query) {
        return query.properties().getString(streamingUserid) != null || query.properties().getString(streamingGroupname) != null;
    }

    private static int documentSelectionQueryParameterCount(Query query) {
        int paramCount = 0;
        if (query.properties().getString(streamingUserid) != null) {
            ++paramCount;
        }
        if (query.properties().getString(streamingGroupname) != null) {
            ++paramCount;
        }
        if (query.properties().getString(streamingSelection) != null) {
            ++paramCount;
        }
        return paramCount;
    }

    private boolean shouldTraceQuery(Query query) {
        return StreamingBackend.queryIsLocationConstrained(query) && (query.getTrace().getLevel() > 0 || this.tracingOptions.getSamplingStrategy().shouldSample());
    }

    private int inferEffectiveQueryTraceLevel(Query query) {
        return query.getTrace().getLevel() == 0 && this.shouldTraceQuery(query) ? this.tracingOptions.getTraceLevelOverride() : query.getTrace().getLevel();
    }

    @Override
    public Result doSearch2(String schema, Query query) {
        if (query.getTimeLeft() <= 0L) {
            return new Result(query, ErrorMessage.createTimeout(String.format("No time left for searching (timeout=%d)", query.getTimeout())));
        }
        this.initializeMissingQueryFields(query);
        if (StreamingBackend.documentSelectionQueryParameterCount(query) != 1) {
            return new Result(query, ErrorMessage.createIllegalQuery("Streaming search requires either streaming.groupname or streaming.selection"));
        }
        try {
            this.ensureLegalSummaryClass(query, query.getPresentation().getSummary());
        }
        catch (IllegalInputException e) {
            return new Result(query, ErrorMessage.createIllegalQuery(Exceptions.toMessageString((Throwable)e)));
        }
        if (query.getTrace().isTraceable(4)) {
            query.trace("Routing to search cluster " + this.getSearchClusterName() + " and document type " + schema, 4);
        }
        long timeStartedNanos = this.tracingOptions.getClock().nanoTimeNow();
        int effectiveTraceLevel = this.inferEffectiveQueryTraceLevel(query);
        Visitor visitor = this.visitorFactory.createVisitor(query, this.getSearchClusterName(), this.route, schema, effectiveTraceLevel);
        try {
            visitor.doSearch();
        }
        catch (ParseException e) {
            return new Result(query, ErrorMessage.createInvalidQueryParameter("Failed to parse document selection string: " + e.getMessage()));
        }
        catch (TokenMgrException e) {
            return new Result(query, ErrorMessage.createInvalidQueryParameter("Failed to tokenize document selection string: " + e.getMessage()));
        }
        catch (TimeoutException e) {
            double elapsedMillis = this.durationInMillisFromNanoTime(timeStartedNanos);
            if (effectiveTraceLevel > 0 && this.timeoutBadEnoughToBeReported(query, elapsedMillis)) {
                this.tracingOptions.getTraceExporter().maybeExport(() -> new TraceDescription(visitor.getTrace(), String.format("Trace of %s which timed out after %.3g seconds", query, elapsedMillis / 1000.0)));
            }
            return new Result(query, ErrorMessage.createTimeout(e.getMessage()));
        }
        catch (InterruptedException e) {
            return new Result(query, ErrorMessage.createBackendCommunicationError(e.getMessage()));
        }
        return this.buildResultFromCompletedVisitor(query, visitor);
    }

    private void initializeMissingQueryFields(Query query) {
        StreamingBackend.lazyTrace(query, 7, "Routing to storage cluster ", this.storageClusterRouteSpec);
        StreamingBackend.lazyTrace(query, 8, "Route is ", this.route);
        StreamingBackend.lazyTrace(query, 7, "doSearch2(): query docsum class=", query.getPresentation().getSummary(), ", default docsum class=", this.getDefaultDocsumClass());
        if (query.getPresentation().getSummary() == null) {
            StreamingBackend.lazyTrace(query, 6, "doSearch2(): No summary class specified in query, using default: ", this.getDefaultDocsumClass());
            query.getPresentation().setSummary(this.getDefaultDocsumClass());
        } else {
            StreamingBackend.lazyTrace(query, 6, "doSearch2(): Summary class has been specified in query: ", query.getPresentation().getSummary());
        }
        StreamingBackend.lazyTrace(query, 8, "doSearch2(): rank properties=", query.getRanking());
        StreamingBackend.lazyTrace(query, 8, "doSearch2(): sort specification=", query.getRanking().getSorting() == null ? null : query.getRanking().getSorting().fieldOrders());
    }

    private Result buildResultFromCompletedVisitor(Query query, Visitor visitor) {
        StreamingBackend.lazyTrace(query, 8, "offset=", query.getOffset(), ", hits=", query.getHits());
        Result result = new Result(query);
        List<SearchResult.Hit> hits = visitor.getHits();
        Map<String, DocumentSummary.Summary> summaryMap = visitor.getSummaryMap();
        StreamingBackend.lazyTrace(query, 7, "total hit count = ", visitor.getTotalHitCount(), ", returned hit count = ", hits.size(), ", summary count = ", summaryMap.size());
        VisitorStatistics stats = visitor.getStatistics();
        result.setTotalHitCount(visitor.getTotalHitCount());
        result.setCoverage(new Coverage(stats.getDocumentsVisited(), stats.getDocumentsVisited(), 1, 1));
        query.trace(visitor.getStatistics().toString(), false, 2);
        query.getContext(true).setProperty(STREAMING_STATISTICS, stats);
        DocsumPacket[] summaryPackets = new DocsumPacket[hits.size()];
        int index = 0;
        boolean skippedEarlierResult = false;
        for (SearchResult.Hit hit : hits) {
            DocsumPacket dp;
            if (!StreamingBackend.verifyDocId(hit.getDocId(), query, skippedEarlierResult)) {
                skippedEarlierResult = true;
                continue;
            }
            FastHit fastHit = this.buildSummaryHit(query, hit);
            result.hits().add(fastHit);
            DocumentSummary.Summary summary = summaryMap.get(hit.getDocId());
            if (summary == null) {
                return new Result(query, ErrorMessage.createBackendCommunicationError("Did not find summary for hit with document id " + hit.getDocId()));
            }
            summaryPackets[index] = dp = new DocsumPacket(summary.getSummary());
            ++index;
        }
        if (result.isFilled(query.getPresentation().getSummary())) {
            StreamingBackend.lazyTrace(query, 8, "Result is filled for summary class ", query.getPresentation().getSummary());
        } else {
            StreamingBackend.lazyTrace(query, 8, "Result is not filled for summary class ", query.getPresentation().getSummary());
        }
        List<Grouping> groupingList = visitor.getGroupings();
        StreamingBackend.lazyTrace(query, 8, "Grouping list=", groupingList);
        if (!groupingList.isEmpty()) {
            GroupingListHit groupHit = new GroupingListHit(groupingList, this.getDocumentDatabase(query), query);
            result.hits().add(groupHit);
        }
        VespaBackend.FillHitsResult fillHitsResult = this.fillHits(result, summaryPackets, query.getPresentation().getSummary());
        int skippedHits = fillHitsResult.skippedHits;
        if (fillHitsResult.error != null) {
            result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error));
            return result;
        }
        if (skippedHits == 0) {
            query.trace("All hits have been filled", 4);
        } else {
            StreamingBackend.lazyTrace(query, 8, "Skipping some hits for query: ", result.getQuery());
        }
        StreamingBackend.lazyTrace(query, 8, "Returning result ", result);
        if (skippedHits > 0) {
            log.info("skipping " + skippedHits + " hits for query: " + result.getQuery());
            result.hits().addError(ErrorMessage.createTimeout("Missing hit summary data for " + skippedHits + " hits"));
        }
        Set<String> errors = visitor.getErrors();
        for (String error : errors) {
            result.hits().addError(ErrorMessage.createSearchReplyError(error));
        }
        return result;
    }

    private FastHit buildSummaryHit(Query query, SearchResult.Hit hit) {
        FastHit fastHit = new FastHit();
        fastHit.setQuery(query);
        fastHit.setSource(this.getName());
        fastHit.setId(hit.getDocId());
        fastHit.setRelevance(new Relevance(hit.getRank()));
        if (hit instanceof SearchResult.HitWithSortBlob) {
            SearchResult.HitWithSortBlob sortedHit = (SearchResult.HitWithSortBlob)hit;
            fastHit.setSortData(sortedHit.getSortBlob(), query.getRanking().getSorting());
        }
        if (hit.getMatchFeatures().isPresent()) {
            fastHit.setField("matchfeatures", new FeatureData((Inspector)hit.getMatchFeatures().get()));
        }
        fastHit.setFillable();
        return fastHit;
    }

    private static void lazyTrace(Query query, int level, Object ... args) {
        if (query.getTrace().isTraceable(level)) {
            StringBuilder s = new StringBuilder();
            for (Object arg : args) {
                s.append(arg);
            }
            query.trace(s.toString(), level);
        }
    }

    static boolean verifyDocId(String id, Query query, boolean skippedEarlierResult) {
        DocumentId docId;
        String expectedUserId = query.properties().getString(streamingUserid);
        String expectedGroupName = query.properties().getString(streamingGroupname);
        Level logLevel = Level.SEVERE;
        if (skippedEarlierResult) {
            logLevel = Level.FINE;
        }
        try {
            docId = new DocumentId(id);
        }
        catch (IllegalArgumentException iae) {
            log.log(logLevel, "Bad result for " + query + ": " + iae.getMessage());
            return false;
        }
        if (expectedUserId != null) {
            if (!docId.getScheme().hasNumber()) {
                log.log(logLevel, "Got result with wrong scheme  in document ID (" + id + ") for " + query);
                return false;
            }
            long userId = docId.getScheme().getNumber();
            if (new BigInteger(expectedUserId).longValue() != userId) {
                log.log(logLevel, "Got result with wrong user ID (expected " + expectedUserId + ") in document ID (" + id + ") for " + query);
                return false;
            }
        } else if (expectedGroupName != null) {
            if (!docId.getScheme().hasGroup()) {
                log.log(logLevel, "Got result with wrong scheme  in document ID (" + id + ") for " + query);
                return false;
            }
            String groupName = docId.getScheme().getGroup();
            if (!expectedGroupName.equals(groupName)) {
                log.log(logLevel, "Got result with wrong group name (expected " + expectedGroupName + ") in document ID (" + id + ") for " + query);
                return false;
            }
        }
        return true;
    }

    public Pong ping(Ping ping, Execution execution) {
        return new Pong();
    }

    private static class VespaVisitorFactory
    implements StreamingVisitor.VisitorSessionFactory,
    VisitorFactory {
        private final VespaDocumentAccess access;

        private VespaVisitorFactory(VespaDocumentAccess access) {
            this.access = access;
        }

        @Override
        public VisitorSession createVisitorSession(VisitorParameters params) throws ParseException {
            return this.access.createVisitorSession(params);
        }

        @Override
        public Visitor createVisitor(Query query, String searchCluster, Route route, String schema, int traceLevelOverride) {
            return new StreamingVisitor(query, searchCluster, route, schema, this, traceLevelOverride);
        }
    }
}

