package datadog.trace.common.writer.ddagent;

import datadog.common.exec.DaemonThreadFactory;
import datadog.trace.core.DDSpan;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.core.monitor.Monitoring;
import datadog.trace.core.monitor.Recording;
import datadog.trace.core.processor.TraceProcessor;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:datadog/trace/common/writer/ddagent/TraceProcessingWorker.class */
public class TraceProcessingWorker implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(TraceProcessingWorker.class);
    private final PrioritizationStrategy prioritizationStrategy;
    private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
    private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
    private final TraceSerializingHandler serializingHandler;
    private final Thread serializerThread;
    private final int capacity;

    /* loaded from: input_file:datadog/trace/common/writer/ddagent/TraceProcessingWorker$TraceSerializingHandler.class */
    public static class TraceSerializingHandler implements Runnable, MessagePassingQueue.Consumer<Object> {
        private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
        private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
        private final TraceProcessor processor;
        private final HealthMetrics healthMetrics;
        private final long ticksRequiredToFlush;
        private final boolean doTimeFlush;
        private final PayloadDispatcher payloadDispatcher;
        private long lastTicks;
        private final Recording dutyCycleTimer;

        public TraceSerializingHandler(MpscBlockingConsumerArrayQueue<Object> mpscBlockingConsumerArrayQueue, MpscBlockingConsumerArrayQueue<Object> mpscBlockingConsumerArrayQueue2, HealthMetrics healthMetrics, Monitoring monitoring, TraceProcessor traceProcessor, PayloadDispatcher payloadDispatcher, long j, TimeUnit timeUnit) {
            this.primaryQueue = mpscBlockingConsumerArrayQueue;
            this.secondaryQueue = mpscBlockingConsumerArrayQueue2;
            this.healthMetrics = healthMetrics;
            this.dutyCycleTimer = monitoring.newCPUTimer("tracer.duty.cycle");
            this.processor = traceProcessor;
            this.doTimeFlush = j > 0;
            this.payloadDispatcher = payloadDispatcher;
            if (!this.doTimeFlush) {
                this.ticksRequiredToFlush = Long.MAX_VALUE;
            } else {
                this.lastTicks = System.nanoTime();
                this.ticksRequiredToFlush = timeUnit.toNanos(j);
            }
        }

        public void onEvent(Object obj) {
            try {
                if (obj instanceof List) {
                    this.payloadDispatcher.addTrace(this.processor.onTraceComplete((List) obj));
                } else if (obj instanceof FlushEvent) {
                    this.payloadDispatcher.flush();
                    ((FlushEvent) obj).sync();
                }
            } catch (Throwable th) {
                if (TraceProcessingWorker.log.isDebugEnabled()) {
                    TraceProcessingWorker.log.debug("Error while serializing trace", th);
                }
                this.healthMetrics.onFailedSerialize(obj instanceof List ? (List) obj : null, th);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                runDutyCycle();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            TraceProcessingWorker.log.info("datadog trace processor exited");
        }

        private void runDutyCycle() throws InterruptedException {
            Thread currentThread = Thread.currentThread();
            this.dutyCycleTimer.start();
            while (!currentThread.isInterrupted()) {
                consumeFromPrimaryQueue();
                consumeFromSecondaryQueue();
                flushIfNecessary();
                this.dutyCycleTimer.reset();
            }
            this.dutyCycleTimer.stop();
        }

        private void consumeFromPrimaryQueue() throws InterruptedException {
            Object poll = this.primaryQueue.poll(100L, TimeUnit.MILLISECONDS);
            if (null != poll) {
                onEvent(poll);
                consumeBatch(this.primaryQueue);
            }
        }

        private void consumeFromSecondaryQueue() {
            Object poll = this.secondaryQueue.poll();
            if (null != poll) {
                onEvent(poll);
                consumeBatch(this.secondaryQueue);
            }
        }

        private void flushIfNecessary() {
            if (shouldFlush()) {
                this.payloadDispatcher.flush();
            }
        }

        private boolean shouldFlush() {
            if (!this.doTimeFlush) {
                return false;
            }
            long nanoTime = System.nanoTime();
            if (nanoTime - this.lastTicks <= this.ticksRequiredToFlush) {
                return false;
            }
            this.lastTicks = nanoTime;
            return true;
        }

        private void consumeBatch(MessagePassingQueue<Object> messagePassingQueue) {
            messagePassingQueue.drain(this, messagePassingQueue.size());
        }

        public void accept(Object obj) {
            onEvent(obj);
        }
    }

    public TraceProcessingWorker(int i, HealthMetrics healthMetrics, Monitoring monitoring, PayloadDispatcher payloadDispatcher, Prioritization prioritization, long j, TimeUnit timeUnit) {
        this(i, healthMetrics, monitoring, payloadDispatcher, new TraceProcessor(), prioritization, j, timeUnit);
    }

    public TraceProcessingWorker(int i, HealthMetrics healthMetrics, Monitoring monitoring, PayloadDispatcher payloadDispatcher, TraceProcessor traceProcessor, Prioritization prioritization, long j, TimeUnit timeUnit) {
        this.capacity = i;
        this.primaryQueue = createQueue(i);
        this.secondaryQueue = createQueue(i);
        this.prioritizationStrategy = prioritization.create(this.primaryQueue, this.secondaryQueue);
        this.serializingHandler = new TraceSerializingHandler(this.primaryQueue, this.secondaryQueue, healthMetrics, monitoring, traceProcessor, payloadDispatcher, j, timeUnit);
        this.serializerThread = DaemonThreadFactory.TRACE_PROCESSOR.newThread(this.serializingHandler);
    }

    public void start() {
        this.serializerThread.start();
    }

    /* JADX WARN: Code restructure failed: missing block: B:10:0x002f, code lost:
    
        java.lang.Thread.currentThread().interrupt();
     */
    /* JADX WARN: Code restructure failed: missing block: B:11:0x0036, code lost:
    
        return false;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public boolean flush(long r6, java.util.concurrent.TimeUnit r8) {
        /*
            r5 = this;
            java.util.concurrent.CountDownLatch r0 = new java.util.concurrent.CountDownLatch
            r1 = r0
            r2 = 1
            r1.<init>(r2)
            r9 = r0
            datadog.trace.common.writer.ddagent.FlushEvent r0 = new datadog.trace.common.writer.ddagent.FlushEvent
            r1 = r0
            r2 = r9
            r1.<init>(r2)
            r10 = r0
        L15:
            r0 = r5
            org.jctools.queues.MpscBlockingConsumerArrayQueue<java.lang.Object> r0 = r0.primaryQueue
            r1 = r10
            boolean r0 = r0.offer(r1)
            r11 = r0
            r0 = r11
            if (r0 == 0) goto L15
            r0 = r9
            r1 = r6
            r2 = r8
            boolean r0 = r0.await(r1, r2)     // Catch: java.lang.InterruptedException -> L2d
            return r0
        L2d:
            r12 = move-exception
            java.lang.Thread r0 = java.lang.Thread.currentThread()
            r0.interrupt()
            r0 = 0
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: datadog.trace.common.writer.ddagent.TraceProcessingWorker.flush(long, java.util.concurrent.TimeUnit):boolean");
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.serializerThread.interrupt();
    }

    public boolean publish(int i, List<DDSpan> list) {
        return this.prioritizationStrategy.publish(i, list);
    }

    public int getCapacity() {
        return this.capacity;
    }

    public long getRemainingCapacity() {
        return this.primaryQueue.remainingCapacity();
    }

    private static MpscBlockingConsumerArrayQueue<Object> createQueue(int i) {
        return new MpscBlockingConsumerArrayQueue<>(i);
    }
}
