package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Queue;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.PendingCheckpointTest;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PartitionSortedBufferTest.class */
public class PartitionSortedBufferTest {

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PartitionSortedBufferTest$DataAndType.class */
    public static class DataAndType {
        private final ByteBuffer data;
        private final Buffer.DataType dataType;

        /* JADX INFO: Access modifiers changed from: package-private */
        public DataAndType(ByteBuffer byteBuffer, Buffer.DataType dataType) {
            this.data = byteBuffer;
            this.dataType = dataType;
        }
    }

    @Test
    public void testWriteAndReadSortBuffer() throws Exception {
        Random random = new Random(1111L);
        Queue[] queueArr = new Queue[10];
        Queue[] queueArr2 = new Queue[10];
        for (int i = 0; i < 10; i++) {
            queueArr[i] = new ArrayDeque();
            queueArr2[i] = new ArrayDeque();
        }
        int[] iArr = new int[10];
        int[] iArr2 = new int[10];
        Arrays.fill(iArr, 0);
        Arrays.fill(iArr2, 0);
        int i2 = 0;
        SortBuffer createSortBuffer = createSortBuffer(1000, 1024, 10, getRandomSubpartitionOrder(10));
        while (true) {
            int nextInt = random.nextInt((1024 * 4) - 1) + 1;
            byte[] bArr = new byte[nextInt];
            random.nextBytes(bArr);
            ByteBuffer wrap = ByteBuffer.wrap(bArr);
            int nextInt2 = random.nextInt(10);
            Buffer.DataType dataType = random.nextBoolean() ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.EVENT_BUFFER;
            if (!createSortBuffer.append(wrap, nextInt2, dataType)) {
                break;
            }
            wrap.rewind();
            queueArr[nextInt2].add(new DataAndType(wrap, dataType));
            iArr[nextInt2] = iArr[nextInt2] + nextInt;
            i2 += nextInt;
        }
        createSortBuffer.finish();
        while (createSortBuffer.hasRemaining()) {
            BufferWithChannel copyIntoSegment = createSortBuffer.copyIntoSegment(MemorySegmentFactory.allocateUnpooledSegment(1024));
            int channelIndex = copyIntoSegment.getChannelIndex();
            queueArr2[channelIndex].add(copyIntoSegment.getBuffer());
            iArr2[channelIndex] = iArr2[channelIndex] + copyIntoSegment.getBuffer().readableBytes();
        }
        Assert.assertEquals(i2, createSortBuffer.numBytes());
        checkWriteReadResult(10, iArr, iArr2, queueArr, queueArr2);
    }

    public static void checkWriteReadResult(int i, int[] iArr, int[] iArr2, Queue<DataAndType>[] queueArr, Queue<Buffer>[] queueArr2) {
        for (int i2 = 0; i2 < i; i2++) {
            Assert.assertEquals(iArr[i2], iArr2[i2]);
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            ByteBuffer allocate = ByteBuffer.allocate(iArr[i2]);
            for (DataAndType dataAndType : queueArr[i2]) {
                allocate.put(dataAndType.data);
                dataAndType.data.rewind();
                if (dataAndType.dataType.isEvent()) {
                    arrayList.add(dataAndType);
                }
            }
            ByteBuffer allocate2 = ByteBuffer.allocate(iArr2[i2]);
            for (Buffer buffer : queueArr2[i2]) {
                allocate2.put(buffer.getNioBufferReadable());
                if (!buffer.isBuffer()) {
                    arrayList2.add(buffer);
                }
            }
            allocate.flip();
            allocate2.flip();
            Assert.assertEquals(allocate, allocate2);
            Assert.assertEquals(arrayList.size(), arrayList2.size());
            for (int i3 = 0; i3 < arrayList.size(); i3++) {
                Assert.assertEquals(((DataAndType) arrayList.get(i3)).dataType, ((Buffer) arrayList2.get(i3)).getDataType());
                Assert.assertEquals(((DataAndType) arrayList.get(i3)).data, ((Buffer) arrayList2.get(i3)).getNioBufferReadable());
            }
        }
    }

    @Test
    public void testWriteReadWithEmptyChannel() throws Exception {
        ByteBuffer[] byteBufferArr = {ByteBuffer.allocate(PendingCheckpointTest.MAX_PARALLELISM), null, ByteBuffer.allocate(1536), null, ByteBuffer.allocate(1024)};
        SortBuffer createSortBuffer = createSortBuffer(10, 1024, 5);
        for (int i = 0; i < 5; i++) {
            ByteBuffer byteBuffer = byteBufferArr[i];
            if (byteBuffer != null) {
                createSortBuffer.append(byteBuffer, i, Buffer.DataType.DATA_BUFFER);
                byteBuffer.rewind();
            }
        }
        createSortBuffer.finish();
        checkReadResult(createSortBuffer, byteBufferArr[0], 0, 1024);
        ByteBuffer duplicate = byteBufferArr[2].duplicate();
        duplicate.limit(1024);
        checkReadResult(createSortBuffer, duplicate.slice(), 2, 1024);
        ByteBuffer duplicate2 = byteBufferArr[2].duplicate();
        duplicate2.position(1024);
        checkReadResult(createSortBuffer, duplicate2.slice(), 2, 1024);
        checkReadResult(createSortBuffer, byteBufferArr[4], 4, 1024);
    }

