package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TieredStorageTestUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulatorTest.class */
/**
 * Tests for {@link HashBufferAccumulator}.
 *
 * <p>Every test builds its accumulator on top of a {@link TieredStorageMemoryManagerImpl} whose
 * local buffer pool is carved out of a per-test {@link NetworkBufferPool}; the global pool is
 * destroyed again after each test.
 */
class HashBufferAccumulatorTest {
    public static final int NUM_TOTAL_BUFFERS = 1000;
    public static final int NETWORK_BUFFER_SIZE = 1024;
    private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f;
    private NetworkBufferPool globalPool;

    @BeforeEach
    void before() {
        this.globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, NETWORK_BUFFER_SIZE);
    }

    @AfterEach
    void after() {
        this.globalPool.destroy();
    }

    @Test
    void testAccumulateRecordsAndGenerateFinishedBuffers() throws IOException {
        testAccumulateRecordsAndGenerateFinishedBuffers(true);
    }

    @Test
    void testAccumulateRecordsAndGenerateFinishedBuffersWithPartialRecordUnallowed() throws IOException {
        testAccumulateRecordsAndGenerateFinishedBuffers(false);
    }

    /**
     * Feeds a random mix of data records and events into a single-subpartition accumulator and
     * checks that the number of buffers handed to the flusher matches the number predicted by
     * the bookkeeping in this method.
     *
     * @param isPartialRecordAllowed forwarded as the last constructor argument of
     *     {@link HashBufferAccumulator}; when {@code false} the expectation model never lets a
     *     record share a buffer it does not completely fit into
     * @throws IOException if creating the memory manager or receiving a record fails
     */
    private void testAccumulateRecordsAndGenerateFinishedBuffers(boolean isPartialRecordAllowed) throws IOException {
        final int numBuffers = 10;
        final int numRecords = 1000;
        TieredStorageSubpartitionId subpartitionId = new TieredStorageSubpartitionId(0);
        // Remember the seed so that a failing run can be reproduced from the assertion message.
        final long seed = System.nanoTime();
        Random random = new Random(seed);

        try (HashBufferAccumulator bufferAccumulator =
                new HashBufferAccumulator(
                        1, NETWORK_BUFFER_SIZE, createStorageMemoryManager(numBuffers), isPartialRecordAllowed)) {
            AtomicInteger numReceivedFinishedBuffers = new AtomicInteger(0);
            bufferAccumulator.setup((flushedSubpartition, buffer, ignoredNum) -> {
                numReceivedFinishedBuffers.incrementAndGet();
                buffer.recycleBuffer();
            });

            // Bytes assumed to be sitting in not-yet-finished buffers.
            int numPendingBytes = 0;
            // Buffers the flusher is expected to have seen so far.
            int numExpectedBuffers = 0;

            for (int recordIndex = 0; recordIndex < numRecords; recordIndex++) {
                // The last record is always an event, so all pending bytes get accounted for.
                boolean isData = random.nextBoolean() && recordIndex != numRecords - 1;
                Buffer.DataType dataType = isData ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.EVENT_BUFFER;
                ByteBuffer record;
                if (isData) {
                    int numBytes = random.nextInt(2 * NETWORK_BUFFER_SIZE) + 1;
                    if (!isPartialRecordAllowed
                            && numPendingBytes > 0
                            && numPendingBytes + numBytes > NETWORK_BUFFER_SIZE) {
                        // The record does not fit into the rest of the current buffer and may
                        // not be split across it: the current buffer is counted as finished.
                        numExpectedBuffers++;
                        numPendingBytes = 0;
                    }
                    if (isPartialRecordAllowed || numBytes <= NETWORK_BUFFER_SIZE) {
                        numPendingBytes += numBytes;
                    } else {
                        // Oversized record with partial records disallowed: counted as occupying
                        // whole buffers on its own, leaving nothing pending.
                        numExpectedBuffers += numBuffersFor(numBytes);
                    }
                    record = TieredStorageTestUtils.generateRandomData(numBytes, random);
                } else {
                    // An event closes out all pending data buffers and adds one buffer itself.
                    numExpectedBuffers += numBuffersFor(numPendingBytes) + 1;
                    numPendingBytes = 0;
                    record = EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
                }
                bufferAccumulator.receive(record, subpartitionId, dataType, false);
            }

            Assertions.assertThat(numReceivedFinishedBuffers.get())
                    .as("number of finished buffers (random seed %d)", seed)
                    .isEqualTo(numExpectedBuffers);
        }
    }

    /** Verifies that receiving an event leaves the accumulator's requested-buffer count at zero. */
    @Test
    void testEventShouldNotRequestBufferFromMemoryManager() throws IOException {
        TieredStorageMemoryManagerImpl memoryManager = createStorageMemoryManager(10);
        try (HashBufferAccumulator bufferAccumulator =
                new HashBufferAccumulator(1, NETWORK_BUFFER_SIZE, memoryManager, true)) {
            bufferAccumulator.setup((subpartitionId, buffer, ignoredNum) -> buffer.recycleBuffer());
            bufferAccumulator.receive(
                    EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE),
                    new TieredStorageSubpartitionId(0),
                    Buffer.DataType.EVENT_BUFFER,
                    false);
            Assertions.assertThat(memoryManager.numOwnerRequestedBuffer(bufferAccumulator)).isZero();
        }
    }

    /**
     * Verifies that a buffer still held by the accumulator (one requested buffer after receiving
     * a single byte) is no longer counted against the accumulator once it has been closed.
     */
    @Test
    void testCloseWithUnFinishedBuffers() throws IOException {
        TieredStorageMemoryManagerImpl memoryManager = createStorageMemoryManager(10);
        HashBufferAccumulator bufferAccumulator =
                new HashBufferAccumulator(1, NETWORK_BUFFER_SIZE, memoryManager, true);
        bufferAccumulator.setup((subpartitionId, buffer, ignoredNum) -> buffer.recycleBuffer());
        bufferAccumulator.receive(
                TieredStorageTestUtils.generateRandomData(1, new Random()),
                new TieredStorageSubpartitionId(0),
                Buffer.DataType.DATA_BUFFER,
                false);
        Assertions.assertThat(memoryManager.numOwnerRequestedBuffer(bufferAccumulator)).isEqualTo(1);
        bufferAccumulator.close();
        // Query the same owner as above; asking for this test instance would be trivially zero
        // because the test itself never requests a buffer.
        Assertions.assertThat(memoryManager.numOwnerRequestedBuffer(bufferAccumulator)).isZero();
    }

    /**
     * Creates a memory manager backed by a local pool of exactly {@code i} buffers taken from the
     * global pool, with this test instance registered as the only memory spec owner.
     *
     * @param i minimum and maximum size of the local buffer pool
     * @return the set-up memory manager
     * @throws IOException if the local buffer pool cannot be created
     */
    private TieredStorageMemoryManagerImpl createStorageMemoryManager(int i) throws IOException {
        BufferPool localPool = this.globalPool.createBufferPool(i, i);
        TieredStorageMemoryManagerImpl memoryManager =
                new TieredStorageMemoryManagerImpl(NUM_BUFFERS_TRIGGER_FLUSH_RATIO, true);
        memoryManager.setup(localPool, Collections.singletonList(new TieredStorageMemorySpec(this, 1)));
        return memoryManager;
    }

    /** Returns how many buffers of {@link #NETWORK_BUFFER_SIZE} are needed for {@code numBytes}. */
    private static int numBuffersFor(int numBytes) {
        return numBytes / NETWORK_BUFFER_SIZE + (numBytes % NETWORK_BUFFER_SIZE == 0 ? 0 : 1);
    }
}
