package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/ResultPartitionTest.class */
public class ResultPartitionTest {
    private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
    private static FileChannelManager fileChannelManager;
    private final int bufferSize = 1024;

    @BeforeClass
    public static void setUp() {
        fileChannelManager = new FileChannelManagerImpl(new String[]{tempDir}, "testing");
    }

    @AfterClass
    public static void shutdown() throws Exception {
        fileChannelManager.close();
    }

    @Test
    public void testResultSubpartitionInfo() {
        for (int i = 0; i < 2; i++) {
            ResultSubpartition[] allPartitions = new ResultPartitionBuilder().setResultPartitionIndex(i).setNumberOfSubpartitions(3).build().getAllPartitions();
            for (int i2 = 0; i2 < allPartitions.length; i2++) {
                ResultSubpartitionInfo subpartitionInfo = allPartitions[i2].getSubpartitionInfo();
                Assert.assertEquals(i, subpartitionInfo.getPartitionIdx());
                Assert.assertEquals(i2, subpartitionInfo.getSubPartitionIdx());
            }
        }
    }

    @Test
    public void testAddOnFinishedPipelinedPartition() throws Exception {
        testAddOnFinishedPartition(ResultPartitionType.PIPELINED);
    }

    @Test
    public void testAddOnFinishedBlockingPartition() throws Exception {
        testAddOnFinishedPartition(ResultPartitionType.BLOCKING);
    }

    @Test
    public void testBlockingPartitionIsConsumableMultipleTimesIfNotReleasedOnConsumption() throws IOException {
        ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
        ResultPartition build = new ResultPartitionBuilder().setResultPartitionManager(resultPartitionManager).setResultPartitionType(ResultPartitionType.BLOCKING).setFileChannelManager(fileChannelManager).build();
        resultPartitionManager.registerResultPartition(build);
        build.finish();
        MatcherAssert.assertThat(resultPartitionManager.getUnreleasedPartitions(), Matchers.contains(new ResultPartitionID[]{build.getPartitionId()}));
        for (int i = 0; i < 2; i++) {
            build.createSubpartitionView(0, () -> {
            }).releaseAllResources();
            MatcherAssert.assertThat(resultPartitionManager.getUnreleasedPartitions(), Matchers.contains(new ResultPartitionID[]{build.getPartitionId()}));
            Assert.assertFalse(build.isReleased());
        }
    }

