package org.apache.pinot.query.runtime.operator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.routing.MailboxInfos;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.exception.QueryCancelledException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/runtime/operator/MailboxSendOperator.class */
public class MailboxSendOperator extends MultiStageOperator {
    public static final EnumSet<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPES = EnumSet.of(RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED, RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED);
    private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
    private static final String EXPLAIN_NAME = "MAILBOX_SEND";
    private final MultiStageOperator _input;
    private final BlockExchange _exchange;
    private final StatMap<StatKey> _statMap;

    /* loaded from: input_file:org/apache/pinot/query/runtime/operator/MailboxSendOperator$StatKey.class */
    public enum StatKey implements StatMap.Key {
        EXECUTION_TIME_MS(StatMap.Type.LONG) { // from class: org.apache.pinot.query.runtime.operator.MailboxSendOperator.StatKey.1
            public boolean includeDefaultInJson() {
                return true;
            }
        },
        EMITTED_ROWS(StatMap.Type.LONG) { // from class: org.apache.pinot.query.runtime.operator.MailboxSendOperator.StatKey.2
            public boolean includeDefaultInJson() {
                return true;
            }
        },
        STAGE(StatMap.Type.INT) { // from class: org.apache.pinot.query.runtime.operator.MailboxSendOperator.StatKey.3
            public int merge(int i, int i2) {
                return StatMap.Key.eqNotZero(i, i2);
            }

            public boolean includeDefaultInJson() {
                return true;
            }
        },
        PARALLELISM(StatMap.Type.INT),
        FAN_OUT(StatMap.Type.INT) { // from class: org.apache.pinot.query.runtime.operator.MailboxSendOperator.StatKey.4
            public int merge(int i, int i2) {
                return Math.max(i, i2);
            }
        },
        IN_MEMORY_MESSAGES(StatMap.Type.INT),
        RAW_MESSAGES(StatMap.Type.INT),
        SERIALIZED_BYTES(StatMap.Type.LONG) { // from class: org.apache.pinot.query.runtime.operator.MailboxSendOperator.StatKey.5
            public boolean includeDefaultInJson() {
                return true;
            }
        },
        SERIALIZATION_TIME_MS(StatMap.Type.LONG) { // from class: org.apache.pinot.query.runtime.operator.MailboxSendOperator.StatKey.6
            public boolean includeDefaultInJson() {
                return true;
            }
        };

        private final StatMap.Type _type;

        StatKey(StatMap.Type type) {
            this._type = type;
        }

        public StatMap.Type getType() {
            return this._type;
        }
    }

    public MailboxSendOperator(OpChainExecutionContext opChainExecutionContext, MultiStageOperator multiStageOperator, MailboxSendNode mailboxSendNode) {
        this(opChainExecutionContext, multiStageOperator, (Function<StatMap<StatKey>, BlockExchange>) statMap -> {
            return getBlockExchange(opChainExecutionContext, mailboxSendNode, statMap);
        });
        this._statMap.merge(StatKey.STAGE, opChainExecutionContext.getStageId());
        this._statMap.merge(StatKey.PARALLELISM, 1);
    }

    @VisibleForTesting
    MailboxSendOperator(OpChainExecutionContext opChainExecutionContext, MultiStageOperator multiStageOperator, Function<StatMap<StatKey>, BlockExchange> function) {
        super(opChainExecutionContext);
        this._statMap = new StatMap<>(StatKey.class);
        this._input = multiStageOperator;
        this._exchange = function.apply(this._statMap);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static BlockExchange getBlockExchange(OpChainExecutionContext opChainExecutionContext, MailboxSendNode mailboxSendNode, StatMap<StatKey> statMap) {
        BlockSplitter blockSplitter = TransferableBlockUtils::splitBlock;
        if (!mailboxSendNode.isMultiSend()) {
            return getBlockExchange(opChainExecutionContext, ((Integer) mailboxSendNode.getReceiverStageIds().iterator().next()).intValue(), mailboxSendNode.getDistributionType(), mailboxSendNode.getKeys(), statMap, blockSplitter);
        }
        ArrayList arrayList = new ArrayList();
        BlockSplitter blockSplitter2 = BlockSplitter.NO_OP;
        Iterator it = mailboxSendNode.getReceiverStageIds().iterator();
        while (it.hasNext()) {
            int intValue = ((Integer) it.next()).intValue();
            arrayList.add(getBlockExchange(opChainExecutionContext, intValue, mailboxSendNode.getDistributionType(), mailboxSendNode.getKeys(), statMap, blockSplitter2).asSendingMailbox(Integer.toString(intValue)));
        }
        return BlockExchange.getExchange(arrayList, RelDistribution.Type.BROADCAST_DISTRIBUTED, Collections.emptyList(), blockSplitter);
    }

    private static BlockExchange getBlockExchange(OpChainExecutionContext opChainExecutionContext, int i, RelDistribution.Type type, List<Integer> list, StatMap<StatKey> statMap, BlockSplitter blockSplitter) {
        Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(type), "Unsupported distribution type: %s", type);
        MailboxService mailboxService = opChainExecutionContext.getMailboxService();
        long requestId = opChainExecutionContext.getRequestId();
        long deadlineMs = opChainExecutionContext.getDeadlineMs();
        List list2 = (List) MailboxIdUtils.toRoutingInfos(requestId, opChainExecutionContext.getStageId(), opChainExecutionContext.getWorkerId(), i, ((MailboxInfos) opChainExecutionContext.getWorkerMetadata().getMailboxInfosMap().get(Integer.valueOf(i))).getMailboxInfos()).stream().map(routingInfo -> {
            return mailboxService.getSendingMailbox(routingInfo.getHostname(), routingInfo.getPort(), routingInfo.getMailboxId(), deadlineMs, statMap);
        }).collect(Collectors.toList());
        statMap.merge(StatKey.FAN_OUT, list2.size());
        return BlockExchange.getExchange(list2, type, list, blockSplitter);
    }

