package com.yahoo.vespa.http.server;

import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.jdisc.ResourceReference;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.Result;
import com.yahoo.messagebus.shared.SharedSourceSession;
import com.yahoo.net.HostName;
import com.yahoo.vespaxmlparser.FeedOperation;
import com.yahoo.yolean.Exceptions;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/yahoo/vespa/http/server/ClientFeederV3.class */
/**
 * Feeds the document operations of one client (identified by {@code clientId}) from HTTP
 * request bodies into a shared message bus source session, and hands back the replies that
 * have arrived so far as the body of the HTTP response.
 *
 * <p>Replies are queued in {@link #feedReplies} (the queue is attached to every outgoing
 * message through a {@code ReplyContext}) and are drained into the response of a later
 * request by {@link #handleRequest(HttpRequest)}.
 */
class ClientFeederV3 {
    protected static final Logger log = Logger.getLogger(ClientFeederV3.class.getName());

    /** Static, hence shared by every feeder instance: operations sent but not yet returned to a client. */
    private static final AtomicInteger outstandingOperations = new AtomicInteger(0);

    /** Idle period, in seconds, after which {@link #timedOut()} reports true. */
    private static final long IDLE_TIMEOUT_SECONDS = 6000L;

    /** Interval, in milliseconds, between checks for in-flight requests in {@link #kill()}. */
    private static final long KILL_POLL_INTERVAL_MILLIS = 100L;

    private final ReferencedResource<SharedSourceSession> sourceSession;
    private final String clientId;
    private final ReplyHandler feedReplyHandler;
    private final Metric metric;
    private final StreamReaderV3 streamReaderV3;

    /** Replies received for this client that have not yet been written to a response. */
    private final BlockingQueue<OperationStatus> feedReplies = new LinkedBlockingQueue<>();

    /** Start of the current ops/sec window; also read by {@link #timedOut()}. Guarded by {@link #monitor}. */
    private Instant prevOpsPerSecTime = Instant.now();

    /** Operations counted in the current ops/sec window. Guarded by {@link #monitor}. */
    private double operationsForOpsPerSec = 0.0d;

    private final Object monitor = new Object();

    /** Number of {@link #handleRequest(HttpRequest)} calls currently executing. */
    private final AtomicInteger ongoingRequests = new AtomicInteger(0);

    private final String hostName = HostName.getLocalhost();

    /**
     * Creates a feeder for one client.
     *
     * @param referencedResource  the source session used for sending, together with the reference keeping it alive
     * @param feedReaderFactory   passed on to the {@link StreamReaderV3} that parses request bodies
     * @param documentTypeManager passed on to the {@link StreamReaderV3} that parses request bodies
     * @param str                 the client id reported in every response
     * @param metric              sink for the feed metrics
     * @param replyHandler        handler pushed onto every message before it is sent
     */
    public ClientFeederV3(ReferencedResource<SharedSourceSession> referencedResource, FeedReaderFactory feedReaderFactory, DocumentTypeManager documentTypeManager, String str, Metric metric, ReplyHandler replyHandler) {
        this.sourceSession = referencedResource;
        this.clientId = str;
        this.feedReplyHandler = replyHandler;
        this.metric = metric;
        this.streamReaderV3 = new StreamReaderV3(feedReaderFactory, documentTypeManager);
    }

    /**
     * Tells whether this feeder has been idle long enough to be discarded.
     *
     * @return true when more than {@value #IDLE_TIMEOUT_SECONDS} seconds have passed since the
     *         ops/sec window was last restarted and no request is currently being handled
     */
    public boolean timedOut() {
        synchronized (this.monitor) {
            return Instant.now().isAfter(this.prevOpsPerSecTime.plusSeconds(IDLE_TIMEOUT_SECONDS))
                    && this.ongoingRequests.get() == 0;
        }
    }

