package org.apache.pinot.query.mailbox;

import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.PinotMailboxGrpc;
import org.apache.pinot.query.mailbox.channel.ChannelManager;
import org.apache.pinot.query.mailbox.channel.MailboxStatusObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/mailbox/GrpcSendingMailbox.class */
public class GrpcSendingMailbox implements SendingMailbox {
    private static final Logger LOGGER = LoggerFactory.getLogger(GrpcSendingMailbox.class);
    private final String _id;
    private final ChannelManager _channelManager;
    private final String _hostname;
    private final int _port;
    private final long _deadlineMs;
    private final StatMap<MailboxSendOperator.StatKey> _statMap;
    private final MailboxStatusObserver _statusObserver = new MailboxStatusObserver();
    private StreamObserver<Mailbox.MailboxContent> _contentObserver;

    public GrpcSendingMailbox(String str, ChannelManager channelManager, String str2, int i, long j, StatMap<MailboxSendOperator.StatKey> statMap) {
        this._id = str;
        this._channelManager = channelManager;
        this._hostname = str2;
        this._port = i;
        this._deadlineMs = j;
        this._statMap = statMap;
    }

    @Override // org.apache.pinot.query.mailbox.SendingMailbox
    public void send(TransferableBlock transferableBlock) throws IOException {
        if (isTerminated() || (isEarlyTerminated() && !transferableBlock.isEndOfStreamBlock())) {
            LOGGER.debug("==[GRPC SEND]== terminated or early terminated mailbox. Skipping sending message {} to: {}", transferableBlock, this._id);
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("==[GRPC SEND]== sending message " + transferableBlock + " to: " + this._id);
        }
        if (this._contentObserver == null) {
            this._contentObserver = getContentObserver();
        }
        this._contentObserver.onNext(toMailboxContent(transferableBlock));
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("==[GRPC SEND]== message " + transferableBlock + " sent to: " + this._id);
        }
    }

    @Override // org.apache.pinot.query.mailbox.SendingMailbox
    public void complete() {
        if (isTerminated()) {
            LOGGER.debug("Already terminated mailbox: {}", this._id);
        } else {
            LOGGER.debug("Completing mailbox: {}", this._id);
            this._contentObserver.onCompleted();
        }
    }

    @Override // org.apache.pinot.query.mailbox.SendingMailbox
    public void cancel(Throwable th) {
        String message;
        if (isTerminated()) {
            LOGGER.debug("Already terminated mailbox: {}", this._id);
            return;
        }
        LOGGER.debug("Cancelling mailbox: {}", this._id);
        if (this._contentObserver == null) {
            this._contentObserver = getContentObserver();
        }
        if (th != null) {
            try {
                message = th.getMessage();
            } catch (Exception e) {
                LOGGER.debug("Caught exception cancelling mailbox: {}", this._id, e);
                return;
            }
        } else {
            message = "Unknown";
        }
        this._contentObserver.onNext(toMailboxContent(TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("Cancelled by sender with exception: " + message, th))));
        this._contentObserver.onCompleted();
    }

    @Override // org.apache.pinot.query.mailbox.SendingMailbox
    public boolean isEarlyTerminated() {
        return this._statusObserver.isEarlyTerminated();
    }

    @Override // org.apache.pinot.query.mailbox.SendingMailbox
    public boolean isTerminated() {
        return this._statusObserver.isFinished();
    }

    private StreamObserver<Mailbox.MailboxContent> getContentObserver() {
        return PinotMailboxGrpc.newStub(this._channelManager.getChannel(this._hostname, this._port)).withDeadlineAfter(this._deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS).open(this._statusObserver);
    }

    private Mailbox.MailboxContent toMailboxContent(TransferableBlock transferableBlock) throws IOException {
        this._statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
        long currentTimeMillis = System.currentTimeMillis();
        try {
            try {
                ByteString byteString = DataBlockUtils.toByteString(transferableBlock.getDataBlock());
                int size = byteString.size();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Serialized block: {} to {} bytes", transferableBlock, Integer.valueOf(size));
                }
                this._statMap.merge(MailboxSendOperator.StatKey.SERIALIZED_BYTES, size);
                Mailbox.MailboxContent build = Mailbox.MailboxContent.newBuilder().setMailboxId(this._id).setPayload(byteString).build();
                this._statMap.merge(MailboxSendOperator.StatKey.SERIALIZATION_TIME_MS, System.currentTimeMillis() - currentTimeMillis);
                return build;
            } catch (Throwable th) {
                LOGGER.warn("Caught exception while serializing block: {}", transferableBlock, th);
                throw th;
            }
        } catch (Throwable th2) {
            this._statMap.merge(MailboxSendOperator.StatKey.SERIALIZATION_TIME_MS, System.currentTimeMillis() - currentTimeMillis);
            throw th2;
        }
    }

    public String toString() {
        return "g" + this._id;
    }
}
