package org.apache.storm.daemon.worker;

import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.storm.Config;
import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.serialization.ITupleSerializer;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.TransferDrainer;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/daemon/worker/WorkerTransfer.class */
/**
 * Queues serialized tuples addressed to remote tasks and hands them to a {@link TransferDrainer}
 * for sending.
 *
 * <p>Producers publish {@link TaskMessage}s onto the transfer queue via
 * {@link #tryTransferRemote}. This class is also the queue's {@link JCQueue.Consumer}: each
 * consumed message is collected in the drainer ({@link #accept}) and the collected messages are
 * sent out when the queue calls {@link #flush()}.
 */
public class WorkerTransfer implements JCQueue.Consumer {
    static final Logger LOG = LoggerFactory.getLogger(WorkerTransfer.class);

    private final TransferDrainer drainer = new TransferDrainer();
    private final WorkerState workerState;
    private final IWaitStrategy backPressureWaitStrategy;
    // Assigned exactly once in the constructor, hence final.
    private final JCQueue transferQueue;
    // One flag per task id; indexed by the destination task id in tryTransferRemote().
    private final AtomicBoolean[] remoteBackPressureStatus;

    /**
     * Creates the transfer queue and the per-task back pressure flags.
     *
     * @param workerState state of the owning worker; supplies the topology id, port, metric
     *                    registry and the endpoint caches used when flushing
     * @param map         topology configuration; must contain
     *                    {@code Config.TOPOLOGY_TRANSFER_BUFFER_SIZE} and
     *                    {@code Config.TOPOLOGY_TRANSFER_BATCH_SIZE}
     * @param i           highest index needed in the back pressure array; the array is created
     *                    with {@code i + 1} entries and is indexed by destination task id
     * @throws IllegalArgumentException if the batch size is larger than half the buffer size
     */
    public WorkerTransfer(WorkerState workerState, Map<String, Object> map, int i) {
        this.workerState = workerState;
        this.backPressureWaitStrategy = IWaitStrategy.createBackPressureWaitStrategy(map);

        this.remoteBackPressureStatus = new AtomicBoolean[i + 1];
        for (int taskId = 0; taskId < this.remoteBackPressureStatus.length; taskId++) {
            this.remoteBackPressureStatus[taskId] = new AtomicBoolean(false);
        }

        int bufferSize = ObjectReader.getInt(map.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE));
        int batchSize = ObjectReader.getInt(map.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
        if (batchSize > bufferSize / 2) {
            throw new IllegalArgumentException("topology.transfer.batch.size:" + batchSize + " must be no more than half of "
                + Config.TOPOLOGY_TRANSFER_BUFFER_SIZE + ":" + bufferSize);
        }

        this.transferQueue = new JCQueue("worker-transfer-queue", "worker-transfer-queue", bufferSize, 0, batchSize,
            this.backPressureWaitStrategy, workerState.getTopologyId(), "__system", Collections.singletonList(-1),
            workerState.getPort(), workerState.getMetricRegistry());
    }

    /**
     * Returns the queue that remote-bound messages are published to.
     *
     * @return the transfer queue created by the constructor
     */
    public JCQueue getTransferQueue() {
        return this.transferQueue;
    }

    /**
     * Returns the live (not copied) array of per-task back pressure flags.
     *
     * @return the back pressure flags, indexed by task id
     */
    public AtomicBoolean[] getRemoteBackPressureStatus() {
        return this.remoteBackPressureStatus;
    }

    /**
     * Starts an async loop that repeatedly consumes the transfer queue with this object as the
     * consumer.
     *
     * <p>Each iteration yields {@code 1L} when nothing was consumed and {@code 0L} otherwise.
     * NOTE(review): presumably {@code Utils.asyncLoop} treats that value as a sleep interval
     * between iterations - confirm against its documentation.
     *
     * @return the thread handle returned by {@code Utils.asyncLoop}
     */
    public Utils.SmartThread makeTransferThread() {
        return Utils.asyncLoop(() -> {
            return this.transferQueue.consume(this) == 0 ? 1L : 0L;
        });
    }