    private void testAddOnFinishedPartition(ResultPartitionType resultPartitionType) throws Exception {
        BufferWritingResultPartition createResultPartition = createResultPartition(resultPartitionType);
        try {
            createResultPartition.finish();
            createResultPartition.emitRecord(ByteBuffer.allocate(1024), 0);
            Assert.assertEquals(0L, createResultPartition.numBuffersOut.getCount());
            Assert.assertEquals(0L, createResultPartition.numBytesOut.getCount());
            Assert.assertEquals(0L, createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
        } catch (IllegalStateException e) {
            Assert.assertEquals(0L, createResultPartition.numBuffersOut.getCount());
            Assert.assertEquals(0L, createResultPartition.numBytesOut.getCount());
            Assert.assertEquals(0L, createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
        } catch (Throwable th) {
            Assert.assertEquals(0L, createResultPartition.numBuffersOut.getCount());
            Assert.assertEquals(0L, createResultPartition.numBytesOut.getCount());
            Assert.assertEquals(0L, createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
            throw th;
        }
    }

    @Test
    public void testAddOnReleasedPipelinedPartition() throws Exception {
        testAddOnReleasedPartition(ResultPartitionType.PIPELINED);
    }

    @Test
    public void testAddOnReleasedBlockingPartition() throws Exception {
        testAddOnReleasedPartition(ResultPartitionType.BLOCKING);
    }

    private void testAddOnReleasedPartition(ResultPartitionType resultPartitionType) throws Exception {
        BufferWritingResultPartition createResultPartition = createResultPartition(resultPartitionType);
        try {
            createResultPartition.release((Throwable) null);
            createResultPartition.emitRecord(ByteBuffer.allocate(1024), 0);
            Assert.assertEquals(1L, createResultPartition.numBuffersOut.getCount());
            Assert.assertEquals(1024L, createResultPartition.numBytesOut.getCount());
            Assert.assertEquals(0L, createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
        } catch (Throwable th) {
            Assert.assertEquals(1L, createResultPartition.numBuffersOut.getCount());
            Assert.assertEquals(1024L, createResultPartition.numBytesOut.getCount());
            Assert.assertEquals(0L, createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
            throw th;
        }
    }

    @Test
    public void testAddOnPipelinedPartition() throws Exception {
        testAddOnPartition(ResultPartitionType.PIPELINED);
    }

    @Test
    public void testAddOnBlockingPartition() throws Exception {
        testAddOnPartition(ResultPartitionType.BLOCKING);
    }

    @Test
    public void testCreateSubpartitionOnFailingPartition() throws Exception {
        ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
        ResultPartition build = new ResultPartitionBuilder().setResultPartitionManager(resultPartitionManager).build();
        resultPartitionManager.registerResultPartition(build);
        build.fail((Throwable) null);
        PartitionTestUtils.verifyCreateSubpartitionViewThrowsException(resultPartitionManager, build.getPartitionId());
    }

    private void testAddOnPartition(ResultPartitionType resultPartitionType) throws Exception {
        BufferWritingResultPartition createResultPartition = createResultPartition(resultPartitionType);
        try {
            createResultPartition.emitRecord(ByteBuffer.allocate(1024), 0);
            Assert.assertEquals(1L, createResultPartition.numBuffersOut.getCount());
            Assert.assertEquals(1024L, createResultPartition.numBytesOut.getCount());
            Assert.assertEquals(1L, createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
        } catch (Throwable th) {
            Assert.assertEquals(1L, createResultPartition.numBuffersOut.getCount());
            Assert.assertEquals(1024L, createResultPartition.numBytesOut.getCount());
            Assert.assertEquals(1L, createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
            throw th;
        }
    }

    @Test
    public void testReleaseMemoryOnPipelinedPartition() throws Exception {
        NettyShuffleEnvironment build = new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(10).setBufferSize(1024).build();
        ResultPartition createPartition = PartitionTestUtils.createPartition(build, ResultPartitionType.PIPELINED, 1);
        try {
            createPartition.setup();
            for (int i = 0; i < 10; i++) {
                createPartition.emitRecord(ByteBuffer.allocate(1023), 0);
            }
            Assert.assertEquals(0L, createPartition.getBufferPool().getNumberOfAvailableMemorySegments());
            createPartition.close();
            Assert.assertTrue(createPartition.getBufferPool().isDestroyed());
            Assert.assertEquals(10L, build.getNetworkBufferPool().getNumberOfUsedMemorySegments());
            createPartition.release();
            Assert.assertEquals(0L, build.getNetworkBufferPool().getNumberOfUsedMemorySegments());
            build.close();
        } catch (Throwable th) {
            build.close();
            throw th;
        }
    }

    @Test
    public void testIsAvailableOrNot() throws IOException {
        NettyShuffleEnvironment build = new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(10).setBufferSize(1024).build();
        ResultPartition createPartition = PartitionTestUtils.createPartition(build, ResultPartitionType.PIPELINED, 1);
        try {
            createPartition.setup();
            createPartition.getBufferPool().setNumBuffers(2);
            Assert.assertTrue(createPartition.getAvailableFuture().isDone());
            createPartition.emitRecord(ByteBuffer.allocate(1024), 0);
            createPartition.emitRecord(ByteBuffer.allocate(1024), 0);
            Assert.assertFalse(createPartition.getAvailableFuture().isDone());
            createPartition.release();
            build.close();
        } catch (Throwable th) {
            createPartition.release();
            build.close();
            throw th;
        }
    }

    @Test
    public void testPipelinedPartitionBufferPool() throws Exception {
        testPartitionBufferPool(ResultPartitionType.PIPELINED_BOUNDED);
    }

    @Test
    public void testBlockingPartitionBufferPool() throws Exception {
        testPartitionBufferPool(ResultPartitionType.BLOCKING);
    }

    private void testPartitionBufferPool(ResultPartitionType resultPartitionType) throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(20, 1);
        ResultPartition build = new ResultPartitionBuilder().setResultPartitionType(resultPartitionType).setFileChannelManager(fileChannelManager).setNetworkBuffersPerChannel(2).setFloatingNetworkBuffersPerGate(8).setNetworkBufferPool(networkBufferPool).build();
        try {
            build.setup();
            BufferPool bufferPool = build.getBufferPool();
            Assert.assertEquals(build.getNumberOfSubpartitions() + 1, bufferPool.getNumberOfRequiredMemorySegments());
            if (resultPartitionType.isBounded()) {
                Assert.assertEquals((2 * build.getNumberOfSubpartitions()) + 8, bufferPool.getMaxNumberOfMemorySegments());
            } else {
                Assert.assertEquals(2147483647L, bufferPool.getMaxNumberOfMemorySegments());
            }
        } finally {
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    private BufferWritingResultPartition createResultPartition(ResultPartitionType resultPartitionType) throws IOException {
        BufferWritingResultPartition createPartition = PartitionTestUtils.createPartition(new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(10).setBufferSize(1024).build(), fileChannelManager, resultPartitionType, 2);
        createPartition.setup();
        return createPartition;
    }

    @Test
    public void testIdleAndBackPressuredTime() throws IOException, InterruptedException {
        int i = 1024;
        BufferPool createBufferPool = new NetworkBufferPool(10, 1024).createBufferPool(1, 1, 1, Integer.MAX_VALUE);
        BufferWritingResultPartition build = new ResultPartitionBuilder().setBufferPoolFactory(() -> {
            return createBufferPool;
        }).build();
        build.setup();
        build.emitRecord(ByteBuffer.allocate(1024), 0);
        ResultSubpartitionView createSubpartitionView = build.createSubpartitionView(0, new NoOpBufferAvailablityListener());
        Buffer buffer = createSubpartitionView.getNextBuffer().buffer();
        Assert.assertNotNull(buffer);
        MatcherAssert.assertThat(build.getHardBackPressuredTimeMsPerSecond().getValue(), Matchers.equalTo(0L));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread thread = new Thread(() -> {
            try {
                countDownLatch.countDown();
                build.emitRecord(ByteBuffer.allocate(i), 0);
            } catch (Exception e) {
            }
        });
        thread.start();
        countDownLatch.await();
        Thread.sleep(100L);
        buffer.recycleBuffer();
        thread.join();
        Assert.assertThat(Long.valueOf(build.getHardBackPressuredTimeMsPerSecond().getCount()), Matchers.greaterThan(0L));
        Assert.assertNotNull(createSubpartitionView.getNextBuffer().buffer());
    }

    @Test
    public void testFlushBoundedBlockingResultPartition() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.BLOCKING);
        ByteBuffer allocate = ByteBuffer.allocate(4);
        allocate.putInt(1024);
        allocate.rewind();
        createResultPartition.emitRecord(allocate, 0);
        createResultPartition.flush(0);
        allocate.rewind();
        createResultPartition.emitRecord(allocate, 0);
        allocate.rewind();
        createResultPartition.broadcastRecord(allocate);
        createResultPartition.flushAll();
        allocate.rewind();
        createResultPartition.broadcastRecord(allocate);
        createResultPartition.finish();
        allocate.rewind();
        ResultSubpartitionView createSubpartitionView = createResultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
        for (int i = 0; i < 4; i++) {
            Assert.assertEquals(allocate, createSubpartitionView.getNextBuffer().buffer().getNioBufferReadable());
        }
        Assert.assertFalse(createSubpartitionView.getNextBuffer().buffer().isBuffer());
        Assert.assertNull(createSubpartitionView.getNextBuffer());
        ResultSubpartitionView createSubpartitionView2 = createResultPartition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
        for (int i2 = 0; i2 < 2; i2++) {
            Assert.assertEquals(allocate, createSubpartitionView2.getNextBuffer().buffer().getNioBufferReadable());
        }
        Assert.assertFalse(createSubpartitionView2.getNextBuffer().buffer().isBuffer());
        Assert.assertNull(createSubpartitionView2.getNextBuffer());
    }

    @Test
    public void testEmitRecordWithRecordSpanningMultipleBuffers() throws Exception {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED);
        PipelinedSubpartition pipelinedSubpartition = createResultPartition.subpartitions[0];
        try {
            createResultPartition.emitRecord(ByteBuffer.allocate(341), 0);
            createResultPartition.emitRecord(ByteBuffer.allocate(1024), 0);
            Assert.assertEquals(2L, pipelinedSubpartition.getNumberOfQueuedBuffers());
            Assert.assertEquals(0L, pipelinedSubpartition.getNextBuffer().getPartialRecordLength());
            Assert.assertEquals(341, pipelinedSubpartition.getNextBuffer().getPartialRecordLength());
        } catch (Throwable th) {
            Assert.assertEquals(2L, pipelinedSubpartition.getNumberOfQueuedBuffers());
            Assert.assertEquals(0L, pipelinedSubpartition.getNextBuffer().getPartialRecordLength());
            Assert.assertEquals(341, pipelinedSubpartition.getNextBuffer().getPartialRecordLength());
            throw th;
        }
    }

    @Test
    public void testBroadcastRecordWithRecordSpanningMultipleBuffers() throws Exception {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED);
        try {
            createResultPartition.broadcastRecord(ByteBuffer.allocate(341));
            createResultPartition.broadcastRecord(ByteBuffer.allocate(1024));
            for (PipelinedSubpartition pipelinedSubpartition : createResultPartition.subpartitions) {
                Assert.assertEquals(2L, pipelinedSubpartition.getNumberOfQueuedBuffers());
                Assert.assertEquals(0L, pipelinedSubpartition.getNextBuffer().getPartialRecordLength());
                Assert.assertEquals(341, pipelinedSubpartition.getNextBuffer().getPartialRecordLength());
            }
        } catch (Throwable th) {
            for (PipelinedSubpartition pipelinedSubpartition2 : createResultPartition.subpartitions) {
                Assert.assertEquals(2L, pipelinedSubpartition2.getNumberOfQueuedBuffers());
                Assert.assertEquals(0L, pipelinedSubpartition2.getNextBuffer().getPartialRecordLength());
                Assert.assertEquals(341, pipelinedSubpartition2.getNextBuffer().getPartialRecordLength());
            }
            throw th;
        }
    }

    @Test
    public void testWaitForAllRecordProcessed() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        createResultPartition.notifyEndOfData(StopMode.DRAIN);
        CompletableFuture allDataProcessedFuture = createResultPartition.getAllDataProcessedFuture();
        Assert.assertFalse(allDataProcessedFuture.isDone());
        for (PipelinedSubpartition pipelinedSubpartition : createResultPartition.subpartitions) {
            Assert.assertEquals(1L, pipelinedSubpartition.getTotalNumberOfBuffersUnsafe());
            Buffer buffer = pipelinedSubpartition.pollBuffer().buffer();
            Assert.assertFalse(buffer.isBuffer());
            Assert.assertEquals(new EndOfData(StopMode.DRAIN), EventSerializer.fromBuffer(buffer, getClass().getClassLoader()));
        }
        for (int i = 0; i < createResultPartition.subpartitions.length; i++) {
            createResultPartition.subpartitions[i].acknowledgeAllDataProcessed();
            if (i < createResultPartition.subpartitions.length - 1) {
                Assert.assertFalse(allDataProcessedFuture.isDone());
            } else {
                Assert.assertTrue(allDataProcessedFuture.isDone());
                Assert.assertFalse(allDataProcessedFuture.isCompletedExceptionally());
            }
        }
    }

    @Test
    public void testDifferentBufferSizeForSubpartitions() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        PipelinedSubpartition[] pipelinedSubpartitionArr = createResultPartition.subpartitions;
        Assert.assertEquals(2L, pipelinedSubpartitionArr.length);
        PipelinedSubpartition pipelinedSubpartition = pipelinedSubpartitionArr[0];
        PipelinedSubpartition pipelinedSubpartition2 = pipelinedSubpartitionArr[1];
        pipelinedSubpartition.bufferSize(10);
        pipelinedSubpartition2.bufferSize(6);
        createResultPartition.emitRecord(ByteBuffer.allocate(2), 0);
        createResultPartition.emitRecord(ByteBuffer.allocate(10), 0);
        createResultPartition.emitRecord(ByteBuffer.allocate(2), 1);
        createResultPartition.emitRecord(ByteBuffer.allocate(10), 1);
        Assert.assertEquals(10L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(2L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(6L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
        Assert.assertEquals(6L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
        pipelinedSubpartition.bufferSize(13);
        pipelinedSubpartition2.bufferSize(5);
        createResultPartition.emitRecord(ByteBuffer.allocate(12), 0);
        createResultPartition.emitRecord(ByteBuffer.allocate(8), 0);
        createResultPartition.emitRecord(ByteBuffer.allocate(2), 1);
        createResultPartition.emitRecord(ByteBuffer.allocate(7), 1);
        Assert.assertEquals(8L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(12L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(5L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
        Assert.assertEquals(4L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
    }

    @Test
    public void testBufferSizeGreaterOrEqualToFirstRecord() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        PipelinedSubpartition[] pipelinedSubpartitionArr = createResultPartition.subpartitions;
        Assert.assertEquals(2L, pipelinedSubpartitionArr.length);
        PipelinedSubpartition pipelinedSubpartition = pipelinedSubpartitionArr[0];
        PipelinedSubpartition pipelinedSubpartition2 = pipelinedSubpartitionArr[1];
        pipelinedSubpartition.bufferSize(10);
        pipelinedSubpartition2.bufferSize(7);
        createResultPartition.emitRecord(ByteBuffer.allocate(12), 0);
        createResultPartition.emitRecord(ByteBuffer.allocate(111), 1);
        Assert.assertEquals(12L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(111L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
    }

    @Test
    public void testDynamicBufferSizeForBroadcast() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        PipelinedSubpartition[] pipelinedSubpartitionArr = createResultPartition.subpartitions;
        Assert.assertEquals(2L, pipelinedSubpartitionArr.length);
        PipelinedSubpartition pipelinedSubpartition = pipelinedSubpartitionArr[0];
        PipelinedSubpartition pipelinedSubpartition2 = pipelinedSubpartitionArr[1];
        pipelinedSubpartition.bufferSize(6);
        pipelinedSubpartition2.bufferSize(10);
        createResultPartition.broadcastRecord(ByteBuffer.allocate(6));
        Assert.assertEquals(6L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(6L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
        pipelinedSubpartition.bufferSize(4);
        pipelinedSubpartition2.bufferSize(12);
        createResultPartition.broadcastRecord(ByteBuffer.allocate(3));
        createResultPartition.broadcastRecord(ByteBuffer.allocate(7));
        Assert.assertEquals(4L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(6L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(4L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
        Assert.assertEquals(6L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
        pipelinedSubpartition.bufferSize(8);
        pipelinedSubpartition2.bufferSize(5);
        createResultPartition.broadcastRecord(ByteBuffer.allocate(3));
        Assert.assertEquals(3L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(3L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
    }

    @Test
    public void testBufferSizeGreaterOrEqualToFirstBroadcastRecord() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        PipelinedSubpartition[] pipelinedSubpartitionArr = createResultPartition.subpartitions;
        PipelinedSubpartition pipelinedSubpartition = pipelinedSubpartitionArr[0];
        PipelinedSubpartition pipelinedSubpartition2 = pipelinedSubpartitionArr[1];
        pipelinedSubpartition.bufferSize(6);
        pipelinedSubpartition2.bufferSize(10);
        createResultPartition.broadcastRecord(ByteBuffer.allocate(31));
        Assert.assertEquals(31L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(31L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
    }

    @Test
    public void testBufferSizeNotChanged() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        PipelinedSubpartition[] pipelinedSubpartitionArr = createResultPartition.subpartitions;
        Assert.assertEquals(2L, pipelinedSubpartitionArr.length);
        PipelinedSubpartition pipelinedSubpartition = pipelinedSubpartitionArr[0];
        PipelinedSubpartition pipelinedSubpartition2 = pipelinedSubpartitionArr[1];
        pipelinedSubpartition.bufferSize(1025);
        pipelinedSubpartition2.bufferSize(Integer.MAX_VALUE);
        createResultPartition.emitRecord(ByteBuffer.allocate(1024), 0);
        createResultPartition.emitRecord(ByteBuffer.allocate(1024), 1);
        Assert.assertEquals(1024L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(1024L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
    }

    @Test
    public void testNumBytesProducedCounterForUnicast() throws IOException {
        testNumBytesProducedCounter(false);
    }

    @Test
    public void testNumBytesProducedCounterForBroadcast() throws IOException {
        testNumBytesProducedCounter(true);
    }

    private void testNumBytesProducedCounter(boolean z) throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.BLOCKING);
        if (z) {
            createResultPartition.broadcastRecord(ByteBuffer.allocate(1024));
            Assert.assertEquals(1024L, createResultPartition.numBytesProduced.getCount());
            Assert.assertEquals(2048L, createResultPartition.numBytesOut.getCount());
        } else {
            createResultPartition.emitRecord(ByteBuffer.allocate(1024), 0);
            Assert.assertEquals(1024L, createResultPartition.numBytesProduced.getCount());
            Assert.assertEquals(1024L, createResultPartition.numBytesOut.getCount());
        }
    }

    @Test
    public void testSizeOfQueuedBuffers() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED);
        PipelinedSubpartition[] pipelinedSubpartitionArr = createResultPartition.subpartitions;
        Assert.assertEquals(2L, pipelinedSubpartitionArr.length);
        PipelinedSubpartition pipelinedSubpartition = pipelinedSubpartitionArr[0];
        PipelinedSubpartition pipelinedSubpartition2 = pipelinedSubpartitionArr[1];
        pipelinedSubpartition.bufferSize(10);
        pipelinedSubpartition2.bufferSize(10);
        createResultPartition.emitRecord(ByteBuffer.allocate(3), 0);
        Assert.assertEquals(3L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        createResultPartition.emitRecord(ByteBuffer.allocate(3), 1);
        Assert.assertEquals(6L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        createResultPartition.emitRecord(ByteBuffer.allocate(10), 0);
        Assert.assertEquals(16L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        createResultPartition.emitRecord(ByteBuffer.allocate(10), 1);
        Assert.assertEquals(26L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        createResultPartition.broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
        Assert.assertEquals(34L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        createResultPartition.emitRecord(ByteBuffer.allocate(5), 0);
        Assert.assertEquals(39L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        createResultPartition.broadcastRecord(ByteBuffer.allocate(7));
        Assert.assertEquals(53L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals(10L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(43L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals(10L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
        Assert.assertEquals(33L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals(3L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(30L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals(3L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
        Assert.assertEquals(27L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals(4L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(23L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals(4L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
        Assert.assertEquals(19L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals(5L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(14L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals(7L, pipelinedSubpartition.pollBuffer().buffer().getSize());
        Assert.assertEquals(7L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals(7L, pipelinedSubpartition2.pollBuffer().buffer().getSize());
        Assert.assertEquals(0L, createResultPartition.getSizeOfQueuedBuffersUnsafe());
    }
}
