package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* Parameterized tests that run against either HashBasedDataBuffer or SortBasedDataBuffer, selected by the useHashBuffer flag. */
class DataBufferTest {
    private final boolean useHashBuffer;

    /** Pairs the bytes of a written record with the {@link Buffer.DataType} it was appended as. */
    public static class DataAndType {
        /** Payload of the record; held by reference, not copied. */
        private final ByteBuffer data;

        /** Data type the payload was appended with. */
        private final Buffer.DataType dataType;

        /**
         * Creates a new pair.
         *
         * @param byteBuffer payload of the record
         * @param dataType data type the payload was appended with
         */
        public DataAndType(ByteBuffer byteBuffer, Buffer.DataType dataType) {
            this.dataType = dataType;
            this.data = byteBuffer;
        }
    }

    /**
     * Returns the two values of the {@code useHashBuffer} flag ({@code true}, then {@code false}).
     * Presumably consumed by the parameterized test extension to instantiate this class once per
     * value.
     */
    @Parameters(name = "UseHashBuffer = {0}")
    private static List<Boolean> parameters() {
        final Boolean[] flags = {Boolean.TRUE, Boolean.FALSE};
        return Arrays.asList(flags);
    }

    /**
     * Creates the test instance for one parameter value.
     *
     * @param z stored as {@code useHashBuffer}; {@code true} makes the {@code createDataBuffer}
     *     helper build a {@code HashBasedDataBuffer}, {@code false} a {@code SortBasedDataBuffer}
     */
    public DataBufferTest(boolean z) {
        useHashBuffer = z;
    }

    /**
     * Appends randomly sized records (1 to {@code 4 * bufferSize - 1} bytes) with random
     * subpartitions and data types. Every time {@code append} returns {@code true} the current data
     * buffer is finished, drained and replaced by a fresh one; this is repeated five times. At the
     * end, the bytes and events read back per subpartition are compared with what was recorded as
     * written.
     */
    @TestTemplate
    void testWriteAndReadDataBuffer() throws Exception {
        final int numSubpartitions = 10;
        final int bufferPoolSize = 512;
        final int bufferSize = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;
        // Fixed seed keeps the generated workload reproducible.
        final Random random = new Random(1111L);

        // Per-subpartition bookkeeping of what was written and what was read back.
        @SuppressWarnings("unchecked") // generic array creation is not expressible otherwise
        Queue<DataAndType>[] dataWritten = new Queue[numSubpartitions];
        @SuppressWarnings("unchecked") // generic array creation is not expressible otherwise
        Queue<Buffer>[] buffersRead = new Queue[numSubpartitions];
        for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
            dataWritten[subpartition] = new ArrayDeque<>();
            buffersRead[subpartition] = new ArrayDeque<>();
        }

        // int arrays start out zero-filled, so no explicit initialisation is required.
        int[] numBytesWritten = new int[numSubpartitions];
        int[] numBytesRead = new int[numSubpartitions];

        int[] subpartitionReadOrder = getRandomSubpartitionOrder(numSubpartitions);
        DataBuffer dataBuffer =
                createDataBuffer(
                        bufferPoolSize, bufferSize, numSubpartitions, subpartitionReadOrder);