    /**
     * Waits until no request is being handled, then closes the reference to the source session.
     *
     * <p>If the waiting thread is interrupted, the wait is abandoned, the reference is still
     * closed, and the thread's interrupt status is restored so that the caller can observe it.
     * Any exception from obtaining or closing the reference is logged, not propagated.
     */
    public void kill() {
        // try-with-resources closes the reference on every exit path and tolerates a null reference.
        try (ResourceReference ignored = this.sourceSession.getReference()) {
            while (this.ongoingRequests.get() > 0) {
                try {
                    Thread.sleep(KILL_POLL_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } catch (Exception e) {
            log.log(Level.WARNING, "Failed to close reference to source session", (Throwable) e);
        }
    }

    /**
     * Moves every reply currently queued in {@link #feedReplies} to the given response queue,
     * decrementing the shared outstanding-operations counter once per reply moved.
     *
     * @param blockingQueue the response queue to append the replies to
     * @throws InterruptedException if interrupted while putting a reply on the response queue
     */
    private void transferPreviousRepliesToResponse(BlockingQueue<OperationStatus> blockingQueue) throws InterruptedException {
        for (OperationStatus status = this.feedReplies.poll(); status != null; status = this.feedReplies.poll()) {
            outstandingOperations.decrementAndGet();
            blockingQueue.put(status);
        }
    }

    /**
     * Feeds all operations found in the request body and returns a response holding the
     * replies available at that point, always terminated by exactly one {@code END_OF_FEED}
     * status.
     *
     * <p>Failures while feeding do not fail the request: they are logged and the response is
     * still produced with status 200.
     *
     * @param httpRequest the feed request
     * @return the response carrying the operation statuses
     * @throws IOException if the feeder settings or the (possibly compressed) body stream cannot be set up
     */
    public HttpResponse handleRequest(HttpRequest httpRequest) throws IOException {
        this.ongoingRequests.incrementAndGet();
        try {
            FeederSettings feederSettings = new FeederSettings(httpRequest);
            InputStream inputStream = StreamReaderV3.unzipStreamIfNeeded(httpRequest);
            LinkedBlockingQueue<OperationStatus> replies = new LinkedBlockingQueue<>();
            try {
                feed(feederSettings, inputStream, replies);
                synchronized (this.monitor) {
                    // Earlier replies are only handed out when the request carries a DATA_FORMAT header.
                    if (httpRequest.getJDiscRequest().headers().get(Headers.DATA_FORMAT) != null) {
                        transferPreviousRepliesToResponse(replies);
                    }
                }
            } catch (InterruptedException e) {
                // Interruption just ends the feed early; the response is still returned.
                log.log(Level.FINE, e, () -> "Feed handler was interrupted: " + e.getMessage());
            } catch (Throwable t) {
                log.log(Level.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString(t), t);
            } finally {
                // Exactly one end marker, regardless of how feeding ended.
                replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, null));
            }
            return new FeedResponse(200, replies, 3, this.clientId, outstandingOperations.get(), this.hostName);
        } finally {
            this.ongoingRequests.decrementAndGet();
        }
    }

    /**
     * Reads the next operation that parses successfully from the request body.
     *
     * <p>An operation that fails to parse is reported as an {@code ERROR} status on the
     * response queue and counted as a parse error, after which reading continues with the
     * next operation.
     *
     * @param feederSettings settings of the current request
     * @param inputStream    the request body
     * @param blockingQueue  the response queue that receives parse-error statuses
     * @return the next message, or empty when there is no further operation id, the stream
     *         fails with an {@link IOException}, or no message could be created for the operation
     */
    private Optional<DocumentOperationMessageV3> pullMessageFromRequest(FeederSettings feederSettings, InputStream inputStream, BlockingQueue<OperationStatus> blockingQueue) {
        while (true) {
            final Optional<String> operationId;
            try {
                operationId = this.streamReaderV3.getNextOperationId(inputStream);
            } catch (IOException e) {
                log.log(Level.FINE, () -> Exceptions.toMessageString(e));
                return Optional.empty();
            }
            if (operationId.isEmpty()) {
                return Optional.empty();
            }
            try {
                DocumentOperationMessageV3 message = getNextMessage(operationId.get(), inputStream, feederSettings);
                if (message != null) {
                    setRoute(message, feederSettings);
                }
                return Optional.ofNullable(message);
            } catch (IOException e) {
                // Same handling as a failure to read the operation id: stop reading this request.
                log.log(Level.FINE, () -> Exceptions.toMessageString(e));
                return Optional.empty();
            } catch (Exception e) {
                log.log(Level.WARNING, () -> Exceptions.toMessageString(e));
                this.metric.add(MetricNames.PARSE_ERROR, 1, (Metric.Context) null);
                blockingQueue.add(new OperationStatus(Exceptions.toMessageString(e), operationId.get(), ErrorCode.ERROR, false, ""));
            }
        }
    }

    /**
     * Pushes the reply handler onto the message and sends it through the source session,
     * blocking until the session has returned a result.
     *
     * @param documentOperationMessageV3 the operation to send
     * @return the result of the send attempt
     * @throws InterruptedException if interrupted while blocked in the send
     */
    private Result sendMessage(DocumentOperationMessageV3 documentOperationMessageV3) throws InterruptedException {
        Message message = documentOperationMessageV3.getMessage();
        message.pushHandler(this.feedReplyHandler);
        return this.sourceSession.getResource().sendMessageBlocking(message);
    }

    /**
     * Sends every operation of the request body. Operations that are not accepted, or whose
     * send throws a {@link RuntimeException}, are reported immediately on the response queue.
     *
     * @param feederSettings settings of the current request
     * @param inputStream    the request body
     * @param blockingQueue  the response queue
     * @throws InterruptedException if interrupted while sending
     */
    private void feed(FeederSettings feederSettings, InputStream inputStream, BlockingQueue<OperationStatus> blockingQueue) throws InterruptedException {
        while (true) {
            Optional<DocumentOperationMessageV3> next = pullMessageFromRequest(feederSettings, inputStream, blockingQueue);
            if (next.isEmpty()) {
                return;
            }
            DocumentOperationMessageV3 operation = next.get();
            setMessageParameters(operation, feederSettings);
            try {
                Result result = sendMessage(operation);
                if (result.isAccepted()) {
                    outstandingOperations.incrementAndGet();
                    updateOpsPerSec();
                    log(Level.FINE, "Sent message successfully, document id: ", operation.getOperationId());
                } else {
                    Error error = result.getError();
                    blockingQueue.add(createOperationStatus(operation.getOperationId(), error.getMessage(), ErrorCode.fromBusError(error), operation.getMessage()));
                }
            } catch (RuntimeException e) {
                blockingQueue.add(createOperationStatus(operation.getOperationId(), Exceptions.toMessageString(e), ErrorCode.ERROR, operation.getMessage()));
            }
        }
    }

    /**
     * Builds an operation status, attaching the message's trace as text when the message has
     * a trace whose level is above zero, and an empty string otherwise.
     *
     * @param str       the operation id
     * @param str2      the status message
     * @param errorCode the status code
     * @param message   the message the status concerns; may be null
     * @return the new status
     */
    private OperationStatus createOperationStatus(String str, String str2, ErrorCode errorCode, Message message) {
        boolean hasTrace = message != null && message.getTrace() != null && message.getTrace().getLevel() > 0;
        String trace = hasTrace ? message.getTrace().toString() : "";
        return new OperationStatus(str2, str, errorCode, false, trace);
    }

    /**
     * Parses the next operation from the stream and wraps it in a message. Also publishes the
     * session's pending count as a metric when the underlying session is available.
     *
     * @param str            the operation id already read from the stream
     * @param inputStream    the request body, positioned at the operation
     * @param feederSettings settings of the current request
     * @return the message, or null when {@code DocumentOperationMessageV3.create} returns null
     * @throws Exception if the operation cannot be read or parsed
     */
    protected DocumentOperationMessageV3 getNextMessage(String str, InputStream inputStream, FeederSettings feederSettings) throws Exception {
        FeedOperation operation = this.streamReaderV3.getNextOperation(inputStream, feederSettings);
        if (this.sourceSession.getResource().session() != null) {
            this.metric.set(MetricNames.PENDING, Double.valueOf(this.sourceSession.getResource().session().getPendingCount()), (Metric.Context) null);
        }
        DocumentOperationMessageV3 message = DocumentOperationMessageV3.create(operation, str, this.metric);
        if (message == null) {
            return null;
        }
        this.metric.add(MetricNames.NUM_OPERATIONS, 1, (Metric.Context) null);
        log(Level.FINE, "Successfully deserialized document id: ", message.getOperationId());
        return message;
    }

    /**
     * Attaches the reply context (operation id plus this feeder's reply queue) to the message
     * and applies the requested trace level, if any.
     */
    private void setMessageParameters(DocumentOperationMessageV3 documentOperationMessageV3, FeederSettings feederSettings) {
        Message message = documentOperationMessageV3.getMessage();
        message.setContext(new ReplyContext(documentOperationMessageV3.getOperationId(), this.feedReplies));
        if (feederSettings.traceLevel != null) {
            message.getTrace().setLevel(feederSettings.traceLevel.intValue());
        }
    }

    /** Applies the route from the settings to the message, when the settings specify one. */
    private void setRoute(DocumentOperationMessageV3 documentOperationMessageV3, FeederSettings feederSettings) {
        if (feederSettings.route != null) {
            documentOperationMessageV3.getMessage().setRoute(feederSettings.route);
        }
    }

    /**
     * Logs the concatenation of the given parts at the given level. The message is only built
     * when the level is loggable; a null part is rendered as {@code "null"}.
     *
     * @param level  the log level
     * @param objArr the parts of the message, concatenated without separators
     */
    protected final void log(Level level, Object... objArr) {
        if (!log.isLoggable(level)) {
            return;
        }
        StringBuilder sb = new StringBuilder();
        for (Object part : objArr) {
            sb.append(part); // append(Object) is null-safe, unlike part.toString()
        }
        log.log(level, sb.toString());
    }

    /**
     * Counts one sent operation. Once more than a second has passed since the current window
     * started, publishes the operations-per-second rate of that window and starts a new
     * window in which the current operation is the first one counted.
     */
    private void updateOpsPerSec() {
        Instant now = Instant.now();
        synchronized (this.monitor) {
            if (now.isAfter(this.prevOpsPerSecTime.plusSeconds(1L))) {
                // The window is strictly longer than one second here, so the divisor is positive.
                double elapsedSeconds = Duration.between(this.prevOpsPerSecTime, now).toMillis() / 1000.0d;
                this.metric.set(MetricNames.OPERATIONS_PER_SEC, Double.valueOf(this.operationsForOpsPerSec / elapsedSeconds), (Metric.Context) null);
                this.operationsForOpsPerSec = 1.0d;
                this.prevOpsPerSecTime = now;
            } else {
                this.operationsForOpsPerSec += 1.0d;
            }
        }
    }
}