    @Override // org.apache.pinot.query.runtime.operator.MultiStageOperator
    public void registerExecution(long j, int i) {
        this._statMap.merge(StatKey.EXECUTION_TIME_MS, j);
        this._statMap.merge(StatKey.EMITTED_ROWS, i);
    }

    @Override // org.apache.pinot.query.runtime.operator.MultiStageOperator
    public MultiStageOperator.Type getOperatorType() {
        return MultiStageOperator.Type.MAILBOX_SEND;
    }

    @Override // org.apache.pinot.query.runtime.operator.MultiStageOperator
    protected Logger logger() {
        return LOGGER;
    }

    @Override // org.apache.pinot.query.runtime.operator.MultiStageOperator
    public List<MultiStageOperator> getChildOperators() {
        return Collections.singletonList(this._input);
    }

    public String toExplainString() {
        return EXPLAIN_NAME;
    }

    @Override // org.apache.pinot.query.runtime.operator.MultiStageOperator
    protected TransferableBlock getNextBlock() {
        try {
            TransferableBlock m42nextBlock = this._input.m42nextBlock();
            if (m42nextBlock.isSuccessfulEndOfStreamBlock()) {
                updateEosBlock(m42nextBlock, this._statMap);
                sendTransferableBlock(m42nextBlock);
                if (this._context.getStageId() == 1) {
                    updateMetrics(m42nextBlock);
                }
            } else if (sendTransferableBlock(m42nextBlock)) {
                earlyTerminate();
            }
            sampleAndCheckInterruption();
            return m42nextBlock;
        } catch (TimeoutException e) {
            LOGGER.warn("Timed out transferring data on opChain: {}", this._context.getId(), e);
            return TransferableBlockUtils.getErrorTransferableBlock(e);
        } catch (Exception e2) {
            TransferableBlock errorTransferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e2);
            try {
                LOGGER.error("Exception while transferring data on opChain: {}", this._context.getId(), e2);
                sendTransferableBlock(errorTransferableBlock);
            } catch (Exception e3) {
                LOGGER.error("Exception while sending error block.", e3);
            }
            return errorTransferableBlock;
        } catch (QueryCancelledException e4) {
            LOGGER.debug("Query was cancelled! for opChain: {}", this._context.getId());
            return createLeafBlock();
        }
    }

    protected TransferableBlock createLeafBlock() {
        return TransferableBlockUtils.getEndOfStreamTransferableBlock(MultiStageQueryStats.createCancelledSend(this._context.getStageId(), this._statMap));
    }

    private boolean sendTransferableBlock(TransferableBlock transferableBlock) throws Exception {
        boolean send = this._exchange.send(transferableBlock);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("==[SEND]== Block " + transferableBlock + " sent from: " + this._context.getId());
        }
        return send;
    }

    @Override // org.apache.pinot.query.runtime.operator.MultiStageOperator, java.lang.AutoCloseable
    public void close() {
        super.close();
        this._exchange.close();
    }

    @Override // org.apache.pinot.query.runtime.operator.MultiStageOperator
    public void cancel(Throwable th) {
        super.cancel(th);
        this._exchange.cancel(th);
    }

    private void updateMetrics(TransferableBlock transferableBlock) {
        ServerMetrics serverMetrics = ServerMetrics.get();
        MultiStageQueryStats queryStats = transferableBlock.getQueryStats();
        if (queryStats == null) {
            LOGGER.info("Query stats not found in the EOS block.");
            return;
        }
        for (MultiStageQueryStats.StageStats.Closed closed : queryStats.getClosedStats()) {
            if (closed != null) {
                closed.forEach((type, statMap) -> {
                    type.updateServerMetrics(statMap, serverMetrics);
                });
            }
        }
        queryStats.getCurrentStats().forEach((type2, statMap2) -> {
            type2.updateServerMetrics(statMap2, serverMetrics);
        });
    }
}