    private void checkReadResult(SortBuffer sortBuffer, ByteBuffer byteBuffer, int i, int i2) {
        BufferWithChannel copyIntoSegment = sortBuffer.copyIntoSegment(MemorySegmentFactory.allocateUnpooledSegment(i2));
        Assert.assertEquals(i, copyIntoSegment.getChannelIndex());
        Assert.assertEquals(byteBuffer, copyIntoSegment.getBuffer().getNioBufferReadable());
    }

    @Test(expected = IllegalArgumentException.class)
    public void testWriteEmptyData() throws Exception {
        SortBuffer createSortBuffer = createSortBuffer(1, 1024, 1);
        ByteBuffer allocate = ByteBuffer.allocate(1);
        allocate.position(1);
        createSortBuffer.append(allocate, 0, Buffer.DataType.DATA_BUFFER);
    }

    @Test(expected = IllegalStateException.class)
    public void testWriteFinishedSortBuffer() throws Exception {
        SortBuffer createSortBuffer = createSortBuffer(1, 1024, 1);
        createSortBuffer.finish();
        createSortBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
    }

    @Test(expected = IllegalStateException.class)
    public void testWriteReleasedSortBuffer() throws Exception {
        SortBuffer createSortBuffer = createSortBuffer(1, 1024, 1);
        createSortBuffer.release();
        createSortBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
    }

    @Test
    public void testWriteMoreDataThanCapacity() throws Exception {
        SortBuffer createSortBuffer = createSortBuffer(10, 1024, 1);
        for (int i = 1; i < 10; i++) {
            appendAndCheckResult(createSortBuffer, 1024, true, 1024 * i, i, true);
        }
        appendAndCheckResult(createSortBuffer, 1024, false, 1024 * r0, 10 - 1, true);
    }

    @Test
    public void testWriteLargeRecord() throws Exception {
        appendAndCheckResult(createSortBuffer(10, 1024, 1), 10 * 1024, false, 0L, 0L, false);
    }

    private void appendAndCheckResult(SortBuffer sortBuffer, int i, boolean z, long j, long j2, boolean z2) throws IOException {
        Assert.assertEquals(Boolean.valueOf(z), Boolean.valueOf(sortBuffer.append(ByteBuffer.allocate(i), 0, Buffer.DataType.DATA_BUFFER)));
        Assert.assertEquals(j, sortBuffer.numBytes());
        Assert.assertEquals(j2, sortBuffer.numRecords());
        Assert.assertEquals(Boolean.valueOf(z2), Boolean.valueOf(sortBuffer.hasRemaining()));
    }

    @Test(expected = IllegalStateException.class)
    public void testReadUnfinishedSortBuffer() throws Exception {
        SortBuffer createSortBuffer = createSortBuffer(1, 1024, 1);
        createSortBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
        Assert.assertTrue(createSortBuffer.hasRemaining());
        createSortBuffer.copyIntoSegment(MemorySegmentFactory.allocateUnpooledSegment(1024));
    }

    @Test(expected = IllegalStateException.class)
    public void testReadReleasedSortBuffer() throws Exception {
        SortBuffer createSortBuffer = createSortBuffer(1, 1024, 1);
        createSortBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
        createSortBuffer.finish();
        Assert.assertTrue(createSortBuffer.hasRemaining());
        createSortBuffer.release();
        Assert.assertFalse(createSortBuffer.hasRemaining());
        createSortBuffer.copyIntoSegment(MemorySegmentFactory.allocateUnpooledSegment(1024));
    }

    @Test(expected = IllegalStateException.class)
    public void testReadEmptySortBuffer() throws Exception {
        SortBuffer createSortBuffer = createSortBuffer(1, 1024, 1);
        createSortBuffer.finish();
        Assert.assertFalse(createSortBuffer.hasRemaining());
        createSortBuffer.copyIntoSegment(MemorySegmentFactory.allocateUnpooledSegment(1024));
    }

    @Test
    public void testReleaseSortBuffer() throws Exception {
        int i = (10 - 1) * 1024;
        PartitionSortedBuffer partitionSortedBuffer = new PartitionSortedBuffer(new Object(), new NetworkBufferPool(10, 1024).createBufferPool(10, 10), 1, 1024, 10, (int[]) null);
        partitionSortedBuffer.append(ByteBuffer.allocate(i), 0, Buffer.DataType.DATA_BUFFER);
        Assert.assertEquals(10, r0.bestEffortGetNumOfUsedBuffers());
        Assert.assertTrue(partitionSortedBuffer.hasRemaining());
        Assert.assertEquals(1L, partitionSortedBuffer.numRecords());
        Assert.assertEquals(i, partitionSortedBuffer.numBytes());
        partitionSortedBuffer.release();
        Assert.assertEquals(0L, r0.bestEffortGetNumOfUsedBuffers());
        Assert.assertFalse(partitionSortedBuffer.hasRemaining());
        Assert.assertEquals(0L, partitionSortedBuffer.numRecords());
        Assert.assertEquals(0L, partitionSortedBuffer.numBytes());
    }

    private SortBuffer createSortBuffer(int i, int i2, int i3) throws IOException {
        return createSortBuffer(i, i2, i3, null);
    }

    private SortBuffer createSortBuffer(int i, int i2, int i3, int[] iArr) throws IOException {
        return new PartitionSortedBuffer(new Object(), new NetworkBufferPool(i, i2).createBufferPool(i, i), i3, i2, i, iArr);
    }

    public static int[] getRandomSubpartitionOrder(int i) {
        int[] iArr = new int[i];
        int nextInt = new Random(1111L).nextInt(i);
        for (int i2 = 0; i2 < i; i2++) {
            iArr[i2] = (i2 + nextInt) % i;
        }
        return iArr;
    }
}
