package org.apache.flink.runtime.io.disk;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(60)
/* loaded from: input_file:org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.class */
class BatchShuffleReadBufferPoolTest {
    BatchShuffleReadBufferPoolTest() {
    }

    @Test
    void testIllegalTotalBytes() {
        Assertions.assertThatThrownBy(() -> {
            createBufferPool(0L, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE);
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testIllegalBufferSize() {
        Assertions.assertThatThrownBy(() -> {
            createBufferPool(33554432L, 0);
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testLargeTotalBytes() {
        BatchShuffleReadBufferPool createBufferPool = createBufferPool(Long.MAX_VALUE, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE);
        Assertions.assertThat(createBufferPool.getNumTotalBuffers()).isEqualTo(Integer.MAX_VALUE);
        createBufferPool.destroy();
    }

    @Test
    void testTotalBytesSmallerThanBufferSize() {
        Assertions.assertThatThrownBy(() -> {
            createBufferPool(4096L, 32768);
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testBufferCalculation() {
        for (int i = 4096; i <= 33554432; i += HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE) {
            BatchShuffleReadBufferPool createBufferPool = createBufferPool(33554432L, i);
            Assertions.assertThat(createBufferPool.getTotalBytes()).isEqualTo(33554432L);
            Assertions.assertThat(createBufferPool.getNumTotalBuffers()).isEqualTo(33554432 / i);
            Assertions.assertThat(createBufferPool.getNumBuffersPerRequest()).isLessThanOrEqualTo(createBufferPool.getNumTotalBuffers());
            Assertions.assertThat(createBufferPool.getNumBuffersPerRequest()).isGreaterThan(0);
        }
    }

    @Test
    void testRequestBuffers() throws Exception {
        BatchShuffleReadBufferPool createBufferPool = createBufferPool();
        ArrayList arrayList = new ArrayList();
        try {
            arrayList.addAll(createBufferPool.requestBuffers());
            Assertions.assertThat(arrayList).hasSize(createBufferPool.getNumBuffersPerRequest());
        } finally {
            createBufferPool.recycle(arrayList);
            createBufferPool.destroy();
        }
    }

    @Test
    void testRecycle() throws Exception {
        BatchShuffleReadBufferPool createBufferPool = createBufferPool();
        createBufferPool.recycle(createBufferPool.requestBuffers());
        Assertions.assertThat(createBufferPool.getAvailableBuffers()).isEqualTo(createBufferPool.getNumTotalBuffers());
    }

    @Test
    void testBufferOperationTimestampUpdated() throws Exception {
        BatchShuffleReadBufferPool batchShuffleReadBufferPool = new BatchShuffleReadBufferPool(1024L, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE);
        long lastBufferOperationTimestamp = batchShuffleReadBufferPool.getLastBufferOperationTimestamp();
        Thread.sleep(100L);
        List requestBuffers = batchShuffleReadBufferPool.requestBuffers();
        Assertions.assertThat(requestBuffers).hasSize(1);
        Assertions.assertThat(batchShuffleReadBufferPool.getLastBufferOperationTimestamp()).isGreaterThan(lastBufferOperationTimestamp);
        long lastBufferOperationTimestamp2 = batchShuffleReadBufferPool.getLastBufferOperationTimestamp();
        Thread.sleep(100L);
        batchShuffleReadBufferPool.recycle(requestBuffers);
        Assertions.assertThat(batchShuffleReadBufferPool.getLastBufferOperationTimestamp()).isGreaterThan(lastBufferOperationTimestamp2);
        List requestBuffers2 = batchShuffleReadBufferPool.requestBuffers();
        long lastBufferOperationTimestamp3 = batchShuffleReadBufferPool.getLastBufferOperationTimestamp();
        Thread.sleep(100L);
        Assertions.assertThat(batchShuffleReadBufferPool.requestBuffers()).isEmpty();
        Assertions.assertThat(batchShuffleReadBufferPool.getLastBufferOperationTimestamp()).isEqualTo(lastBufferOperationTimestamp3);
        batchShuffleReadBufferPool.recycle(requestBuffers2);
        batchShuffleReadBufferPool.destroy();
    }

    @Test
    void testBufferFulfilledByRecycledBuffers() throws Exception {
        final BatchShuffleReadBufferPool createBufferPool = createBufferPool();
        final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        try {
            Object[] objArr = new Object[8];
            for (int i = 0; i < 8; i++) {
                objArr[i] = new Object();
                concurrentHashMap.put(objArr[i], createBufferPool.requestBuffers());
            }
            Assertions.assertThat(createBufferPool.getAvailableBuffers()).isZero();
            CheckedThread[] checkedThreadArr = new CheckedThread[2];
            for (int i2 = 0; i2 < 2; i2++) {
                checkedThreadArr[i2] = new CheckedThread() { // from class: org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPoolTest.1
                    public void go() throws Exception {
                        Object obj = new Object();
                        List list = null;
                        while (true) {
                            List list2 = list;
                            if (list2 != null && !list2.isEmpty()) {
                                concurrentHashMap.put(obj, list2);
                                return;
                            }
                            list = createBufferPool.requestBuffers();
                        }
                    }
                };
                checkedThreadArr[i2].start();
            }
            Iterator it = ((List) concurrentHashMap.remove(objArr[0])).iterator();
            while (it.hasNext()) {
                createBufferPool.recycle((MemorySegment) it.next());
            }
            createBufferPool.recycle((Collection) concurrentHashMap.remove(objArr[1]));
            for (CheckedThread checkedThread : checkedThreadArr) {
                checkedThread.sync();
            }
            Assertions.assertThat(createBufferPool.getAvailableBuffers()).isZero();
            Assertions.assertThat(concurrentHashMap).hasSize(8);
            Iterator it2 = concurrentHashMap.keySet().iterator();
            while (it2.hasNext()) {
                createBufferPool.recycle((Collection) concurrentHashMap.remove(it2.next()));
            }
            Assertions.assertThat(createBufferPool.getAvailableBuffers()).isEqualTo(createBufferPool.getNumTotalBuffers());
            createBufferPool.destroy();
        } catch (Throwable th) {
            Iterator it3 = concurrentHashMap.keySet().iterator();
            while (it3.hasNext()) {
                createBufferPool.recycle((Collection) concurrentHashMap.remove(it3.next()));
            }
            Assertions.assertThat(createBufferPool.getAvailableBuffers()).isEqualTo(createBufferPool.getNumTotalBuffers());
            createBufferPool.destroy();
            throw th;
        }
    }

    @Test
    void testMultipleThreadRequestAndRecycle() throws Exception {
        final BatchShuffleReadBufferPool createBufferPool = createBufferPool();
        try {
            CheckedThread[] checkedThreadArr = new CheckedThread[10];
            for (int i = 0; i < 10; i++) {
                checkedThreadArr[i] = new CheckedThread() { // from class: org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPoolTest.2
                    public void go() throws Exception {
                        for (int i2 = 0; i2 < 100; i2++) {
                            List requestBuffers = createBufferPool.requestBuffers();
                            Thread.sleep(10L);
                            if (i2 % 2 == 0) {
                                createBufferPool.recycle(requestBuffers);
                            } else {
                                Iterator it = requestBuffers.iterator();
                                while (it.hasNext()) {
                                    createBufferPool.recycle((MemorySegment) it.next());
                                }
                            }
                        }
                    }
                };
                checkedThreadArr[i].start();
            }
            for (CheckedThread checkedThread : checkedThreadArr) {
                checkedThread.sync();
            }
            Assertions.assertThat(createBufferPool.getAvailableBuffers()).isEqualTo(createBufferPool.getNumTotalBuffers());
            createBufferPool.destroy();
        } catch (Throwable th) {
            createBufferPool.destroy();
            throw th;
        }
    }

    @Test
    void testDestroy() throws Exception {
        BatchShuffleReadBufferPool createBufferPool = createBufferPool();
        createBufferPool.recycle(createBufferPool.requestBuffers());
        Assertions.assertThat(createBufferPool.isDestroyed()).isFalse();
        Assertions.assertThat(createBufferPool.getAvailableBuffers()).isEqualTo(createBufferPool.getNumTotalBuffers());
        Assertions.assertThat(createBufferPool.getAvailableBuffers()).isEqualTo(createBufferPool.getNumTotalBuffers() - createBufferPool.requestBuffers().size());
        createBufferPool.destroy();
        Assertions.assertThat(createBufferPool.isDestroyed()).isTrue();
        Assertions.assertThat(createBufferPool.getAvailableBuffers()).isZero();
    }

    @Test
    void testRequestBuffersAfterDestroyed() throws Exception {
        BatchShuffleReadBufferPool createBufferPool = createBufferPool();
        createBufferPool.requestBuffers();
        createBufferPool.destroy();
        Objects.requireNonNull(createBufferPool);
        Assertions.assertThatThrownBy(createBufferPool::requestBuffers).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testRecycleAfterDestroyed() throws Exception {
        BatchShuffleReadBufferPool createBufferPool = createBufferPool();
        List requestBuffers = createBufferPool.requestBuffers();
        createBufferPool.destroy();
        createBufferPool.recycle(requestBuffers);
        Assertions.assertThat(createBufferPool.getAvailableBuffers()).isZero();
    }

    @Test
    void testDestroyWhileBlockingRequest() throws Exception {
        final BatchShuffleReadBufferPool createBufferPool = createBufferPool();
        CheckedThread checkedThread = new CheckedThread() { // from class: org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPoolTest.3
            public void go() throws Exception {
                while (true) {
                    createBufferPool.requestBuffers();
                }
            }
        };
        checkedThread.start();
        Thread.sleep(1000L);
        createBufferPool.destroy();
        Objects.requireNonNull(checkedThread);
        Assertions.assertThatThrownBy(checkedThread::sync).isInstanceOf(IllegalStateException.class);
    }

    private BatchShuffleReadBufferPool createBufferPool(long j, int i) {
        return new BatchShuffleReadBufferPool(j, i);
    }

    private BatchShuffleReadBufferPool createBufferPool() {
        return createBufferPool(33554432L, 32768);
    }
}
