/*
 * Decompiled with CFR 0.152.
 */
package com.wavefront.agent.handlers;

import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.RecyclableRateLimiter;
import com.wavefront.agent.data.EntityProperties;
import com.wavefront.agent.data.QueueingReason;
import com.wavefront.agent.data.TaskResult;
import com.wavefront.agent.handlers.HandlerKey;
import com.wavefront.agent.handlers.SenderTask;
import com.wavefront.common.NamedThreadFactory;
import com.wavefront.common.TaggedMetricName;
import com.wavefront.common.logger.SharedRateLimitingLogger;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricName;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
 * Base class for sender tasks that accumulate items in an in-memory buffer and flush them in
 * batches.
 *
 * <p>Items are appended with {@link #add(Object)}. While the task is running, {@link #run()}
 * re-schedules itself on the supplied scheduler, takes one batch off the front of the buffer and
 * hands it to {@link #processSingleBatch(List)}. When the buffer grows past
 * {@code properties.getMemoryBufferLimit()}, a background task drains it through
 * {@link #flushSingleBatch(List, QueueingReason)}.
 *
 * <p>The buffer ({@link #datum}) and its size counter ({@link #datumSize}) are modified only while
 * holding {@link #mutex}. {@code datumSize} is also read without the lock for logging, metrics and
 * scoring, so those reads are approximate.
 *
 * @param <T> type of the buffered items
 */
abstract class AbstractSenderTask<T>
implements SenderTask<T>,
Runnable {
    private static final Logger logger = Logger.getLogger(AbstractSenderTask.class.getCanonicalName());
    /** Rate-limited wrapper around {@link #logger}, used for the "rate limiter active" message. */
    protected final Logger throttledLogger;
    /** In-memory buffer of pending items; guarded by {@link #mutex}. */
    List<T> datum = new ArrayList<T>();
    /** Sum of {@link #getObjectSize(Object)} over the buffered items; written under {@link #mutex}. */
    int datumSize;
    final Object mutex = new Object();
    final ScheduledExecutorService scheduler;
    private final ExecutorService flushExecutor;
    final HandlerKey handlerKey;
    final int threadId;
    final EntityProperties properties;
    /** Rate limiter obtained from {@link #properties}; {@link #run()} treats {@code null} as "no limit". */
    final RecyclableRateLimiter rateLimiter;
    final Counter attemptedCounter;
    final Counter blockedCounter;
    final Counter bufferFlushCounter;
    final Counter bufferCompletedFlushCounter;
    private final Histogram metricSize;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    final AtomicBoolean isBuffering = new AtomicBoolean(false);
    volatile boolean isSending = false;
    /** Limits how often {@link #add(Object)} may submit a drain task (10 permits per second). */
    private final RateLimiter drainBuffersRateLimiter = RateLimiter.create(10.0);
    /** Drains the buffer to the queue if, at execution time, it is still above the memory limit. */
    private final Runnable drainBuffersToQueueTask = new Runnable(){

        @Override
        public void run() {
            if (AbstractSenderTask.this.datumSize > AbstractSenderTask.this.properties.getMemoryBufferLimit()) {
                logger.warning("[" + AbstractSenderTask.this.handlerKey.getHandle() + " thread " + AbstractSenderTask.this.threadId + "]: WF-3 Too many pending " + AbstractSenderTask.this.handlerKey.getEntityType() + " (" + AbstractSenderTask.this.datumSize + "), block size: " + AbstractSenderTask.this.properties.getDataPerBatch() + ". flushing to retry queue");
                AbstractSenderTask.this.drainBuffersToQueue(QueueingReason.BUFFER_SIZE);
                logger.info("[" + AbstractSenderTask.this.handlerKey.getHandle() + " thread " + AbstractSenderTask.this.threadId + "]: flushing to retry queue complete. Pending " + AbstractSenderTask.this.handlerKey.getEntityType() + ": " + AbstractSenderTask.this.datumSize);
            }
        }
    };

    /**
     * Creates the task, its single-thread flush executor and its metrics. The task does not start
     * flushing until {@link #start()} is called.
     *
     * @param handlerKey identifies the pipeline; used in metric names, thread names and log messages
     * @param threadId   index of this task, used in metric names, thread names and log messages
     * @param properties source of the flush interval, batch size, memory limit and rate limiter
     * @param scheduler  executor on which {@link #run()} is (re-)scheduled
     */
    AbstractSenderTask(HandlerKey handlerKey, int threadId, EntityProperties properties, ScheduledExecutorService scheduler) {
        this.handlerKey = handlerKey;
        this.threadId = threadId;
        this.properties = properties;
        this.rateLimiter = properties.getRateLimiter();
        this.scheduler = scheduler;
        this.throttledLogger = new SharedRateLimitingLogger(logger, "rateLimit-" + handlerKey, 0.2);
        // One worker thread and a SynchronousQueue: a submit is rejected whenever the worker is busy,
        // so at most one drain task is in flight at any time.
        this.flushExecutor = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), (ThreadFactory)new NamedThreadFactory("flush-" + handlerKey.toString() + "-" + threadId));
        this.attemptedCounter = Metrics.newCounter(new MetricName(handlerKey.toString(), "", "sent"));
        this.blockedCounter = Metrics.newCounter(new MetricName(handlerKey.toString(), "", "blocked"));
        this.bufferFlushCounter = Metrics.newCounter((MetricName)new TaggedMetricName("buffer", "flush-count", new String[]{"port", handlerKey.getHandle()}));
        this.bufferCompletedFlushCounter = Metrics.newCounter((MetricName)new TaggedMetricName("buffer", "completed-flush-count", new String[]{"port", handlerKey.getHandle()}));
        this.metricSize = Metrics.newHistogram(new MetricName(handlerKey.toString() + "." + threadId, "", "metric_length"));
        Metrics.newGauge(new MetricName(handlerKey.toString() + "." + threadId, "", "size"), new Gauge<Integer>(){

            @Override
            public Integer value() {
                return AbstractSenderTask.this.datumSize;
            }
        });
    }

    /**
     * Processes one batch taken from the buffer.
     *
     * @param var1 the batch to process
     * @return the outcome; {@link #run()} uses it to decide whether to recycle rate-limiter permits
     *         and whether to put the batch back into the buffer
     */
    abstract TaskResult processSingleBatch(List<T> var1);

    /**
     * Performs one flush cycle and, if the task is still running, schedules the next one.
     *
     * <p>A batch is taken from the buffer; if the rate limiter (when present) grants permits for it,
     * the batch is processed, otherwise it is returned to the buffer and the next cycle is scheduled
     * sooner, after 25-50% of the regular flush interval. Any {@link Throwable} is logged and
     * does not prevent rescheduling.
     */
    @Override
    public void run() {
        if (!this.isRunning.get()) {
            return;
        }
        long nextRunMillis = this.properties.getPushFlushInterval();
        this.isSending = true;
        try {
            List<T> current = this.createBatch();
            int currentBatchSize = this.getDataSize(current);
            if (currentBatchSize == 0) {
                return;
            }
            if (this.rateLimiter == null || this.rateLimiter.tryAcquire(currentBatchSize)) {
                TaskResult result = this.processSingleBatch(current);
                this.attemptedCounter.inc((long)currentBatchSize);
                switch (result) {
                    case DELIVERED: {
                        break;
                    }
                    case PERSISTED:
                    case PERSISTED_RETRY: {
                        // Hand the acquired permits back to the limiter.
                        if (this.rateLimiter != null) {
                            this.rateLimiter.recyclePermits(currentBatchSize);
                        }
                        break;
                    }
                    case RETRY_LATER: {
                        // Put the batch back at the head of the buffer and return the permits.
                        this.undoBatch(current);
                        if (this.rateLimiter != null) {
                            this.rateLimiter.recyclePermits(currentBatchSize);
                        }
                        break;
                    }
                    default: {
                        break;
                    }
                }
            } else {
                // Rate limited: retry after a quarter of the interval plus random jitter of up to
                // another quarter.
                nextRunMillis = nextRunMillis / 4L + (long)((int)(Math.random() * (double)nextRunMillis / 4.0));
                final long willRetryIn = nextRunMillis;
                this.throttledLogger.log(Level.INFO, () -> "[" + this.handlerKey.getHandle() + " thread " + this.threadId + "]: WF-4 Proxy rate limiter active (pending " + this.handlerKey.getEntityType() + ": " + this.datumSize + "), will retry in " + willRetryIn + "ms");
                this.undoBatch(current);
            }
        }
        catch (Throwable t) {
            logger.log(Level.SEVERE, "Unexpected error in flush loop", t);
        }
        finally {
            this.isSending = false;
            if (this.isRunning.get()) {
                this.scheduler.schedule(this, nextRunMillis, TimeUnit.MILLISECONDS);
            }
        }
    }

    /** Marks the task as running and schedules the first flush cycle; no-op if already running. */
    @Override
    public void start() {
        if (this.isRunning.compareAndSet(false, true)) {
            this.scheduler.schedule(this, (long)this.properties.getPushFlushInterval(), TimeUnit.MILLISECONDS);
        }
    }

    /** Marks the task as stopped (no further rescheduling) and shuts down the flush executor. */
    @Override
    public void stop() {
        this.isRunning.set(false);
        this.flushExecutor.shutdown();
    }

    /**
     * Appends an item to the in-memory buffer. If the buffer has reached the memory buffer limit,
     * no drain is currently in progress and the drain rate limiter grants a permit, a background
     * drain task is submitted.
     *
     * @param metricString the item to buffer; its {@code toString()} length is recorded in a histogram
     */
    @Override
    public void add(T metricString) {
        this.metricSize.update(metricString.toString().length());
        final int pendingSize;
        synchronized (this.mutex) {
            this.datum.add(metricString);
            this.datumSize += this.getObjectSize(metricString);
            // Capture the size while still holding the lock so the threshold check below uses
            // the value this call produced.
            pendingSize = this.datumSize;
        }
        if (pendingSize >= this.properties.getMemoryBufferLimit() && !this.isBuffering.get() && this.drainBuffersRateLimiter.tryAcquire()) {
            try {
                this.flushExecutor.submit(this.drainBuffersToQueueTask);
            }
            catch (RejectedExecutionException ignored) {
                // Deliberately best-effort: the executor rejects when its only worker is already
                // draining (or after stop()); a later add() will try again.
            }
        }
    }

    /**
     * Removes up to one block of items from the head of the buffer and returns them.
     *
     * <p>The block size comes from {@link #getBlockSize(List, int, int)}. When no rate limiter is
     * configured, the rate is treated as unbounded, and the result is clamped to the number of
     * buffered items.
     *
     * @return the removed items, possibly empty
     */
    protected List<T> createBatch() {
        List<T> current;
        synchronized (this.mutex) {
            // run() treats a null limiter as "unlimited"; do the same here instead of throwing an NPE.
            int rateLimit = this.rateLimiter == null ? Integer.MAX_VALUE : (int)this.rateLimiter.getRate();
            int blockSize = this.getBlockSize(this.datum, rateLimit, this.properties.getDataPerBatch());
            // Keep the sub-list bounds valid even if an override returns an out-of-range value.
            blockSize = Math.max(0, Math.min(blockSize, this.datum.size()));
            current = this.datum.subList(0, blockSize);
            this.datumSize -= this.getDataSize(current);
            // Replace the buffer with a copy of the tail; 'current' remains a view of the old list.
            this.datum = new ArrayList<T>(this.datum.subList(blockSize, this.datum.size()));
        }
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("[" + this.handlerKey.getHandle() + "] (DETAILED): sending " + current.size() + " valid " + this.handlerKey.getEntityType() + "; in memory: " + this.datumSize + "; total attempted: " + this.attemptedCounter.count() + "; total blocked: " + this.blockedCounter.count());
        }
        return current;
    }

    /**
     * Puts a previously created batch back at the head of the buffer.
     *
     * @param batch the items to re-insert
     */
    protected void undoBatch(List<T> batch) {
        synchronized (this.mutex) {
            this.datum.addAll(0, batch);
            this.datumSize += this.getDataSize(batch);
        }
    }

    /**
     * Flushes one batch during {@link #drainBuffersToQueue(QueueingReason)}.
     *
     * @param var1 the batch to flush
     * @param var2 the reason passed to {@code drainBuffersToQueue}; may be {@code null}
     */
    abstract void flushSingleBatch(List<T> var1, @Nullable QueueingReason var2);

    /**
     * Drains the buffer by repeatedly creating batches and passing them to
     * {@link #flushSingleBatch(List, QueueingReason)}. Does nothing if a drain is already in progress.
     *
     * <p>The loop ends once roughly the number of items present at the start has been flushed, when
     * an empty batch is produced, or when a batch is smaller than the previous one.
     *
     * @param reason reason for draining, forwarded to {@code flushSingleBatch}; may be {@code null}
     */
    @Override
    public void drainBuffersToQueue(@Nullable QueueingReason reason) {
        if (!this.isBuffering.compareAndSet(false, true)) {
            return;
        }
        this.bufferFlushCounter.inc();
        try {
            int toFlush;
            // The buffer is only ever mutated under the mutex, so read its size under it as well.
            synchronized (this.mutex) {
                toFlush = this.datum.size();
            }
            int lastBatchSize = Integer.MIN_VALUE;
            while (toFlush > 0) {
                List<T> batch = this.createBatch();
                int batchSize = batch.size();
                if (batchSize <= 0) {
                    break;
                }
                this.flushSingleBatch(batch, reason);
                this.attemptedCounter.inc((long)batchSize);
                if (batchSize < lastBatchSize) {
                    break;
                }
                lastBatchSize = batchSize;
                toFlush -= batchSize;
            }
        }
        finally {
            this.isBuffering.set(false);
            this.bufferCompletedFlushCounter.inc();
        }
    }

    /**
     * Returns a score for this task: the current buffer size plus the memory buffer limit while a
     * drain is in progress, or plus half a batch while a flush cycle is in progress.
     *
     * @return the score, computed in {@code long} arithmetic so the sum cannot overflow {@code int}
     */
    @Override
    public long getTaskRelativeScore() {
        long penalty = 0L;
        if (this.isBuffering.get()) {
            penalty = this.properties.getMemoryBufferLimit();
        } else if (this.isSending) {
            penalty = this.properties.getDataPerBatch() / 2;
        }
        return (long)this.datumSize + penalty;
    }

    /**
     * Computes how many items the next batch should contain.
     *
     * @param datum     the current buffer
     * @param ratelimit current rate limit, as an upper bound on the batch
     * @param batchSize configured maximum batch size
     * @return the smallest of the buffer's data size, {@code ratelimit} and {@code batchSize}
     */
    protected int getBlockSize(List<T> datum, int ratelimit, int batchSize) {
        return Math.min(Math.min(this.getDataSize(datum), ratelimit), batchSize);
    }

    /**
     * Returns the size of a list of items; this implementation returns the element count.
     *
     * @param data the items to measure
     * @return {@code data.size()}
     */
    protected int getDataSize(List<T> data) {
        return data.size();
    }

    /**
     * Returns the size contribution of a single item; this implementation always returns 1.
     *
     * @param object the item to measure
     * @return {@code 1}
     */
    protected int getObjectSize(T object) {
        return 1;
    }
}

