package io.github.tramchamploo.bufferslayer;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.github.tramchamploo.bufferslayer.Message;
import io.github.tramchamploo.bufferslayer.OverflowStrategy;
import io.github.tramchamploo.bufferslayer.QueueManager;
import io.github.tramchamploo.bufferslayer.Reporter;
import io.github.tramchamploo.bufferslayer.internal.Component;
import io.github.tramchamploo.bufferslayer.internal.CompositeFuture;
import io.github.tramchamploo.bufferslayer.internal.Future;
import io.github.tramchamploo.bufferslayer.internal.FutureListener;
import io.github.tramchamploo.bufferslayer.internal.MessageFuture;
import io.github.tramchamploo.bufferslayer.internal.MessagePromise;
import io.github.tramchamploo.bufferslayer.internal.SucceededFuture;
import java.io.Flushable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/tramchamploo/bufferslayer/AsyncReporter.class */
public class AsyncReporter<M extends Message, R> extends TimeDriven<Message.MessageKey> implements Reporter<M, R>, Flushable {
    private static final int MAX_BUFFER_POOL_ENTRIES = 1000;
    final AsyncSender<R> sender;
    final long messageTimeoutNanos;
    final int bufferedMaxMessages;
    private final ReporterMetrics metrics;
    private final boolean singleKey;
    private final int timerThreads;
    private final int flushThreads;
    Set<Thread> flushers;
    private static HashedWheelTimer hashedWheelTimer;
    final QueueManager queueManager;
    CountDownLatch close;
    ScheduledExecutorService scheduler;
    private BufferPool bufferPool;
    private final MemoryLimiter memoryLimiter;
    static final Logger logger = LoggerFactory.getLogger(AsyncReporter.class);
    static AtomicLong idGenerator = new AtomicLong();
    private static ThreadFactory hashedWheelTimerThreadFactory = new ThreadFactoryBuilder().setNameFormat(AsyncReporter.class.getSimpleName() + "-cleaner").setDaemon(true).build();
    final Long id = Long.valueOf(idGenerator.getAndIncrement());
    final FlushSynchronizer synchronizer = new FlushSynchronizer();
    private final AtomicBoolean started = new AtomicBoolean(false);
    final AtomicBoolean closed = new AtomicBoolean(false);
    private final FlushThreadFactory flushThreadFactory = new FlushThreadFactory(this);

    /* loaded from: input_file:io/github/tramchamploo/bufferslayer/AsyncReporter$Builder.class */
    public static final class Builder<M extends Message, R> extends Reporter.Builder<M, R> {
        int sharedSenderThreads;
        int flushThreads;
        int timerThreads;
        long pendingKeepaliveNanos;
        long tickDurationNanos;
        int ticksPerWheel;
        boolean singleKey;
        int totalQueuedMessages;

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder(Sender<M, R> sender) {
            super(sender);
            this.sharedSenderThreads = 1;
            this.flushThreads = 1;
            this.timerThreads = 1;
            this.pendingKeepaliveNanos = TimeUnit.SECONDS.toNanos(60L);
            this.tickDurationNanos = TimeUnit.MILLISECONDS.toNanos(100L);
            this.ticksPerWheel = 512;
            this.singleKey = false;
            this.totalQueuedMessages = 100000;
        }

        /* renamed from: metrics, reason: merged with bridge method [inline-methods] */
        public Builder<M, R> m6metrics(ReporterMetrics reporterMetrics) {
            super.metrics(reporterMetrics);
            return this;
        }

        /* renamed from: messageTimeout, reason: merged with bridge method [inline-methods] */
        public Builder<M, R> m5messageTimeout(long j, TimeUnit timeUnit) {
            super.messageTimeout(j, timeUnit);
            return this;
        }

        /* renamed from: bufferedMaxMessages, reason: merged with bridge method [inline-methods] */
        public Builder<M, R> m4bufferedMaxMessages(int i) {
            super.bufferedMaxMessages(i);
            return this;
        }

        /* renamed from: pendingMaxMessages, reason: merged with bridge method [inline-methods] */
        public Builder<M, R> m3pendingMaxMessages(int i) {
            super.pendingMaxMessages(i);
            return this;
        }

        /* renamed from: overflowStrategy, reason: merged with bridge method [inline-methods] */
        public Builder<M, R> m2overflowStrategy(OverflowStrategy.Strategy strategy) {
            super.overflowStrategy(strategy);
            return this;
        }

        public Builder<M, R> sharedSenderThreads(int i) {
            Preconditions.checkArgument(i > 0, "sharedSenderThreads > 0: %s", new Object[]{Integer.valueOf(i)});
            this.sharedSenderThreads = i;
            return this;
        }

        public Builder<M, R> flushThreads(int i) {
            Preconditions.checkArgument(i > 0, "flushThreads > 0: %s", new Object[]{Integer.valueOf(i)});
            this.flushThreads = i;
            return this;
        }

