package io.confluent.kafka.availability;

import io.confluent.kafka.availability.ThreadCountersManager;
import io.confluent.kafka.availability.ThreadLocalCounters;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:io/confluent/kafka/availability/ThreadCountersManagerTest.class */
/**
 * Tests for the per-thread operation counters maintained by {@link ThreadCountersManager}.
 *
 * <p>Each test runs wrapped operations on dedicated worker threads and then inspects the
 * snapshot returned by {@code getThreadLocalCounterSnapshotForAllKnownThreads()}. The
 * assertions expect operations wrapped with {@code wrapIO}/{@code wrapIOVoid} to be counted
 * under {@code MetricComponentType.STORAGE} and operations wrapped with {@code wrapEngine}
 * to be counted under {@code MetricComponentType.ENGINE}.
 *
 * <p>To observe the "in progress" state, the first operation of a worker parks on a
 * {@link Condition} until the test thread releases it; see
 * {@link #blockUntilReleased(Lock, Condition, AtomicBoolean, long)} and
 * {@link #release(Lock, Condition, AtomicBoolean)}.
 *
 * <p>NOTE(review): the snapshot is assumed to be a
 * {@code Map<Thread, List<ThreadLocalCounters>>} - confirm against ThreadCountersManager.
 */
public class ThreadCountersManagerTest {
    /** Enables counter tracking once for every test in this class. */
    @BeforeAll
    public static void setup() {
        ThreadCountersManager.threadCountersManagerEnabled = true;
    }

    /**
     * Runs ten short wrapped operations to completion and checks the success counters.
     *
     * @param z {@code true} to wrap the operations with {@code wrapIOVoid} (STORAGE expected),
     *          {@code false} to wrap them with {@code wrapEngine} (ENGINE expected)
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testNumSuccessOps(boolean z) throws InterruptedException {
        Thread thread = new Thread(() -> {
            ThreadCountersManager.LocalCounters.setThreadType(ThreadLocalCounters.ThreadGroupType.REQUEST_HANDLER_THREAD);
            for (int i = 0; i < 10; i++) {
                if (z) {
                    ThreadCountersManager.wrapIOVoid(() -> {
                        sleepQuietly(1L);
                    });
                } else {
                    ThreadCountersManager.wrapEngine(() -> {
                        sleepQuietly(10L);
                        return null;
                    });
                }
            }
        });
        thread.start();
        thread.join();

        Map<Thread, List<ThreadLocalCounters>> snapshot = ThreadCountersManager.getThreadLocalCounterSnapshotForAllKnownThreads();
        Assertions.assertTrue(snapshot.containsKey(thread));
        ThreadLocalCounters.MetricComponentType active = activeComponent(z);
        for (ThreadLocalCounters counters : snapshot.get(thread)) {
            Assertions.assertEquals(ThreadLocalCounters.ThreadGroupType.REQUEST_HANDLER_THREAD, counters.threadGroupType);
            // Only the component that was actually exercised may report completed operations.
            long expectedSuccess = (counters.metricComponentType == active) ? 10L : 0L;
            Assertions.assertEquals(expectedSuccess, counters.numSuccessOps);
        }
    }

    /**
     * Parks the worker inside its first wrapped operation, checks that exactly one operation
     * is reported as in progress, then releases the worker and checks the final counters.
     *
     * @param z {@code true} to use the {@code wrapIO} worker (STORAGE expected),
     *          {@code false} to use the {@code wrapEngine} worker (ENGINE expected)
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testNumInProgressOps(boolean z) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition released = lock.newCondition();
        AtomicBoolean gate = new AtomicBoolean(false);
        ThreadLocalCounters.ThreadGroupType groupType = ThreadLocalCounters.ThreadGroupType.REQUEST_HANDLER_THREAD;
        ThreadLocalCounters.MetricComponentType active = activeComponent(z);

        Runnable work = z
                ? getRunnableThreadWaitingForBothExecutionAndExit(lock, released, gate, groupType)
                : getRunnableEngineThreadWaitingForBothExecutionAndExit(lock, released, gate, groupType);
        Thread thread = new Thread(work);
        thread.start();
        try {
            TestUtils.waitForCondition(gate::get, "ioControl should be true");
            Map<Thread, List<ThreadLocalCounters>> snapshot = ThreadCountersManager.getThreadLocalCounterSnapshotForAllKnownThreads();
            Assertions.assertTrue(snapshot.containsKey(thread));
            for (ThreadLocalCounters counters : snapshot.get(thread)) {
                Assertions.assertEquals(groupType, counters.threadGroupType);
                // Nothing has completed yet; only the exercised component has one op in flight.
                Assertions.assertEquals(0L, counters.numSuccessOps);
                long expectedInProgress = (counters.metricComponentType == active) ? 1L : 0L;
                Assertions.assertEquals(expectedInProgress, counters.numInProgressOps);
            }
        } finally {
            // Always let the worker go, even when an assertion above failed, so that a
            // failing test does not leave a parked non-daemon thread behind.
            release(lock, released, gate);
        }
        thread.join();

        for (ThreadLocalCounters counters : ThreadCountersManager.getThreadLocalCounterSnapshotForAllKnownThreads().get(thread)) {
            Assertions.assertEquals(groupType, counters.threadGroupType);
            long expectedSuccess = (counters.metricComponentType == active) ? 10L : 0L;
            Assertions.assertEquals(expectedSuccess, counters.numSuccessOps);
            Assertions.assertEquals(0L, counters.numInProgressOps);
        }
    }

    /**
     * Builds a worker that performs ten {@code wrapIO} operations. Every operation raises
     * {@code atomicBoolean}; the first one additionally parks until the flag is cleared
     * and {@code condition} is signalled.
     *
     * @param lock            lock guarding {@code condition}
     * @param condition       condition the first operation waits on
     * @param atomicBoolean   flag set to {@code true} by the worker and cleared by the test
     * @param threadGroupType thread group type registered for the worker thread
     * @return the worker body
     */
    private Runnable getRunnableThreadWaitingForBothExecutionAndExit(Lock lock, Condition condition, AtomicBoolean atomicBoolean, ThreadLocalCounters.ThreadGroupType threadGroupType) {
        return () -> {
            ThreadCountersManager.LocalCounters.setThreadType(threadGroupType);
            for (int i = 0; i < 10; i++) {
                boolean firstOp = i == 0;
                ThreadCountersManager.wrapIO(() -> {
                    atomicBoolean.set(true);
                    if (firstOp) {
                        blockUntilReleased(lock, condition, atomicBoolean, 1L);
                    }
                    return 0;
                });
            }
        };
    }