        int numDataBuffers = 5;
        while (numDataBuffers > 0) {
            // The order of the random draws below (size, bytes, subpartition, type) determines
            // the generated workload and must not be changed.
            byte[] bytes = new byte[random.nextInt(bufferSize * 4 - 1) + 1];
            random.nextBytes(bytes);
            ByteBuffer payload = ByteBuffer.wrap(bytes);
            int subpartition = random.nextInt(numSubpartitions);
            Buffer.DataType dataType =
                    random.nextBoolean()
                            ? Buffer.DataType.DATA_BUFFER
                            : Buffer.DataType.EVENT_BUFFER;
            boolean appendResult = dataBuffer.append(payload, subpartition, dataType);

            // flip() limits the payload to the position append() left behind, so only those
            // bytes are recorded as written.
            payload.flip();
            if (payload.hasRemaining()) {
                dataWritten[subpartition].add(new DataAndType(payload, dataType));
                numBytesWritten[subpartition] += payload.remaining();
            }

            if (!appendResult) {
                continue;
            }

            // append() returned true: finish this data buffer, drain it and start a new one.
            dataBuffer.finish();
            numDataBuffers--;
            while (dataBuffer.hasRemaining()) {
                BufferWithSubpartition buffer = copyIntoSegment(bufferSize, dataBuffer);
                if (buffer == null) {
                    break;
                }
                addBufferRead(buffer, buffersRead, numBytesRead);
            }
            dataBuffer =
                    createDataBuffer(
                            bufferPoolSize, bufferSize, numSubpartitions, subpartitionReadOrder);
        }

        // Data left in the current buffer is only accepted for the hash-based implementation.
        if (dataBuffer.hasRemaining()) {
            Assertions.assertThat(dataBuffer).isInstanceOf(HashBasedDataBuffer.class);
            dataBuffer.finish();
            while (dataBuffer.hasRemaining()) {
                addBufferRead(copyIntoSegment(bufferSize, dataBuffer), buffersRead, numBytesRead);
            }
        }

