package org.apache.storm.executor.spout;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.storm.daemon.Acker;
import org.apache.storm.daemon.Task;
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.MessageId;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.RotatingMap;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/executor/spout/SpoutOutputCollectorImpl.class */
public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
    private static final Logger LOG = LoggerFactory.getLogger(SpoutOutputCollectorImpl.class);
    private final SpoutExecutor executor;
    private final Task taskData;
    private final int taskId;
    private final MutableLong emittedCount;
    private final boolean hasAckers;
    private final Random random;
    private final Boolean isEventLoggers;
    private final Boolean isDebug;
    private final RotatingMap<Long, TupleInfo> pending;
    private final long spoutExecutorThdId;
    private TupleInfo globalTupleInfo = new TupleInfo();

    public SpoutOutputCollectorImpl(ISpout iSpout, SpoutExecutor spoutExecutor, Task task, MutableLong mutableLong, boolean z, Random random, Boolean bool, Boolean bool2, RotatingMap<Long, TupleInfo> rotatingMap) {
        this.executor = spoutExecutor;
        this.taskData = task;
        this.taskId = task.getTaskId().intValue();
        this.emittedCount = mutableLong;
        this.hasAckers = z;
        this.random = random;
        this.isEventLoggers = bool;
        this.isDebug = bool2;
        this.pending = rotatingMap;
        this.spoutExecutorThdId = spoutExecutor.getThreadId();
    }

    @Override // org.apache.storm.spout.ISpoutOutputCollector
    public List<Integer> emit(String str, List<Object> list, Object obj) {
        try {
            return sendSpoutMsg(str, list, obj, null);
        } catch (InterruptedException e) {
            LOG.warn("Spout thread interrupted during emit().");
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.storm.spout.ISpoutOutputCollector
    public void emitDirect(int i, String str, List<Object> list, Object obj) {
        try {
            sendSpoutMsg(str, list, obj, Integer.valueOf(i));
        } catch (InterruptedException e) {
            LOG.warn("Spout thread interrupted during emitDirect().");
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.storm.spout.ISpoutOutputCollector
    public void flush() {
        try {
            this.executor.getExecutorTransfer().flush();
        } catch (InterruptedException e) {
            LOG.warn("Spout thread interrupted during flush().");
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.storm.spout.ISpoutOutputCollector
    public long getPendingCount() {
        return this.pending.size();
    }

    @Override // org.apache.storm.task.IErrorReporter
    public void reportError(Throwable th) {
        this.executor.incrementReportedErrorCount();
        this.executor.getReportError().report(th);
    }

    private List<Integer> sendSpoutMsg(String str, List<Object> list, Object obj, Integer num) throws InterruptedException {
        MessageId makeUnanchored;
        this.emittedCount.increment();
        List<Integer> outgoingTasks = num != null ? this.taskData.getOutgoingTasks(num, str, list) : this.taskData.getOutgoingTasks(str, list);
        boolean z = obj != null && this.hasAckers;
        ArrayList arrayList = z ? new ArrayList() : null;
        long generateId = z ? MessageId.generateId(this.random) : 0L;
        for (int i = 0; i < outgoingTasks.size(); i++) {
            Integer num2 = outgoingTasks.get(i);
            if (z) {
                long generateId2 = MessageId.generateId(this.random);
                makeUnanchored = MessageId.makeRootId(generateId, generateId2);
                arrayList.add(Long.valueOf(generateId2));
            } else {
                makeUnanchored = MessageId.makeUnanchored();
            }
            this.executor.getExecutorTransfer().tryTransfer(new AddressedTuple(num2.intValue(), new TupleImpl(this.executor.getWorkerTopologyContext(), list, this.executor.getComponentId(), this.taskId, str, makeUnanchored)), this.executor.getPendingEmits());
        }
        if (this.isEventLoggers.booleanValue()) {
            this.taskData.sendToEventLogger(this.executor, list, this.executor.getComponentId(), obj, this.random, this.executor.getPendingEmits());
        }
        if (z) {
            TupleInfo tupleInfo = new TupleInfo();
            tupleInfo.setTaskId(this.taskId);
            tupleInfo.setStream(str);
            tupleInfo.setMessageId(obj);
            tupleInfo.setRootId(generateId);
            if (this.isDebug.booleanValue()) {
                tupleInfo.setValues(list);
            }
            if (this.executor.samplerCheck()) {
                tupleInfo.setTimestamp(System.currentTimeMillis());
            }
            this.pending.put(Long.valueOf(generateId), tupleInfo);
            this.taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, new Values(Long.valueOf(generateId), Long.valueOf(Utils.bitXorVals(arrayList)), Integer.valueOf(this.taskId)), this.executor.getExecutorTransfer(), this.executor.getPendingEmits());
        } else if (obj != null) {
            if (this.isDebug.booleanValue() && this.spoutExecutorThdId != Thread.currentThread().getId()) {
                throw new RuntimeException("Detected background thread emitting tuples for the spout. Spout Output Collector should only emit from the main spout executor thread.");
            }
            this.globalTupleInfo.clear();
            this.globalTupleInfo.setStream(str);
            this.globalTupleInfo.setValues(list);
            this.globalTupleInfo.setMessageId(obj);
            this.globalTupleInfo.setTimestamp(0L);
            this.globalTupleInfo.setRootId(generateId);
            this.executor.ackSpoutMsg(this.executor, this.taskData, 0L, this.globalTupleInfo);
        }
        return outgoingTasks;
    }
}
