/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/**
 * Unit tests for {@link BufferPool}: memory accounting across allocate/deallocate, blocking
 * and time-out behaviour, cleanup of queued waiters after failures, and behaviour once the
 * pool has been closed.
 */
public class BufferPoolTest {
    private final MockTime time = new MockTime();
    private final Metrics metrics = new Metrics(time);
    /** Default maximum time, in milliseconds, that test allocations are allowed to block. */
    private final long maxBlockTimeMs = 10L;
    /** Metric group name handed to every pool created by these tests. */
    private final String metricGroup = "TestMetrics";

    /** Closes the metrics registry shared by the pools created in a test. */
    @AfterEach
    public void teardown() {
        metrics.close();
    }

    /**
     * Allocates and returns buffers of the poolable size and of a non-poolable size, checking
     * after each step what {@code availableMemory()} and {@code unallocatedMemory()} report.
     */
    @Test
    public void testSimple() throws Exception {
        long totalMemory = 64 * 1024;
        int size = 1024;
        BufferPool pool = new BufferPool(totalMemory, size, metrics, time, metricGroup);

        ByteBuffer buffer = pool.allocate(size, maxBlockTimeMs);
        Assertions.assertEquals(size, buffer.limit(), "Buffer size should equal requested size.");
        Assertions.assertEquals(totalMemory - size, pool.unallocatedMemory(), "Unallocated memory should have shrunk");
        Assertions.assertEquals(totalMemory - size, pool.availableMemory(), "Available memory should have shrunk");

        // Dirty the buffer so the next allocation can prove it comes back cleared.
        buffer.putInt(1);
        buffer.flip();
        pool.deallocate(buffer);
        Assertions.assertEquals(totalMemory, pool.availableMemory(), "All memory should be available");
        Assertions.assertEquals(totalMemory - size, pool.unallocatedMemory(), "But now some is on the free list");

        buffer = pool.allocate(size, maxBlockTimeMs);
        Assertions.assertEquals(0, buffer.position(), "Recycled buffer should be cleared.");
        Assertions.assertEquals(buffer.capacity(), buffer.limit(), "Recycled buffer should be cleared.");
        pool.deallocate(buffer);
        Assertions.assertEquals(totalMemory, pool.availableMemory(), "All memory should be available");
        Assertions.assertEquals(totalMemory - size, pool.unallocatedMemory(), "Still a single buffer on the free list");

        buffer = pool.allocate(2 * size, maxBlockTimeMs);
        pool.deallocate(buffer);
        Assertions.assertEquals(totalMemory, pool.availableMemory(), "All memory should be available");
        Assertions.assertEquals(totalMemory - size, pool.unallocatedMemory(), "Non-standard size didn't go to the free list.");
    }