        public Builder<M, R> timerThreads(int i) {
            Preconditions.checkArgument(i > 0, "timerThreads > 0: %s", new Object[]{Integer.valueOf(i)});
            this.timerThreads = i;
            return this;
        }

        public Builder<M, R> pendingKeepalive(long j, TimeUnit timeUnit) {
            Preconditions.checkArgument(j > 0, "keepalive > 0: %s", new Object[]{Long.valueOf(j)});
            this.pendingKeepaliveNanos = timeUnit.toNanos(j);
            return this;
        }

        public Builder<M, R> tickDuration(long j, TimeUnit timeUnit) {
            Preconditions.checkArgument(j > 0, "tickDuration > 0: %s", new Object[]{Long.valueOf(j)});
            this.tickDurationNanos = timeUnit.toNanos(j);
            return this;
        }

        public Builder<M, R> ticksPerWheel(int i) {
            Preconditions.checkArgument(i > 0, "ticksPerWheel > 0: %s", new Object[]{Integer.valueOf(i)});
            this.ticksPerWheel = i;
            return this;
        }

        public Builder<M, R> singleKey(boolean z) {
            this.singleKey = z;
            return this;
        }

        public Builder<M, R> totalQueuedMessages(int i) {
            Preconditions.checkArgument(i >= 0, "totalQueuedMessages >= 0: %s", new Object[]{Integer.valueOf(i)});
            this.totalQueuedMessages = i;
            return this;
        }

        /* renamed from: build, reason: merged with bridge method [inline-methods] */
        public AsyncReporter<M, R> m1build() {
            Preconditions.checkArgument(this.totalQueuedMessages >= this.pendingMaxMessages, "totalQueuedMessages >= pendingMaxMessages: %s", new Object[]{Integer.valueOf(this.totalQueuedMessages)});
            return new AsyncReporter<>(this);
        }
    }

    AsyncReporter(Builder<M, R> builder) {
        this.sender = toAsyncSender(builder);
        this.metrics = builder.metrics;
        this.memoryLimiter = MemoryLimiter.maxOf(builder.totalQueuedMessages, this.metrics);
        this.messageTimeoutNanos = builder.messageTimeoutNanos;
        this.bufferedMaxMessages = builder.bufferedMaxMessages;
        this.singleKey = builder.singleKey;
        this.flushThreads = builder.flushThreads;
        this.timerThreads = builder.timerThreads;
        this.queueManager = new QueueManager(builder.pendingMaxMessages, builder.overflowStrategy, builder.pendingKeepaliveNanos, this, this.metrics, initHashedWheelTimer(builder));
        this.bufferPool = new BufferPool(MAX_BUFFER_POOL_ENTRIES, builder.bufferedMaxMessages, builder.singleKey);
        if (this.messageTimeoutNanos > 0) {
            this.queueManager.onCreate(new QueueManager.CreateCallback() { // from class: io.github.tramchamploo.bufferslayer.AsyncReporter.1
                @Override // io.github.tramchamploo.bufferslayer.QueueManager.CreateCallback
                public void call(AbstractSizeBoundedQueue abstractSizeBoundedQueue) {
                    AsyncReporter.this.schedulePeriodically(abstractSizeBoundedQueue.key, AsyncReporter.this.messageTimeoutNanos);
                }
            });
        }
    }

    private static HashedWheelTimer initHashedWheelTimer(Builder<?, ?> builder) {
        synchronized (AsyncReporter.class) {
            if (hashedWheelTimer == null) {
                hashedWheelTimer = new HashedWheelTimer(hashedWheelTimerThreadFactory, builder.tickDurationNanos, TimeUnit.NANOSECONDS, builder.ticksPerWheel);
            }
        }
        return hashedWheelTimer;
    }

    AsyncSender<R> toAsyncSender(Builder<M, R> builder) {
        return new AsyncSenderAdaptor(builder.sender, builder.sharedSenderThreads);
    }