    /**
     * Collects one consumed queue element in the drainer.
     *
     * @param obj the consumed element; must be a {@link TaskMessage}
     * @throws ClassCastException if {@code obj} is not a {@link TaskMessage}
     */
    @Override // org.apache.storm.utils.JCQueue.Consumer
    public void accept(Object obj) {
        this.drainer.add((TaskMessage) obj);
    }

    /**
     * Sends everything collected in the drainer using the worker's cached task-to-endpoint and
     * endpoint-to-socket mappings, then clears the drainer.
     *
     * <p>The send runs under the read side of {@code endpointSocketLock}. If sending throws, the
     * drainer is not cleared and the exception propagates.
     *
     * @throws InterruptedException declared by {@link JCQueue.Consumer#flush()}
     */
    @Override // org.apache.storm.utils.JCQueue.Consumer
    public void flush() throws InterruptedException {
        ReentrantReadWriteLock.ReadLock readLock = this.workerState.endpointSocketLock.readLock();
        // Acquire before entering the try block: if lock() itself failed, the finally clause must
        // not call unlock() on a lock this thread does not hold.
        readLock.lock();
        try {
            this.drainer.send(this.workerState.cachedTaskToNodePort.get(), this.workerState.cachedNodeToPortSocket.get());
            this.drainer.clear();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Attempts to publish a tuple to the transfer queue without blocking.
     *
     * <p>The tuple is not published (and is appended to {@code queue} when that is non-null) if
     * any of the following holds:
     * <ul>
     *   <li>{@code queue} already holds pending tuples, so this one is queued behind them;</li>
     *   <li>the back pressure flag of the destination task is set;</li>
     *   <li>the transfer queue rejects the publish attempt.</li>
     * </ul>
     *
     * @param addressedTuple   the tuple and its destination task
     * @param queue            overflow queue for tuples that could not be published; may be
     *                         {@code null}, in which case an unpublished tuple is not stored
     * @param iTupleSerializer serializer used to build the {@link TaskMessage} payload
     * @return {@code true} only if the tuple was published to the transfer queue
     */
    public boolean tryTransferRemote(AddressedTuple addressedTuple, Queue<AddressedTuple> queue, ITupleSerializer iTupleSerializer) {
        boolean hasPending = queue != null && !queue.isEmpty();
        if (!hasPending) {
            if (this.remoteBackPressureStatus[addressedTuple.dest].get()) {
                LOG.debug("Noticed Back Pressure in remote task {}", addressedTuple.dest);
            } else {
                TaskMessage message = new TaskMessage(addressedTuple.getDest(), iTupleSerializer.serialize(addressedTuple.getTuple()));
                if (this.transferQueue.tryPublish(message)) {
                    return true;
                }
            }
        }
        // Not published: remember the tuple for a later attempt if the caller gave us a queue.
        if (queue != null) {
            queue.add(addressedTuple);
        }
        return false;
    }

    /**
     * Flushes the transfer queue by delegating to {@code JCQueue.flush()}.
     *
     * @throws InterruptedException propagated from the queue's flush
     */
    public void flushRemotes() throws InterruptedException {
        this.transferQueue.flush();
    }

    /**
     * Attempts to flush the transfer queue by delegating to {@code JCQueue.tryFlush()}.
     *
     * @return the result reported by {@code JCQueue.tryFlush()}
     */
    public boolean tryFlushRemotes() {
        return this.transferQueue.tryFlush();
    }

    /**
     * Closes the transfer queue.
     *
     * <p>NOTE(review): the name suggests this stops the transfer thread, but this method only
     * calls {@code JCQueue.close()} - confirm who stops the thread from
     * {@link #makeTransferThread()}.
     */
    public void haltTransferThd() {
        this.transferQueue.close();
    }
}
