package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TieredStorageTestUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link SortBufferAccumulator}.
 *
 * <p>Each test creates a {@link TieredStorageMemoryManagerImpl} backed by a local buffer pool
 * taken from a shared {@link NetworkBufferPool}, feeds records into the accumulator and checks
 * the buffers that the accumulator hands to the registered flush callback.
 */
class SortBufferAccumulatorTest {
    /** Number of buffers in the global network buffer pool created for every test. */
    private static final int NUM_TOTAL_BUFFERS = 1000;

    /** Size in bytes of each network buffer, also passed to the accumulator under test. */
    private static final int BUFFER_SIZE_BYTES = 1024;

    /** Ratio passed as the first constructor argument of the memory manager. */
    private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f;

    /**
     * Extra bytes the expectation model charges per record on top of its payload.
     * NOTE(review): presumably the per-record bookkeeping overhead of the sort buffer; confirm
     * against the sort buffer implementation.
     */
    private static final int RECORD_OVERHEAD_BYTES = 16;

    private NetworkBufferPool globalPool;

    /** Creates a fresh global buffer pool before each test. */
    @BeforeEach
    void before() {
        this.globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, BUFFER_SIZE_BYTES);
    }

    /** Destroys the global buffer pool after each test. */
    @AfterEach
    void after() {
        this.globalPool.destroy();
    }

    /** With partial records allowed, both data buffer types may be flushed. */
    @Test
    void testAccumulateRecordsAndGenerateBuffers() throws IOException {
        testAccumulateRecordsAndGenerateBuffers(
                true,
                Arrays.asList(
                        Buffer.DataType.DATA_BUFFER,
                        Buffer.DataType.DATA_BUFFER_WITH_CLEAR_END));
    }

    /** With partial records disallowed, only {@code DATA_BUFFER_WITH_CLEAR_END} is expected. */
    @Test
    void testAccumulateRecordsAndGenerateBuffersWithPartialRecordUnallowed() throws IOException {
        testAccumulateRecordsAndGenerateBuffers(
                false, Collections.singletonList(Buffer.DataType.DATA_BUFFER_WITH_CLEAR_END));
    }

    /**
     * Writes 1000 pseudo-random records (fixed seed) into an accumulator and verifies both the
     * data type of every flushed buffer and the total number of flushed buffers.
     *
     * @param z flag forwarded as the last constructor argument of the accumulator; the callers
     *     use it to switch "partial record allowed" on and off
     * @param collection the data types a flushed buffer is allowed to have
     * @throws IOException if creating the memory manager or using the accumulator fails
     */
    private void testAccumulateRecordsAndGenerateBuffers(
            boolean z, Collection<Buffer.DataType> collection) throws IOException {
        final int numBuffers = 10;
        final int numRecords = 1000;

        TieredStorageSubpartitionId subpartitionId = new TieredStorageSubpartitionId(0);
        // Fixed seed keeps the record sizes and broadcast flags reproducible.
        Random random = new Random(1234L);
        TieredStorageMemoryManagerImpl memoryManager = createStorageMemoryManager(numBuffers);

        int numExpectedBuffers = 0;
        int currentBufferWrittenBytes = 0;
        AtomicInteger numReceivedFinishedBuffers = new AtomicInteger(0);

        // try-with-resources closes the accumulator exactly once, and a failure in close()
        // is attached as a suppressed exception instead of replacing the primary one.
        try (SortBufferAccumulator accumulator =
                new SortBufferAccumulator(1, 2, BUFFER_SIZE_BYTES, memoryManager, z)) {
            accumulator.setup(
                    (subpartition, buffer, numRemainingBuffers) -> {
                        Assertions.assertThat(buffer.getDataType()).isIn(collection);
                        numReceivedFinishedBuffers.incrementAndGet();
                        buffer.recycleBuffer();
                    });

            boolean isBroadcastForPreviousRecord = false;
            for (int recordIndex = 0; recordIndex < numRecords; recordIndex++) {
                int numBytes = random.nextInt(BUFFER_SIZE_BYTES) + 1;
                ByteBuffer record = TieredStorageTestUtils.generateRandomData(numBytes, random);
                boolean isBroadcast = random.nextBoolean();
                accumulator.receive(
                        record, subpartitionId, Buffer.DataType.DATA_BUFFER, isBroadcast);

                // Expectation model: a new buffer is started when the record (plus its
                // overhead) no longer fits, or when the broadcast flag flips between records.
                boolean exceedsBuffer =
                        currentBufferWrittenBytes + numBytes + RECORD_OVERHEAD_BYTES
                                > BUFFER_SIZE_BYTES;
                boolean broadcastChanged =
                        recordIndex > 0 && isBroadcastForPreviousRecord != isBroadcast;
                if (exceedsBuffer || broadcastChanged) {
                    numExpectedBuffers++;
                    currentBufferWrittenBytes = 0;
                }
                isBroadcastForPreviousRecord = isBroadcast;
                currentBufferWrittenBytes += numBytes + RECORD_OVERHEAD_BYTES;
            }
        }

        // Checked after close(): the expected count includes one more buffer for any bytes
        // still pending when the accumulator is closed.
        Assertions.assertThat(currentBufferWrittenBytes).isLessThan(BUFFER_SIZE_BYTES);
        Assertions.assertThat(numReceivedFinishedBuffers)
                .hasValue(numExpectedBuffers + (currentBufferWrittenBytes == 0 ? 0 : 1));
    }

    /** Large record case with partial records allowed. */
    @Test
    void testWriteLargeRecord() throws IOException {
        testWriteLargeRecord(true);
    }

    /** Large record case with partial records disallowed. */
    @Test
    void testWriteLargeRecordWithPartialRecordUnallowed() throws IOException {
        testWriteLargeRecord(false);
    }

    /**
     * Writes a single record of {@code 15 * BUFFER_SIZE_BYTES} bytes and verifies that exactly 15
     * buffers reach the flush callback before the accumulator is closed.
     *
     * @param z flag forwarded as the last constructor argument of the accumulator
     * @throws IOException if creating the memory manager or using the accumulator fails
     */
    private void testWriteLargeRecord(boolean z) throws IOException {
        final int numBuffers = 15;
        Random random = new Random();
        TieredStorageMemoryManagerImpl memoryManager = createStorageMemoryManager(numBuffers);

        try (SortBufferAccumulator accumulator =
                new SortBufferAccumulator(1, 2, BUFFER_SIZE_BYTES, memoryManager, z)) {
            AtomicInteger numReceivedBuffers = new AtomicInteger(0);
            accumulator.setup(
                    (subpartition, buffer, numRemainingBuffers) -> {
                        numReceivedBuffers.getAndAdd(1);
                        buffer.recycleBuffer();
                    });

            ByteBuffer largeRecord =
                    TieredStorageTestUtils.generateRandomData(
                            BUFFER_SIZE_BYTES * numBuffers, random);
            accumulator.receive(
                    largeRecord,
                    new TieredStorageSubpartitionId(0),
                    Buffer.DataType.DATA_BUFFER,
                    false);

            Assertions.assertThat(numReceivedBuffers).hasValue(numBuffers);
        }
    }

    /**
     * Constructs the accumulator with {@code 1} as its second argument (the other tests pass
     * {@code 2}) and verifies that receiving a record fails with
     * {@link IllegalArgumentException}.
     */
    @Test
    void testNoBuffersForSort() throws IOException {
        Random random = new Random(1111L);
        TieredStorageSubpartitionId subpartitionId = new TieredStorageSubpartitionId(0);
        TieredStorageMemoryManagerImpl memoryManager = createStorageMemoryManager(10);

        try (SortBufferAccumulator accumulator =
                new SortBufferAccumulator(1, 1, BUFFER_SIZE_BYTES, memoryManager, true)) {
            // The flush callback is intentionally a no-op; no buffer is expected to arrive.
            accumulator.setup((subpartition, buffer, numRemainingBuffers) -> {});

            Assertions.assertThatThrownBy(
                            () ->
                                    accumulator.receive(
                                            TieredStorageTestUtils.generateRandomData(1, random),
                                            subpartitionId,
                                            Buffer.DataType.DATA_BUFFER,
                                            false))
                    .isInstanceOf(IllegalArgumentException.class);
        }
    }

    /**
     * Verifies that the accumulator holds two buffers from the memory manager after receiving a
     * one-byte record, and that {@code close()} brings the count back to zero.
     *
     * <p>{@code close()} is called explicitly (not via try-with-resources) because the test has
     * to assert on the memory manager state both before and after closing.
     */
    @Test
    void testCloseWithUnFinishedBuffers() throws IOException {
        TieredStorageMemoryManagerImpl memoryManager = createStorageMemoryManager(10);
        SortBufferAccumulator accumulator =
                new SortBufferAccumulator(1, 2, BUFFER_SIZE_BYTES, memoryManager, true);
        accumulator.setup((subpartition, buffer, numRemainingBuffers) -> buffer.recycleBuffer());

        accumulator.receive(
                TieredStorageTestUtils.generateRandomData(1, new Random()),
                new TieredStorageSubpartitionId(0),
                Buffer.DataType.DATA_BUFFER,
                false);
        Assertions.assertThat(memoryManager.numOwnerRequestedBuffer(accumulator)).isEqualTo(2);

        accumulator.close();
        Assertions.assertThat(memoryManager.numOwnerRequestedBuffer(accumulator)).isZero();
    }

    /**
     * Creates a memory manager backed by a local buffer pool of exactly {@code i} buffers.
     *
     * @param i the number of buffers, used as both arguments of {@code createBufferPool}
     * @return a memory manager set up with the new pool and a single memory spec owned by this
     *     test instance
     * @throws IOException if the local buffer pool cannot be created
     */
    private TieredStorageMemoryManagerImpl createStorageMemoryManager(int i) throws IOException {
        BufferPool localPool = this.globalPool.createBufferPool(i, i);
        TieredStorageMemoryManagerImpl memoryManager =
                new TieredStorageMemoryManagerImpl(NUM_BUFFERS_TRIGGER_FLUSH_RATIO, true);
        memoryManager.setup(
                localPool, Collections.singletonList(new TieredStorageMemorySpec(this, 1)));
        return memoryManager;
    }
}
