/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferWithSubpartition;
import org.apache.flink.runtime.io.network.partition.SortBuffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageSortBuffer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class TieredStorageSortBufferTest {
    private static final int BUFFER_SIZE_BYTES = 1024;

    TieredStorageSortBufferTest() {
    }

    @Test
    void testWriteAndReadDataBuffer() throws Exception {
        int numSubpartitions = 10;
        int bufferPoolSize = 512;
        Random random = new Random(1234L);
        Queue[] dataWritten = new Queue[numSubpartitions];
        Queue[] buffersRead = new Queue[numSubpartitions];
        for (int i = 0; i < numSubpartitions; ++i) {
            dataWritten[i] = new ArrayDeque();
            buffersRead[i] = new ArrayDeque();
        }
        int[] numBytesWritten = new int[numSubpartitions];
        int[] numBytesRead = new int[numSubpartitions];
        Arrays.fill(numBytesWritten, 0);
        Arrays.fill(numBytesRead, 0);
        TieredStorageSortBuffer sortBuffer = TieredStorageSortBufferTest.createDataBuffer(bufferPoolSize, numSubpartitions);
        int numDataBuffers = 5;
        while (numDataBuffers > 0) {
            BufferWithSubpartition buffer;
            int recordSize = random.nextInt(4095) + 1;
            byte[] bytes = new byte[recordSize];
            random.nextBytes(bytes);
            ByteBuffer record = ByteBuffer.wrap(bytes);
            int subpartition = random.nextInt(numSubpartitions);
            boolean isBuffer = random.nextBoolean();
            Buffer.DataType dataType = isBuffer ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.EVENT_BUFFER;
            boolean writeSuccess = sortBuffer.append(record, subpartition, dataType);
            record.flip();
            if (record.hasRemaining()) {
                dataWritten[subpartition].add(new DataAndType(record, dataType));
                int n = subpartition;
                numBytesWritten[n] = numBytesWritten[n] + record.remaining();
            }
            if (writeSuccess) continue;
            sortBuffer.finish();
            --numDataBuffers;
            while (sortBuffer.hasRemaining() && (buffer = TieredStorageSortBufferTest.copyIntoSegment((SortBuffer)sortBuffer)) != null) {
                TieredStorageSortBufferTest.addBufferRead(buffer, buffersRead, numBytesRead);
            }
            sortBuffer = TieredStorageSortBufferTest.createDataBuffer(bufferPoolSize, numSubpartitions);
        }
        if (sortBuffer.hasRemaining()) {
            sortBuffer.finish();
            while (sortBuffer.hasRemaining()) {
                TieredStorageSortBufferTest.addBufferRead(TieredStorageSortBufferTest.copyIntoSegment((SortBuffer)sortBuffer), buffersRead, numBytesRead);
            }
        }
        TieredStorageSortBufferTest.checkWriteReadResult(numSubpartitions, numBytesWritten, numBytesRead, dataWritten, buffersRead);
    }

    @Test
    void testBufferIsRecycledWhenSortBufferIsEmpty() throws Exception {
        int numSubpartitions = 10;
        int bufferPoolSize = 512;
        int numBuffersForSort = 20;
        NetworkBufferPool globalPool = new NetworkBufferPool(bufferPoolSize, 1024);
        BufferPool bufferPool = globalPool.createBufferPool(bufferPoolSize, bufferPoolSize);
        LinkedList<MemorySegment> segments = new LinkedList<MemorySegment>();
        for (int i = 0; i < numBuffersForSort; ++i) {
            segments.add(bufferPool.requestMemorySegmentBlocking());
        }
        TieredStorageSortBuffer sortBuffer = new TieredStorageSortBuffer(segments, (BufferRecycler)bufferPool, numSubpartitions, 1024, numBuffersForSort, true);
        MemorySegment memorySegment = (MemorySegment)segments.poll();
        sortBuffer.finish();
        Assertions.assertThat((Object)sortBuffer.getNextBuffer(memorySegment)).isNull();
        Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(numBuffersForSort);
    }

    @Test
    void testBufferIsRecycledWhenGetEvent() throws Exception {
        int numSubpartitions = 10;
        int bufferPoolSize = 512;
        int bufferSizeBytes = 1024;
        int numBuffersForSort = 20;
        int subpartitionId = 0;
        Random random = new Random(1234L);
        NetworkBufferPool globalPool = new NetworkBufferPool(bufferPoolSize, bufferSizeBytes);
        BufferPool bufferPool = globalPool.createBufferPool(bufferPoolSize, bufferPoolSize);
        LinkedList<MemorySegment> segments = new LinkedList<MemorySegment>();
        for (int i = 0; i < numBuffersForSort; ++i) {
            segments.add(bufferPool.requestMemorySegmentBlocking());
        }
        TieredStorageSortBuffer sortBuffer = new TieredStorageSortBuffer(segments, (BufferRecycler)bufferPool, numSubpartitions, bufferSizeBytes, numBuffersForSort, true);
        byte[] bytes = new byte[1];
        random.nextBytes(bytes);
        ByteBuffer dataRecord = ByteBuffer.wrap(bytes);
        sortBuffer.append(dataRecord, subpartitionId, Buffer.DataType.DATA_BUFFER);
        ByteBuffer eventRecord = ByteBuffer.wrap(bytes);
        sortBuffer.append(eventRecord, subpartitionId, Buffer.DataType.EVENT_BUFFER);
        sortBuffer.finish();
        MemorySegment memorySegment = bufferPool.requestMemorySegmentBlocking();
        BufferWithSubpartition bufferWithSubpartition = sortBuffer.getNextBuffer(memorySegment);
        Assertions.assertThat((boolean)bufferWithSubpartition.getBuffer().isBuffer()).isTrue();
        Assertions.assertThat((int)bufferWithSubpartition.getSubpartitionIndex()).isEqualTo(subpartitionId);
        bufferWithSubpartition.getBuffer().recycleBuffer();
        Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(numBuffersForSort);
        bufferWithSubpartition = sortBuffer.getNextBuffer(memorySegment);
        Assertions.assertThat((boolean)bufferWithSubpartition.getBuffer().isBuffer()).isFalse();
        Assertions.assertThat((int)bufferWithSubpartition.getSubpartitionIndex()).isEqualTo(subpartitionId);
        Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(numBuffersForSort);
    }

    private static BufferWithSubpartition copyIntoSegment(SortBuffer dataBuffer) {
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)1024);
        return dataBuffer.getNextBuffer(segment);
    }

    private static void addBufferRead(BufferWithSubpartition bufferWithSubpartition, Queue<Buffer>[] buffersRead, int[] numBytesRead) {
        int subpartition = bufferWithSubpartition.getSubpartitionIndex();
        Buffer buffer = bufferWithSubpartition.getBuffer();
        buffersRead[subpartition].add((Buffer)new NetworkBuffer(buffer.getMemorySegment(), MemorySegment::free, buffer.getDataType(), buffer.getSize()));
        int n = subpartition;
        numBytesRead[n] = numBytesRead[n] + buffer.getSize();
    }

    /*
     * WARNING - void declaration
     */
    private static void checkWriteReadResult(int numSubpartitions, int[] numBytesWritten, int[] numBytesRead, Queue<DataAndType>[] dataWritten, Queue<Buffer>[] buffersRead) {
        for (int subpartitionIndex = 0; subpartitionIndex < numSubpartitions; ++subpartitionIndex) {
            void var10_10;
            Assertions.assertThat((int)numBytesWritten[subpartitionIndex]).isEqualTo(numBytesRead[subpartitionIndex]);
            ArrayList<DataAndType> eventsWritten = new ArrayList<DataAndType>();
            ArrayList<Buffer> eventsRead = new ArrayList<Buffer>();
            ByteBuffer subpartitionDataWritten = ByteBuffer.allocate(numBytesWritten[subpartitionIndex]);
            for (DataAndType dataAndType : dataWritten[subpartitionIndex]) {
                subpartitionDataWritten.put(dataAndType.data);
                dataAndType.data.rewind();
                if (!dataAndType.dataType.isEvent()) continue;
                eventsWritten.add(dataAndType);
            }
            ByteBuffer subpartitionDataRead = ByteBuffer.allocate(numBytesRead[subpartitionIndex]);
            for (Buffer buffer : buffersRead[subpartitionIndex]) {
                subpartitionDataRead.put(buffer.getNioBufferReadable());
                if (buffer.isBuffer()) continue;
                eventsRead.add(buffer);
            }
            subpartitionDataWritten.flip();
            subpartitionDataRead.flip();
            Assertions.assertThat((Comparable)subpartitionDataWritten).isEqualTo((Object)subpartitionDataRead);
            Assertions.assertThat((int)eventsWritten.size()).isEqualTo(eventsRead.size());
            boolean bl = false;
            while (var10_10 < eventsWritten.size()) {
                Assertions.assertThat((Comparable)((DataAndType)eventsWritten.get((int)var10_10)).dataType).isEqualTo((Object)((Buffer)eventsRead.get((int)var10_10)).getDataType());
                Assertions.assertThat((Comparable)((DataAndType)eventsWritten.get((int)var10_10)).data).isEqualTo((Object)((Buffer)eventsRead.get((int)var10_10)).getNioBufferReadable());
                ++var10_10;
            }
        }
    }

    private static TieredStorageSortBuffer createDataBuffer(int bufferPoolSize, int numSubpartitions) throws Exception {
        NetworkBufferPool globalPool = new NetworkBufferPool(bufferPoolSize, 1024);
        BufferPool bufferPool = globalPool.createBufferPool(bufferPoolSize, bufferPoolSize);
        LinkedList<MemorySegment> segments = new LinkedList<MemorySegment>();
        for (int i = 0; i < bufferPoolSize; ++i) {
            segments.add(bufferPool.requestMemorySegmentBlocking());
        }
        return new TieredStorageSortBuffer(segments, (BufferRecycler)bufferPool, numSubpartitions, 1024, bufferPoolSize, true);
    }

    public static class DataAndType {
        private final ByteBuffer data;
        private final Buffer.DataType dataType;

        DataAndType(ByteBuffer data, Buffer.DataType dataType) {
            this.data = data;
            this.dataType = dataType;
        }
    }
}