        Assertions.assertThat(dataBuffer.numTotalBytes()).isZero();
        checkWriteReadResult(
                numSubpartitions, numBytesWritten, numBytesRead, dataWritten, buffersRead);
    }

    /**
     * Reads the next buffer from {@code dataBuffer}.
     *
     * <p>When {@code useHashBuffer} is {@code false}, a fresh unpooled segment of {@code i} bytes
     * is passed to {@code getNextBuffer}. When it is {@code true}, {@code getNextBuffer} is called
     * with a {@code null} segment; a result for which {@code isBuffer()} holds is then copied into
     * a fresh unpooled segment and the source buffer is recycled, while {@code null} results and
     * non-buffer results are returned unchanged.
     *
     * @param i size in bytes of the unpooled segment to allocate
     * @param dataBuffer data buffer to read from
     * @return the next buffer with its subpartition index, or {@code null} if {@code
     *     getNextBuffer} returned {@code null}
     */
    private BufferWithSubpartition copyIntoSegment(int i, DataBuffer dataBuffer) {
        if (useHashBuffer) {
            BufferWithSubpartition polled = dataBuffer.getNextBuffer((MemorySegment) null);
            boolean needsCopy = polled != null && polled.getBuffer().isBuffer();
            if (!needsCopy) {
                return polled;
            }

            Buffer source = polled.getBuffer();
            MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(i);
            int numBytes = source.readableBytes();
            target.put(0, source.getNioBufferReadable(), numBytes);
            source.recycleBuffer();

            // The copy frees its own unpooled segment when it is recycled.
            NetworkBuffer copy =
                    new NetworkBuffer(
                            target, MemorySegment::free, Buffer.DataType.DATA_BUFFER, numBytes);
            return new BufferWithSubpartition(copy, polled.getSubpartitionIndex());
        }
        return dataBuffer.getNextBuffer(MemorySegmentFactory.allocateUnpooledSegment(i));
    }

    /**
     * Records a buffer that was read back: appends it to the queue of its subpartition and adds its
     * readable bytes to that subpartition's counter.
     *
     * @param bufferWithSubpartition buffer that was read, together with its subpartition index
     * @param queueArr per-subpartition queues of buffers read so far
     * @param iArr per-subpartition counters of bytes read so far
     */
    private void addBufferRead(
            BufferWithSubpartition bufferWithSubpartition, Queue<Buffer>[] queueArr, int[] iArr) {
        final int target = bufferWithSubpartition.getSubpartitionIndex();
        final Buffer buffer = bufferWithSubpartition.getBuffer();
        queueArr[target].add(buffer);
        iArr[target] += buffer.readableBytes();
    }

    /**
     * Verifies, subpartition by subpartition, that what was read matches what was written.
     *
     * <p>For each subpartition this checks that the byte counters agree, that the concatenation of
     * all read buffers equals the concatenation of all written records, and that the read
     * non-data buffers match the written event records one-to-one in order, data type and content.
     *
     * @param i number of subpartitions to check
     * @param iArr bytes written per subpartition
     * @param iArr2 bytes read per subpartition
     * @param queueArr records written per subpartition, in write order
     * @param queueArr2 buffers read per subpartition, in read order
     */
    public static void checkWriteReadResult(
            int i,
            int[] iArr,
            int[] iArr2,
            Queue<DataAndType>[] queueArr,
            Queue<Buffer>[] queueArr2) {
        for (int subpartition = 0; subpartition < i; subpartition++) {
            Assertions.assertThat(iArr2[subpartition]).isEqualTo(iArr[subpartition]);

            List<DataAndType> eventsWritten = new ArrayList<>();
            List<Buffer> eventsRead = new ArrayList<>();

            ByteBuffer written = ByteBuffer.allocate(iArr[subpartition]);
            for (DataAndType entry : queueArr[subpartition]) {
                written.put(entry.data);
                // put() consumed the payload; rewind so it can be compared again below.
                entry.data.rewind();
                if (entry.dataType.isEvent()) {
                    eventsWritten.add(entry);
                }
            }

            ByteBuffer read = ByteBuffer.allocate(iArr2[subpartition]);
            for (Buffer buffer : queueArr2[subpartition]) {
                read.put(buffer.getNioBufferReadable());
                if (!buffer.isBuffer()) {
                    eventsRead.add(buffer);
                }
            }

            written.flip();
            read.flip();
            Assertions.assertThat(read).isEqualTo(written);

            Assertions.assertThat(eventsRead).hasSameSizeAs(eventsWritten);
            for (int k = 0; k < eventsWritten.size(); k++) {
                Buffer actual = eventsRead.get(k);
                DataAndType expected = eventsWritten.get(k);
                Assertions.assertThat(actual.getDataType()).isEqualTo(expected.dataType);
                Assertions.assertThat(actual.getNioBufferReadable()).isEqualTo(expected.data);
            }
        }
    }

    /**
     * Writes to subpartitions 0, 2 and 4 of a five-subpartition data buffer, leaving 1 and 3 empty,
     * and checks the sequence of buffers read back: subpartition 0, subpartition 2 in two parts
     * (the first {@code bufferSize} bytes, then the rest), and subpartition 4.
     */
    @TestTemplate
    public void testWriteReadWithEmptySubpartition() throws Exception {
        final int bufferSize = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;
        final int numSubpartitions = 5;

        // Entries left null stand for subpartitions that receive no data.
        ByteBuffer[] subpartitionData = new ByteBuffer[numSubpartitions];
        subpartitionData[0] = ByteBuffer.allocate(128);
        subpartitionData[2] = ByteBuffer.allocate(1536);
        subpartitionData[4] = ByteBuffer.allocate(bufferSize);

        DataBuffer dataBuffer = createDataBuffer(10, bufferSize, numSubpartitions);
        int subpartition = 0;
        for (ByteBuffer data : subpartitionData) {
            if (data != null) {
                dataBuffer.append(data, subpartition, Buffer.DataType.DATA_BUFFER);
                // Reset the position so the same buffer can serve as the expected value.
                data.rewind();
            }
            subpartition++;
        }
        dataBuffer.finish();

        checkReadResult(dataBuffer, subpartitionData[0], 0, bufferSize);

        ByteBuffer firstPart = subpartitionData[2].duplicate();
        firstPart.limit(bufferSize);
        checkReadResult(dataBuffer, firstPart.slice(), 2, bufferSize);

        ByteBuffer secondPart = subpartitionData[2].duplicate();
        secondPart.position(bufferSize);
        checkReadResult(dataBuffer, secondPart.slice(), 2, bufferSize);

        checkReadResult(dataBuffer, subpartitionData[4], 4, bufferSize);
    }

    /**
     * Reads the next buffer from {@code dataBuffer} into a fresh unpooled segment and asserts its
     * subpartition index and readable content.
     *
     * @param dataBuffer data buffer to read from
     * @param byteBuffer expected readable content of the next buffer
     * @param i expected subpartition index of the next buffer
     * @param i2 size in bytes of the segment to read into
     */
    private void checkReadResult(DataBuffer dataBuffer, ByteBuffer byteBuffer, int i, int i2) {
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(i2);
        BufferWithSubpartition actual = dataBuffer.getNextBuffer(segment);

        Assertions.assertThat(actual.getSubpartitionIndex()).isEqualTo(i);
        Assertions.assertThat(actual.getBuffer().getNioBufferReadable()).isEqualTo(byteBuffer);
    }

    /**
     * Appending a {@link ByteBuffer} with no remaining bytes must fail with {@link
     * IllegalArgumentException}.
     */
    @TestTemplate
    void testWriteEmptyData() throws Exception {
        final int bufferSize = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;
        DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);

        // Position == limit, so the buffer has nothing left to read.
        ByteBuffer emptyData = ByteBuffer.allocate(1);
        emptyData.position(1);

        Assertions.assertThatThrownBy(
                        () -> dataBuffer.append(emptyData, 0, Buffer.DataType.DATA_BUFFER))
                .isInstanceOf(IllegalArgumentException.class);
    }

    /**
     * Appending to a data buffer after {@code finish()} must fail with {@link
     * IllegalStateException}.
     */
    @TestTemplate
    void testWriteFinishedDataBuffer() throws Exception {
        final int bufferSize = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;
        DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
        dataBuffer.finish();

        ByteBuffer oneByte = ByteBuffer.allocate(1);
        Assertions.assertThatThrownBy(
                        () -> dataBuffer.append(oneByte, 0, Buffer.DataType.DATA_BUFFER))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Appending to a data buffer after {@code release()} must fail with {@link
     * IllegalStateException}.
     */
    @TestTemplate
    void testWriteReleasedDataBuffer() throws Exception {
        final int bufferSize = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;
        DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
        dataBuffer.release();

        ByteBuffer oneByte = ByteBuffer.allocate(1);
        Assertions.assertThatThrownBy(
                        () -> dataBuffer.append(oneByte, 0, Buffer.DataType.DATA_BUFFER))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Appends {@code bufferPoolSize - 1} records of exactly {@code bufferSize} bytes, each of which
     * is expected to make {@code append} return {@code false}, and then one record of {@code
     * bufferSize + 1} bytes, for which {@code append} is expected to return {@code true} while the
     * byte and record totals stay at their previous values.
     */
    @TestTemplate
    void testWriteMoreDataThanCapacity() throws Exception {
        final int bufferPoolSize = 10;
        final int bufferSize = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;
        DataBuffer dataBuffer = createDataBuffer(bufferPoolSize, bufferSize, 1);

        for (int i = 1; i < bufferPoolSize; i++) {
            appendAndCheckResult(dataBuffer, bufferSize, false, (long) bufferSize * i, i, true);
        }

        // Totals expected after the loop; the final append must leave them untouched.
        final int numRecords = bufferPoolSize - 1;
        final long numBytes = (long) bufferSize * numRecords;
        appendAndCheckResult(dataBuffer, bufferSize + 1, true, numBytes, numRecords, true);
    }

    /**
     * Appends a single record one byte larger than {@code bufferPoolSize * bufferSize} and expects
     * {@code append} to return {@code true} with zero bytes, zero records and nothing remaining.
     */
    @TestTemplate
    void testWriteLargeRecord() throws Exception {
        final int bufferPoolSize = 10;
        final int bufferSize = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;

        DataBuffer dataBuffer = createDataBuffer(bufferPoolSize, bufferSize, 1);
        int recordSize = bufferPoolSize * bufferSize + 1;
        appendAndCheckResult(dataBuffer, recordSize, true, 0L, 0L, false);
    }

    /**
     * Appends a zero-filled record of {@code i} bytes to subpartition 0 as a data buffer and
     * asserts the resulting state of {@code dataBuffer}.
     *
     * @param dataBuffer data buffer under test
     * @param i size in bytes of the record to append
     * @param z expected return value of {@code append}
     * @param j expected {@code numTotalBytes()} after the append
     * @param j2 expected {@code numTotalRecords()} after the append
     * @param z2 expected {@code hasRemaining()} after the append
     * @throws IOException if {@code append} fails
     */
    private void appendAndCheckResult(
            DataBuffer dataBuffer, int i, boolean z, long j, long j2, boolean z2)
            throws IOException {
        ByteBuffer payload = ByteBuffer.allocate(i);
        boolean appendResult = dataBuffer.append(payload, 0, Buffer.DataType.DATA_BUFFER);

        Assertions.assertThat(appendResult).isEqualTo(z);
        Assertions.assertThat(dataBuffer.numTotalBytes()).isEqualTo(j);
        Assertions.assertThat(dataBuffer.numTotalRecords()).isEqualTo(j2);
        Assertions.assertThat(dataBuffer.hasRemaining()).isEqualTo(z2);
    }

    /**
     * Reading from a data buffer that holds data but has not been finished must fail with {@link
     * IllegalStateException}.
     */
    @TestTemplate
    void testReadUnfinishedDataBuffer() throws Exception {
        final int bufferSize = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;

        DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
        dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
        Assertions.assertThat(dataBuffer.hasRemaining()).isTrue();

        Assertions.assertThatThrownBy(
                        () ->
                                dataBuffer.getNextBuffer(
                                        MemorySegmentFactory.allocateUnpooledSegment(bufferSize)))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * After {@code release()}, {@code hasRemaining()} is expected to stay {@code true}, but reading
     * must fail with {@link IllegalStateException}.
     */
    @TestTemplate
    void testReadReleasedDataBuffer() throws Exception {
        final int bufferSize = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;

        DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
        dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
        dataBuffer.finish();
        Assertions.assertThat(dataBuffer.hasRemaining()).isTrue();

        dataBuffer.release();
        Assertions.assertThat(dataBuffer.hasRemaining()).isTrue();

        Assertions.assertThatThrownBy(
                        () ->
                                dataBuffer.getNextBuffer(
                                        MemorySegmentFactory.allocateUnpooledSegment(bufferSize)))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * A data buffer that is finished without any data reports nothing remaining and returns {@code
     * null} from {@code getNextBuffer}.
     */
    @TestTemplate
    void testReadEmptyDataBuffer() throws Exception {
        final int bufferSize = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;

        DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
        dataBuffer.finish();
        Assertions.assertThat(dataBuffer.hasRemaining()).isFalse();

        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
        Assertions.assertThat(dataBuffer.getNextBuffer(segment)).isNull();
    }

    /**
     * Checks that {@code release()} brings the buffer pool's used-buffer count back to zero while
     * {@code hasRemaining()}, {@code numTotalRecords()} and {@code numTotalBytes()} keep reporting
     * the values from before the release.
     *
     * <p>This test always builds a {@code SortBasedDataBuffer}, independent of {@code
     * useHashBuffer}.
     */
    @TestTemplate
    void testReleaseDataBuffer() throws Exception {
        final int bufferPoolSize = 10;
        final int bufferSize = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;
        final int recordSize = (bufferPoolSize - 1) * bufferSize;

        NetworkBufferPool globalPool = new NetworkBufferPool(bufferPoolSize, bufferSize);
        BufferPool bufferPool = globalPool.createBufferPool(bufferPoolSize, bufferPoolSize);

        // Take every segment out of the pool up front and hand them to the data buffer.
        LinkedList<MemorySegment> segments = new LinkedList<>();
        for (int k = 0; k < bufferPoolSize; k++) {
            segments.add(bufferPool.requestMemorySegmentBlocking());
        }
        SortBasedDataBuffer dataBuffer =
                new SortBasedDataBuffer(
                        segments, bufferPool, 1, bufferSize, bufferPoolSize, (int[]) null);
        dataBuffer.append(ByteBuffer.allocate(recordSize), 0, Buffer.DataType.DATA_BUFFER);

        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(bufferPoolSize);
        Assertions.assertThat(dataBuffer.hasRemaining()).isTrue();
        Assertions.assertThat(dataBuffer.numTotalRecords()).isOne();
        Assertions.assertThat(dataBuffer.numTotalBytes()).isEqualTo(recordSize);

        dataBuffer.release();

        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isZero();
        Assertions.assertThat(dataBuffer.hasRemaining()).isTrue();
        Assertions.assertThat(dataBuffer.numTotalRecords()).isOne();
        Assertions.assertThat(dataBuffer.numTotalBytes()).isEqualTo(recordSize);
    }

    /**
     * Convenience overload of the four-argument {@code createDataBuffer} that passes {@code null}
     * as the last argument.
     *
     * @param i forwarded as the first argument
     * @param i2 forwarded as the second argument
     * @param i3 forwarded as the third argument
     * @return the data buffer built by the four-argument overload
     */
    private DataBuffer createDataBuffer(int i, int i2, int i3) throws Exception {
        final int[] noReadOrder = null;
        return createDataBuffer(i, i2, i3, noReadOrder);
    }

    /**
     * Builds the data buffer under test on top of a dedicated buffer pool. All {@code i} segments
     * are requested from the pool and handed to the data buffer; the implementation is {@code
     * HashBasedDataBuffer} when {@code useHashBuffer} is set and {@code SortBasedDataBuffer}
     * otherwise.
     *
     * @param i number of segments in the pool, all of which are requested for the data buffer
     * @param i2 segment size passed to the {@code NetworkBufferPool} and the data buffer
     * @param i3 third constructor argument of the data buffer; callers in this class pass the
     *     number of subpartitions
     * @param iArr last constructor argument of the data buffer, may be {@code null}; callers in
     *     this class pass the result of {@code getRandomSubpartitionOrder}
     * @return the newly created data buffer
     */
    private DataBuffer createDataBuffer(int i, int i2, int i3, int[] iArr) throws Exception {
        NetworkBufferPool globalPool = new NetworkBufferPool(i, i2);
        BufferPool bufferPool = globalPool.createBufferPool(i, i);

        LinkedList<MemorySegment> segments = new LinkedList<>();
        for (int k = 0; k < i; k++) {
            segments.add(bufferPool.requestMemorySegmentBlocking());
        }

        if (useHashBuffer) {
            return new HashBasedDataBuffer(segments, bufferPool, i3, i2, i, iArr);
        }
        return new SortBasedDataBuffer(segments, bufferPool, i3, i2, i, iArr);
    }

    /**
     * Returns the indices {@code 0 .. i-1} rotated by an offset drawn from a {@link Random} with
     * the fixed seed {@code 1111}, so the result is the same on every call for a given {@code i}.
     *
     * @param i number of subpartitions; must be positive because it is used as the bound of {@link
     *     Random#nextInt(int)}
     * @return array of length {@code i} whose element {@code k} is {@code (k + offset) % i}
     */
    public static int[] getRandomSubpartitionOrder(int i) {
        final int[] order = new int[i];
        final int offset = new Random(1111L).nextInt(i);
        Arrays.setAll(order, slot -> (slot + offset) % i);
        return order;
    }
}
