package org.apache.storm.utils;

import java.io.PrintStream;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.policy.WaitStrategyPark;
import org.apache.storm.utils.JCQueue;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/storm/utils/JCQueueTest.class */
public class JCQueueTest {
    private static final int TIMEOUT = 5000;
    private static final int PRODUCER_NUM = 4;
    IWaitStrategy waitStrategy = new WaitStrategyPark(100);

    /* loaded from: input_file:org/apache/storm/utils/JCQueueTest$ConsumerThd.class */
    private static class ConsumerThd implements Runnable {
        private final JCQueue.Consumer handler;
        private final JCQueue queue;

        ConsumerThd(JCQueue jCQueue, JCQueue.Consumer consumer) {
            this.handler = consumer;
            this.queue = jCQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                if (Thread.currentThread().isInterrupted() && this.queue.size() == 0) {
                    return;
                } else {
                    this.queue.consume(this.handler);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/storm/utils/JCQueueTest$IncProducer.class */
    private static class IncProducer implements Runnable {
        private final JCQueue queue;
        private final long _max;
        private final long min;

        public IncProducer(JCQueue jCQueue, long j, long j2) {
            this.queue = jCQueue;
            this._max = j;
            this.min = j2;
        }

        @Override // java.lang.Runnable
        public void run() {
            for (long j = 0; j < this._max && (!Thread.currentThread().isInterrupted() || j < this.min); j++) {
                try {
                    this.queue.publish(Long.valueOf(j));
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    @Test
    public void testFirstMessageFirst() {
        Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10L), () -> {
            JCQueue createQueue = createQueue("firstMessageOrder", 16);
            createQueue.publish("FIRST");
            IncProducer incProducer = new IncProducer(createQueue, 100L, 1L);
            final AtomicReference atomicReference = new AtomicReference();
            run(incProducer, new ConsumerThd(createQueue, new JCQueue.Consumer() { // from class: org.apache.storm.utils.JCQueueTest.1
                private boolean head = true;

                public void accept(Object obj) {
                    if (this.head) {
                        this.head = false;
                        atomicReference.set(obj);
                    }
                }

                public void flush() {
                }
            }), createQueue);
            Assertions.assertEquals("FIRST", atomicReference.get(), "We expect to receive first published message first, but received " + atomicReference.get());
        });
    }

    @Test
    public void testInOrder() {
        Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10L), () -> {
            final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            JCQueue createQueue = createQueue("consumerHang", 1024);
            run(new IncProducer(createQueue, 1048576L, 100L), new ConsumerThd(createQueue, new JCQueue.Consumer() { // from class: org.apache.storm.utils.JCQueueTest.2
                long _expected = 0;

                public void accept(Object obj) {
                    if (this._expected != ((Number) obj).longValue()) {
                        atomicBoolean.set(false);
                        PrintStream printStream = System.out;
                        printStream.println("Expected " + this._expected + " but got " + printStream);
                    }
                    this._expected++;
                }

                public void flush() {
                }
            }), createQueue, 1000, 1);
            Assertions.assertTrue(atomicBoolean.get(), "Messages delivered out of order");
        });
    }

    @Test
    public void testInOrderBatch() {
        Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10L), () -> {
            final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            JCQueue createQueue = createQueue("consumerHang", 10, 1024);
            run(new IncProducer(createQueue, 1048576L, 100L), new ConsumerThd(createQueue, new JCQueue.Consumer() { // from class: org.apache.storm.utils.JCQueueTest.3
                long _expected = 0;

                public void accept(Object obj) {
                    if (this._expected != ((Number) obj).longValue()) {
                        atomicBoolean.set(false);
                        PrintStream printStream = System.out;
                        printStream.println("Expected " + this._expected + " but got " + printStream);
                    }
                    this._expected++;
                }

                public void flush() {
                }
            }), createQueue, 1000, 1);
            Assertions.assertTrue(atomicBoolean.get(), "Messages delivered out of order");
        });
    }

    private void run(Runnable runnable, Runnable runnable2, JCQueue jCQueue) throws InterruptedException {
        run(runnable, runnable2, jCQueue, 20, PRODUCER_NUM);
    }

    private void run(Runnable runnable, Runnable runnable2, JCQueue jCQueue, int i, int i2) throws InterruptedException {
        Thread[] threadArr = new Thread[i2];
        for (int i3 = 0; i3 < i2; i3++) {
            threadArr[i3] = new Thread(runnable);
            threadArr[i3].start();
        }
        Thread thread = new Thread(runnable2);
        thread.start();
        Thread.sleep(i);
        for (int i4 = 0; i4 < i2; i4++) {
            threadArr[i4].interrupt();
        }
        for (int i5 = 0; i5 < i2; i5++) {
            threadArr[i5].join(5000L);
            Assertions.assertFalse(threadArr[i5].isAlive(), "producer " + i5 + " is still alive");
        }
        jCQueue.close();
        thread.interrupt();
        thread.join(5000L);
        Assertions.assertFalse(thread.isAlive(), "consumer is still alive");
    }

    private JCQueue createQueue(String str, int i) {
        return createQueue(str, 1, i);
    }

    private JCQueue createQueue(String str, int i, int i2) {
        return new JCQueue(str, str, i2, 0, i, this.waitStrategy, "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
    }
}
