package com.aliyun.openservices.aliyun.log.producer.internals;

import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.log.Client;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/openservices/aliyun/log/producer/internals/Mover.class */
public class Mover extends LogThread {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerBatch.class);
    private final ProducerConfig producerConfig;
    private final Map<String, Client> clientPool;
    private final LogAccumulator accumulator;
    private final RetryQueue retryQueue;
    private final BlockingQueue<ProducerBatch> successQueue;
    private final BlockingQueue<ProducerBatch> failureQueue;
    private final IOThreadPool ioThreadPool;
    private final AtomicInteger batchCount;
    private volatile boolean closed;

    public Mover(String str, ProducerConfig producerConfig, Map<String, Client> map, LogAccumulator logAccumulator, RetryQueue retryQueue, BlockingQueue<ProducerBatch> blockingQueue, BlockingQueue<ProducerBatch> blockingQueue2, IOThreadPool iOThreadPool, AtomicInteger atomicInteger) {
        super(str, true);
        this.producerConfig = producerConfig;
        this.clientPool = map;
        this.accumulator = logAccumulator;
        this.retryQueue = retryQueue;
        this.successQueue = blockingQueue;
        this.failureQueue = blockingQueue2;
        this.ioThreadPool = iOThreadPool;
        this.batchCount = atomicInteger;
        this.closed = false;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        loopMoveBatches();
        LOGGER.debug("Beginning shutdown of mover thread");
        List<ProducerBatch> incompleteBatches = incompleteBatches();
        LOGGER.debug("Submit incomplete batches, size={}", Integer.valueOf(incompleteBatches.size()));
        submitIncompleteBatches(incompleteBatches);
        LOGGER.debug("Shutdown of mover thread has completed");
    }

    private void loopMoveBatches() {
        while (!this.closed) {
            try {
                moveBatches();
            } catch (Exception e) {
                LOGGER.error("Uncaught exception in mover, e=", e);
            }
        }
    }

    private void moveBatches() {
        LOGGER.debug("Prepare to move expired batches from accumulator and retry queue to ioThreadPool");
        doMoveBatches();
        LOGGER.debug("Move expired batches successfully");
    }

    private void doMoveBatches() {
        ExpiredBatches expiredBatches = this.accumulator.expiredBatches();
        LOGGER.debug("Expired batches from accumulator, size={}, remainingMs={}", Integer.valueOf(expiredBatches.getBatches().size()), Long.valueOf(expiredBatches.getRemainingMs()));
        Iterator<ProducerBatch> it = expiredBatches.getBatches().iterator();
        while (it.hasNext()) {
            this.ioThreadPool.submit(createSendProducerBatchTask(it.next()));
        }
        List<ProducerBatch> expiredBatches2 = this.retryQueue.expiredBatches(expiredBatches.getRemainingMs());
        LOGGER.debug("Expired batches from retry queue, size={}", Integer.valueOf(expiredBatches2.size()));
        Iterator<ProducerBatch> it2 = expiredBatches2.iterator();
        while (it2.hasNext()) {
            this.ioThreadPool.submit(createSendProducerBatchTask(it2.next()));
        }
    }

    private List<ProducerBatch> incompleteBatches() {
        List<ProducerBatch> remainingBatches = this.accumulator.remainingBatches();
        remainingBatches.addAll(this.retryQueue.remainingBatches());
        return remainingBatches;
    }

    private void submitIncompleteBatches(List<ProducerBatch> list) {
        Iterator<ProducerBatch> it = list.iterator();
        while (it.hasNext()) {
            this.ioThreadPool.submit(createSendProducerBatchTask(it.next()));
        }
    }

    private SendProducerBatchTask createSendProducerBatchTask(ProducerBatch producerBatch) {
        return new SendProducerBatchTask(producerBatch, this.producerConfig, this.clientPool, this.retryQueue, this.successQueue, this.failureQueue, this.batchCount);
    }

    public void close() {
        this.closed = true;
        interrupt();
    }
}