    /**
     * Same as the five-argument overload with ten operations.
     */
    private Runnable getRunnableEngineThreadWaitingForBothExecutionAndExit(Lock lock, Condition condition, AtomicBoolean atomicBoolean, ThreadLocalCounters.ThreadGroupType threadGroupType) {
        return getRunnableEngineThreadWaitingForBothExecutionAndExit(lock, condition, atomicBoolean, threadGroupType, 10);
    }

    /**
     * Builds a worker that performs {@code i} {@code wrapEngine} operations. Every operation
     * raises {@code atomicBoolean}; the first one additionally parks until the flag is
     * cleared and {@code condition} is signalled.
     *
     * @param lock            lock guarding {@code condition}
     * @param condition       condition the first operation waits on
     * @param atomicBoolean   flag set to {@code true} by the worker and cleared by the test
     * @param threadGroupType thread group type registered for the worker thread
     * @param i               number of operations to run
     * @return the worker body
     */
    private Runnable getRunnableEngineThreadWaitingForBothExecutionAndExit(Lock lock, Condition condition, AtomicBoolean atomicBoolean, ThreadLocalCounters.ThreadGroupType threadGroupType, int i) {
        return () -> {
            ThreadCountersManager.LocalCounters.setThreadType(threadGroupType);
            for (int op = 0; op < i; op++) {
                boolean firstOp = op == 0;
                ThreadCountersManager.wrapEngine(() -> {
                    atomicBoolean.set(true);
                    if (firstOp) {
                        blockUntilReleased(lock, condition, atomicBoolean, 10L);
                    }
                    return 0;
                });
            }
        };
    }