    public static <M extends Message, R> Builder<M, R> builder(Sender<M, R> sender) {
        return new Builder<>(sender);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.github.tramchamploo.bufferslayer.TimeDriven
    public void onTimer(Message.MessageKey messageKey) {
        AbstractSizeBoundedQueue abstractSizeBoundedQueue = this.queueManager.get(messageKey);
        if (abstractSizeBoundedQueue == null || abstractSizeBoundedQueue.size() <= 0) {
            return;
        }
        flush(abstractSizeBoundedQueue);
    }

    @Override // io.github.tramchamploo.bufferslayer.TimeDriven
    protected ScheduledExecutorService scheduler() {
        if (this.scheduler == null) {
            synchronized (this) {
                if (this.scheduler == null) {
                    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(this.timerThreads, new ThreadFactoryBuilder().setNameFormat("AsyncReporter-" + this.id + "-timer-%d").setDaemon(true).build());
                    scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
                    this.scheduler = scheduledThreadPoolExecutor;
                    return scheduledThreadPoolExecutor;
                }
            }
        }
        return this.scheduler;
    }

    public MessageFuture<R> report(M m) {
        Preconditions.checkNotNull(m, "message");
        this.metrics.incrementMessages(1);
        if (this.closed.get()) {
            MessageFuture<R> newFailedFuture = m.newFailedFuture(MessageDroppedException.dropped(new IllegalStateException("closed!"), Collections.singletonList(m)));
            setFailListener(newFailedFuture);
            return newFailedFuture;
        }
        if (this.started.compareAndSet(false, true) && this.messageTimeoutNanos > 0) {
            startFlushThreads();
        }
        this.memoryLimiter.waitWhenMaximum();
        AbstractSizeBoundedQueue orCreate = this.queueManager.getOrCreate(this.singleKey ? Message.SINGLE_KEY : m.asMessageKey());
        MessagePromise<?> newPromise = m.newPromise();
        orCreate.offer(newPromise);
        setFailListener(newPromise);
        if (orCreate.size() >= this.bufferedMaxMessages) {
            this.synchronizer.offer(orCreate);
        }
        return newPromise;
    }

    private void startFlushThreads() {
        HashSet hashSet = new HashSet(this.flushThreads);
        for (int i = 0; i < this.flushThreads; i++) {
            Thread newFlushThread = this.flushThreadFactory.newFlushThread();
            newFlushThread.start();
            hashSet.add(newFlushThread);
        }
        this.flushers = Collections.unmodifiableSet(hashSet);
    }

    private void setFailListener(MessageFuture<R> messageFuture) {
        messageFuture.addListener(new FutureListener<R>() { // from class: io.github.tramchamploo.bufferslayer.AsyncReporter.2
            public void operationComplete(Future<R> future) {
                if (future.isSuccess()) {
                    return;
                }
                AsyncReporter.this.metrics.incrementMessagesDropped(1);
            }
        });
    }

    @Override // java.io.Flushable
    public void flush() {
        Iterator<AbstractSizeBoundedQueue> it = this.queueManager.elements().iterator();
        while (it.hasNext()) {
            flush(it.next()).awaitUninterruptibly();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future<?> flush(AbstractSizeBoundedQueue abstractSizeBoundedQueue) {
        Buffer acquire = this.bufferPool.acquire();
        try {
            if (abstractSizeBoundedQueue.drainTo(acquire) == 0) {
                SucceededFuture succeededFuture = new SucceededFuture((Executor) null, (Object) null);
                this.bufferPool.release(acquire);
                return succeededFuture;
            }
            CompositeFuture send = this.sender.send((List) acquire.drain());
            this.bufferPool.release(acquire);
            this.metrics.updateQueuedMessages(abstractSizeBoundedQueue.key, abstractSizeBoundedQueue.size());
            if (!this.memoryLimiter.isMaximum()) {
                this.memoryLimiter.signalAll();
            }
            logWhenFailed(send);
            return send;
        } catch (Throwable th) {
            this.bufferPool.release(acquire);
            throw th;
        }
    }

    private void logWhenFailed(CompositeFuture compositeFuture) {
        compositeFuture.addListener(new FutureListener<CompositeFuture>() { // from class: io.github.tramchamploo.bufferslayer.AsyncReporter.3
            public void operationComplete(Future<CompositeFuture> future) {
                if (future.isSuccess()) {
                    return;
                }
                AsyncReporter.logger.warn(future.cause().getMessage());
            }
        });
    }

    public Component.CheckResult check() {
        return this.sender.check();
    }

    public void close() throws IOException {
        flush();
        if (this.closed.compareAndSet(false, true)) {
            this.close = new CountDownLatch(this.messageTimeoutNanos > 0 ? this.flushThreads : 0);
            try {
                if (!this.close.await(this.messageTimeoutNanos * 2, TimeUnit.NANOSECONDS)) {
                    logger.warn("Timed out waiting for close");
                }
            } catch (InterruptedException e) {
                logger.warn("Interrupted waiting for close");
                Thread.currentThread().interrupt();
            }
            if (this.scheduler != null) {
                clearTimers();
                this.scheduler.shutdown();
            }
            this.sender.close();
            int clearPendings = clearPendings();
            if (clearPendings > 0) {
                logger.warn("Dropped " + clearPendings + " messages due to AsyncReporter.close()");
            }
        }
    }

    int clearPendings() {
        int i = 0;
        for (AbstractSizeBoundedQueue abstractSizeBoundedQueue : this.queueManager.elements()) {
            i += abstractSizeBoundedQueue.clear();
            this.metrics.removeFromQueuedMessages(abstractSizeBoundedQueue.key);
        }
        this.queueManager.clear();
        if (i > 0) {
            this.metrics.incrementMessagesDropped(i);
        }
        return i;
    }
}