    /**
     * A request for exactly the pool's total memory succeeds; a request for one byte more is
     * rejected with {@link IllegalArgumentException}.
     */
    @Test
    public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
        BufferPool pool = new BufferPool(1024, 512, metrics, time, metricGroup);
        ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
        Assertions.assertEquals(1024, buffer.limit());
        pool.deallocate(buffer);
        Assertions.assertThrows(IllegalArgumentException.class, () -> pool.allocate(1025, maxBlockTimeMs));
    }

    /**
     * Starts an allocation for the whole pool while one buffer is still outstanding, then
     * releases that buffer and waits for the allocating thread to finish.
     */
    @Test
    public void testDelayedAllocation() throws Exception {
        BufferPool pool = new BufferPool(5 * 1024, 1024, metrics, time, metricGroup);
        ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
        CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
        CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
        Assertions.assertEquals(1L, allocation.getCount(), "Allocation shouldn't have happened yet, waiting on memory.");
        doDealloc.countDown(); // return the memory
        Assertions.assertTrue(allocation.await(1L, TimeUnit.SECONDS), "Allocation should succeed soon after de-allocation");
    }

    /**
     * Starts a thread that returns {@code buffer} to {@code pool} once the returned latch is
     * counted down.
     *
     * @param pool   pool the buffer is returned to
     * @param buffer buffer to return
     * @return latch the caller counts down to trigger the deallocation
     */
    private CountDownLatch asyncDeallocate(BufferPool pool, ByteBuffer buffer) {
        CountDownLatch trigger = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                trigger.await();
            } catch (InterruptedException e) {
                // Keep the interrupt visible; the buffer is still returned below.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            pool.deallocate(buffer);
        });
        worker.start();
        return trigger;
    }

    /**
     * Starts a thread that sleeps for {@code delayMs} on the system clock and then returns
     * {@code buffer} to {@code pool}.
     *
     * @param pool    pool the buffer is returned to
     * @param buffer  buffer to return
     * @param delayMs delay before the deallocation, in milliseconds
     */
    private void delayedDeallocate(BufferPool pool, ByteBuffer buffer, long delayMs) {
        Thread worker = new Thread(() -> {
            Time.SYSTEM.sleep(delayMs);
            pool.deallocate(buffer);
        });
        worker.start();
    }

    /**
     * Starts a thread that requests {@code size} bytes from {@code pool}.
     *
     * @param pool pool to allocate from
     * @param size number of bytes to request
     * @return latch that is counted down when the allocation attempt finishes; it is released
     *         in a {@code finally} block, i.e. whether or not the allocation succeeded
     */
    private CountDownLatch asyncAllocate(BufferPool pool, int size) {
        CountDownLatch completed = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                pool.allocate(size, maxBlockTimeMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                completed.countDown();
            }
        });
        worker.start();
        return completed;
    }

    /**
     * Requesting more memory than is currently free fails with
     * {@link BufferExhaustedException}.
     */
    @Test
    public void testBufferExhaustedExceptionIsThrown() throws Exception {
        BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
        pool.allocate(1, maxBlockTimeMs);
        Assertions.assertThrows(BufferExhaustedException.class, () -> pool.allocate(2, maxBlockTimeMs));
    }

    /**
     * Requests the whole pool while three one-byte buffers are handed back at staggered delays,
     * and checks that the request fails no earlier than {@code maxBlockTimeMs} and within a
     * second after it.
     */
    @Test
    public void testBlockTimeout() throws Exception {
        BufferPool pool = new BufferPool(10, 1, metrics, Time.SYSTEM, metricGroup);
        ByteBuffer buffer1 = pool.allocate(1, maxBlockTimeMs);
        ByteBuffer buffer2 = pool.allocate(1, maxBlockTimeMs);
        ByteBuffer buffer3 = pool.allocate(1, maxBlockTimeMs);
        // The last release happens after the allocation below is expected to have given up.
        delayedDeallocate(pool, buffer1, maxBlockTimeMs / 2);
        delayedDeallocate(pool, buffer2, maxBlockTimeMs);
        delayedDeallocate(pool, buffer3, maxBlockTimeMs / 2 * 5);

        long beginTimeMs = Time.SYSTEM.milliseconds();
        Assertions.assertThrows(
            BufferExhaustedException.class,
            () -> pool.allocate(10, maxBlockTimeMs),
            "The buffer allocated more memory than its maximum value 10");

        // Buffers are released concurrently, so read the value once and accept the range [7, 10].
        long available = pool.availableMemory();
        Assertions.assertTrue(available >= 7 && available <= 10, "available memory " + available);

        long durationMs = Time.SYSTEM.milliseconds() - beginTimeMs;
        Assertions.assertTrue(durationMs >= maxBlockTimeMs, "BufferExhaustedException should not throw before maxBlockTimeMs");
        Assertions.assertTrue(durationMs < maxBlockTimeMs + 1000, "BufferExhaustedException should throw soon after maxBlockTimeMs");
    }

    /**
     * After an allocation fails with {@link BufferExhaustedException}, no waiter is left queued
     * and the free memory is unchanged.
     */
    @Test
    public void testCleanupMemoryAvailabilityWaiterOnBlockTimeout() throws Exception {
        BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
        pool.allocate(1, maxBlockTimeMs);
        Assertions.assertThrows(
            BufferExhaustedException.class,
            () -> pool.allocate(2, maxBlockTimeMs),
            "The buffer allocated more memory than its maximum value 2");
        Assertions.assertEquals(0, pool.queued());
        Assertions.assertEquals(1L, pool.availableMemory());
    }

    /**
     * Blocks two allocator threads on the pool, interrupts them one after the other and checks
     * that each had its own queued condition and that no waiter remains afterwards.
     */
    @Test
    public void testCleanupMemoryAvailabilityWaiterOnInterruption() throws Exception {
        BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
        long blockTime = 5000;
        pool.allocate(1, maxBlockTimeMs);
        Thread t1 = new Thread(new BufferPoolAllocator(pool, blockTime));
        Thread t2 = new Thread(new BufferPoolAllocator(pool, blockTime));

        // Poll the queue length instead of sleeping for a fixed time between the steps.
        t1.start();
        TestUtils.waitForCondition(() -> pool.queued() == 1, "Awaiting first allocator to be blocked on allocation");
        Deque<Condition> waiters = pool.waiters();
        Condition c1 = waiters.getFirst();

        t2.start();
        TestUtils.waitForCondition(() -> pool.queued() == 2, "Awaiting second allocator to be blocked on allocation");

        t1.interrupt();
        TestUtils.waitForCondition(() -> pool.queued() == 1, "Awaiting first allocator to leave the queue after interruption");
        Condition c2 = waiters.getLast();
        t2.interrupt();

        Assertions.assertNotEquals(c1, c2);
        t1.join();
        t2.join();
        Assertions.assertEquals(0, pool.queued());
    }

    /**
     * Makes {@code recordWaitTime} throw {@link OutOfMemoryError} on a spied pool and checks
     * that a failed allocation leaves memory accounting and the waiter queue intact.
     */
    @Test
    public void testCleanupMemoryAvailabilityOnMetricsException() throws Exception {
        // Use the fixture's registry so that teardown() closes it.
        BufferPool bufferPool = Mockito.spy(new BufferPool(2, 1, metrics, time, metricGroup));
        Mockito.doThrow(new OutOfMemoryError()).when(bufferPool).recordWaitTime(ArgumentMatchers.anyLong());

        bufferPool.allocate(1, 0);
        try {
            bufferPool.allocate(2, 1000);
            Assertions.fail("Expected oom.");
        } catch (OutOfMemoryError expected) {
            // This is the failure the stub above injects.
        }
        Assertions.assertEquals(1L, bufferPool.availableMemory());
        Assertions.assertEquals(0, bufferPool.queued());
        Assertions.assertEquals(1L, bufferPool.unallocatedMemory());

        // The remaining byte must still be allocatable.
        bufferPool.allocate(1, 0);
        Mockito.verify(bufferPool).recordWaitTime(ArgumentMatchers.anyLong());
    }

    /**
     * Runs several threads that repeatedly allocate and release buffers of random size and
     * checks that every thread finishes and that all memory is available again at the end.
     */
    @Test
    public void testStressfulSituation() throws Exception {
        int numThreads = 10;
        final int iterations = 50000;
        final int poolableSize = 1024;
        final long totalMemory = numThreads / 2 * poolableSize;
        BufferPool pool = new BufferPool(totalMemory, poolableSize, metrics, time, metricGroup);

        List<StressTestThread> threads = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            threads.add(new StressTestThread(pool, iterations));
        }
        for (StressTestThread thread : threads) {
            thread.start();
        }
        for (StressTestThread thread : threads) {
            thread.join();
        }
        for (StressTestThread thread : threads) {
            Assertions.assertTrue(thread.success.get(), "Thread should have completed all iterations successfully.");
        }
        Assertions.assertEquals(totalMemory, pool.availableMemory());
    }

    /**
     * Checks {@code availableMemory()} for values beyond the {@code int} range, using a pool
     * subclass that stubs out the real buffer allocation and the free-list size.
     */
    @Test
    public void testLargeAvailableMemory() throws Exception {
        long memory = 20_000_000_000L;
        int poolableSize = 2_000_000_000;
        final AtomicInteger freeSize = new AtomicInteger(0);
        BufferPool pool = new BufferPool(memory, poolableSize, metrics, time, metricGroup) {
            @Override
            protected ByteBuffer allocateByteBuffer(int size) {
                // Hand back an empty buffer instead of really allocating gigabytes.
                return ByteBuffer.allocate(0);
            }

            @Override
            protected int freeSize() {
                return freeSize.get();
            }
        };

        pool.allocate(poolableSize, 0);
        Assertions.assertEquals(18_000_000_000L, pool.availableMemory());
        pool.allocate(poolableSize, 0);
        Assertions.assertEquals(16_000_000_000L, pool.availableMemory());

        // Each simulated free-list entry is expected to add poolableSize to the total.
        freeSize.incrementAndGet();
        Assertions.assertEquals(18_000_000_000L, pool.availableMemory());
        freeSize.incrementAndGet();
        Assertions.assertEquals(20_000_000_000L, pool.availableMemory());
    }

    /**
     * An {@link OutOfMemoryError} from the buffer allocation hook leaves the pool's available
     * memory unchanged.
     */
    @Test
    public void outOfMemoryOnAllocation() {
        BufferPool bufferPool = new BufferPool(1024, 1024, metrics, time, metricGroup) {
            @Override
            protected ByteBuffer allocateByteBuffer(int size) {
                throw new OutOfMemoryError();
            }
        };

        // NOTE(review): this invokes the overridden hook directly rather than allocate(...);
        // confirm that is the intended path to exercise.
        try {
            bufferPool.allocateByteBuffer(1024);
            Assertions.fail("Should have thrown OutOfMemoryError");
        } catch (OutOfMemoryError expected) {
            // Thrown by the override above.
        }

        Assertions.assertEquals(1024L, bufferPool.availableMemory());
    }

    /**
     * After {@code close()}, new allocations fail with {@link KafkaException}, while returning
     * a previously allocated buffer is still accepted.
     */
    @Test
    public void testCloseAllocations() throws Exception {
        BufferPool pool = new BufferPool(10, 1, metrics, Time.SYSTEM, metricGroup);
        ByteBuffer buffer = pool.allocate(1, maxBlockTimeMs);

        pool.close();
        Assertions.assertThrows(KafkaException.class, () -> pool.allocate(1, maxBlockTimeMs));

        pool.deallocate(buffer);
    }

    /**
     * Blocks worker threads on an exhausted pool, closes the pool and checks that every worker
     * leaves the queue and saw a {@link KafkaException}.
     */
    @Test
    public void testCloseNotifyWaiters() throws Exception {
        final int numWorkers = 2;

        BufferPool pool = new BufferPool(1, 1, metrics, Time.SYSTEM, metricGroup);
        ByteBuffer buffer = pool.allocate(1, Long.MAX_VALUE);

        ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
        try {
            Callable<Void> work = () -> {
                Assertions.assertThrows(KafkaException.class, () -> pool.allocate(1, Long.MAX_VALUE));
                return null;
            };
            List<Future<Void>> results = new ArrayList<>();
            for (int i = 0; i < numWorkers; i++) {
                results.add(executor.submit(work));
            }

            TestUtils.waitForCondition(() -> pool.queued() == numWorkers,
                "Awaiting " + numWorkers + " workers to be blocked on allocation");

            // Closing the pool is expected to wake up every blocked worker.
            pool.close();

            TestUtils.waitForCondition(() -> pool.queued() == 0,
                "Awaiting " + numWorkers + " workers to be interrupted from allocation");

            // Surface assertion failures raised inside the workers; otherwise they are lost.
            for (Future<Void> result : results) {
                result.get(10L, TimeUnit.SECONDS);
            }

            pool.deallocate(buffer);
        } finally {
            // Always release the worker threads, including when an assertion above fails.
            executor.shutdownNow();
        }
    }

    /**
     * Thread that repeatedly allocates a buffer of random size from a pool and returns it
     * immediately. {@link #success} is set to {@code true} only if every iteration completed
     * without an exception.
     */
    public static class StressTestThread extends Thread {
        private final int iterations;
        private final BufferPool pool;
        private final long maxBlockTimeMs = 20_000L;
        public final AtomicBoolean success = new AtomicBoolean(false);

        /**
         * @param pool       pool to allocate from
         * @param iterations number of allocate/deallocate rounds to run
         */
        public StressTestThread(BufferPool pool, int iterations) {
            this.iterations = iterations;
            this.pool = pool;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < iterations; i++) {
                    // Randomly pick either the poolable size or an arbitrary size below the total.
                    int size;
                    if (TestUtils.RANDOM.nextBoolean()) {
                        size = pool.poolableSize();
                    } else {
                        size = TestUtils.RANDOM.nextInt((int) pool.totalMemory());
                    }
                    ByteBuffer buffer = pool.allocate(size, maxBlockTimeMs);
                    pool.deallocate(buffer);
                }
                success.set(true);
            } catch (InterruptedException e) {
                // success stays false; keep the interrupt visible to whoever owns the thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                // success stays false, which the owning test asserts on.
                e.printStackTrace();
            }
        }
    }

    /**
     * Runnable that requests two bytes from a pool and expects the request not to succeed:
     * it tolerates {@link BufferExhaustedException} and interruption, and fails otherwise.
     */
    private static class BufferPoolAllocator implements Runnable {
        private final BufferPool pool;
        private final long maxBlockTimeMs;

        BufferPoolAllocator(BufferPool pool, long maxBlockTimeMs) {
            this.pool = pool;
            this.maxBlockTimeMs = maxBlockTimeMs;
        }

        @Override
        public void run() {
            try {
                pool.allocate(2, maxBlockTimeMs);
                Assertions.fail("The buffer allocated more memory than its maximum value 2");
            } catch (BufferExhaustedException expected) {
                // Acceptable outcome: the request timed out.
            } catch (InterruptedException expected) {
                // Acceptable outcome: the owning test interrupted this thread.
                Thread.currentThread().interrupt();
            }
        }
    }
}