    /**
     * Runs an engine worker (12 operations) and an IO worker (10 operations) concurrently and
     * checks that each thread's counters are tracked independently, both while the first
     * operation is parked and after both workers have finished.
     *
     * @param z {@code true} to register the IO worker under the same thread group type as the
     *          engine worker, {@code false} to register it as {@code BACKGROUND_THREAD}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testMultipleThreadsForInProgress(boolean z) throws InterruptedException {
        ThreadLocalCounters.ThreadGroupType groupType = ThreadLocalCounters.ThreadGroupType.REQUEST_HANDLER_THREAD;

        ReentrantLock engineLock = new ReentrantLock();
        Condition engineReleased = engineLock.newCondition();
        AtomicBoolean engineGate = new AtomicBoolean(false);
        Thread engineThread = new Thread(getRunnableEngineThreadWaitingForBothExecutionAndExit(engineLock, engineReleased, engineGate, groupType, 12));

        ReentrantLock ioLock = new ReentrantLock();
        Condition ioReleased = ioLock.newCondition();
        AtomicBoolean ioGate = new AtomicBoolean(false);
        ThreadLocalCounters.ThreadGroupType ioGroupType = z ? groupType : ThreadLocalCounters.ThreadGroupType.BACKGROUND_THREAD;
        Thread ioThread = new Thread(getRunnableThreadWaitingForBothExecutionAndExit(ioLock, ioReleased, ioGate, ioGroupType));

        engineThread.start();
        ioThread.start();
        try {
            TestUtils.waitForCondition(engineGate::get, "ioControl should be true");
            TestUtils.waitForCondition(ioGate::get, "ioControl2 should be true");
            // One snapshot for both threads, taken while both first operations are parked.
            Map<Thread, List<ThreadLocalCounters>> parked = ThreadCountersManager.getThreadLocalCounterSnapshotForAllKnownThreads();
            assertComponentCounters(parked.get(engineThread), ThreadLocalCounters.MetricComponentType.ENGINE, 1L, 0L);
            assertComponentCounters(parked.get(ioThread), ThreadLocalCounters.MetricComponentType.STORAGE, 1L, 0L);
        } finally {
            // Release both workers regardless of the assertion outcome.
            release(engineLock, engineReleased, engineGate);
            release(ioLock, ioReleased, ioGate);
        }
        engineThread.join();
        ioThread.join();

        Map<Thread, List<ThreadLocalCounters>> finished = ThreadCountersManager.getThreadLocalCounterSnapshotForAllKnownThreads();
        assertComponentCounters(finished.get(engineThread), ThreadLocalCounters.MetricComponentType.ENGINE, 0L, 12L);
        assertComponentCounters(finished.get(ioThread), ThreadLocalCounters.MetricComponentType.STORAGE, 0L, 10L);
    }

    /**
     * Runs ten doubly nested {@code wrapIOVoid} operations and checks that both nesting levels
     * are counted: two operations in progress while the first inner operation is parked, and
     * twenty successful operations once the worker has finished.
     */
    @Test
    public void testNestedInProgressIO() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition released = lock.newCondition();
        AtomicBoolean gate = new AtomicBoolean(false);
        Thread thread = new Thread(() -> {
            ThreadCountersManager.LocalCounters.setThreadType(ThreadLocalCounters.ThreadGroupType.NETWORK_THREAD);
            for (int i = 0; i < 10; i++) {
                boolean firstOp = i == 0;
                ThreadCountersManager.wrapIOVoid(() -> {
                    ThreadCountersManager.wrapIOVoid(() -> {
                        gate.set(true);
                        if (firstOp) {
                            blockUntilReleased(lock, released, gate, 1L);
                        }
                    });
                });
            }
        });
        thread.start();
        try {
            TestUtils.waitForCondition(gate::get, "ioControl should be true");
            assertComponentCounters(
                    ThreadCountersManager.getThreadLocalCounterSnapshotForAllKnownThreads().get(thread),
                    ThreadLocalCounters.MetricComponentType.STORAGE, 2L, 0L);
        } finally {
            release(lock, released, gate);
        }
        thread.join();

        assertComponentCounters(
                ThreadCountersManager.getThreadLocalCounterSnapshotForAllKnownThreads().get(thread),
                ThreadLocalCounters.MetricComponentType.STORAGE, 0L, 20L);
    }

    /**
     * Maps the boolean test parameter to the component the parameterized tests expect to be
     * exercised: STORAGE for the IO variant, ENGINE for the engine variant.
     */
    private static ThreadLocalCounters.MetricComponentType activeComponent(boolean ioVariant) {
        return ioVariant
                ? ThreadLocalCounters.MetricComponentType.STORAGE
                : ThreadLocalCounters.MetricComponentType.ENGINE;
    }

    /**
     * Asserts the in-progress and success counts of every entry in {@code counters} that
     * belongs to {@code component}; entries of other components are ignored.
     */
    private static void assertComponentCounters(List<ThreadLocalCounters> counters, ThreadLocalCounters.MetricComponentType component, long expectedInProgress, long expectedSuccess) {
        Assertions.assertNotNull(counters, "no counters registered for thread");
        for (ThreadLocalCounters entry : counters) {
            if (entry.metricComponentType == component) {
                Assertions.assertEquals(expectedInProgress, entry.numInProgressOps);
                Assertions.assertEquals(expectedSuccess, entry.numSuccessOps);
            }
        }
    }

    /**
     * Worker side of the hand-off: waits on {@code condition} while {@code gate} is set, then
     * sleeps for {@code sleepMillis}. The lock is held for the whole sequence (apart from the
     * time spent inside {@code await}) and is released exactly once in {@code finally}. An
     * interrupt ends the wait early and is re-asserted on the current thread.
     */
    private static void blockUntilReleased(Lock lock, Condition condition, AtomicBoolean gate, long sleepMillis) {
        lock.lock();
        try {
            while (gate.get()) {
                condition.await();
            }
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Test side of the hand-off: clears {@code gate} and signals {@code condition} while
     * holding {@code lock}, so a worker that has not started waiting yet will see the cleared
     * flag and skip the wait.
     */
    private static void release(Lock lock, Condition condition, AtomicBoolean gate) {
        lock.lock();
        try {
            gate.set(false);
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Sleeps for {@code millis}; an interrupt ends the sleep and is re-asserted. */
    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
