package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.InputGateFairnessTest;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest;
import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.CheckedSupplier;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.class */
public class LocalInputChannelTest {

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest$TestLocalInputChannelConsumer.class */
    private static class TestLocalInputChannelConsumer implements Callable<Void> {
        private final SingleInputGate inputGate;
        private final int numberOfInputChannels;
        private final int numberOfExpectedBuffersPerChannel;

        public TestLocalInputChannelConsumer(int i, int i2, int i3, BufferPool bufferPool, ResultPartitionManager resultPartitionManager, TaskEventDispatcher taskEventDispatcher, ResultPartitionID[] resultPartitionIDArr) throws IOException, InterruptedException {
            Preconditions.checkArgument(i2 >= 1);
            Preconditions.checkArgument(i3 >= 1);
            this.inputGate = new SingleInputGateBuilder().setConsumedSubpartitionIndex(i).setNumberOfChannels(i2).setBufferPoolFactory(bufferPool).build();
            InputChannel[] inputChannelArr = new InputChannel[i2];
            for (int i4 = 0; i4 < i2; i4++) {
                inputChannelArr[i4] = InputChannelBuilder.newBuilder().setChannelIndex(i4).setPartitionManager(resultPartitionManager).setPartitionId(resultPartitionIDArr[i4]).setTaskEventPublisher(taskEventDispatcher).buildLocalChannel(this.inputGate);
            }
            InputGateFairnessTest.setupInputGate(this.inputGate, inputChannelArr);
            this.numberOfInputChannels = i2;
            this.numberOfExpectedBuffersPerChannel = i3;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            int[] iArr = new int[this.numberOfInputChannels];
            while (true) {
                try {
                    Optional next = this.inputGate.getNext();
                    if (!next.isPresent()) {
                        for (int i = 0; i < iArr.length; i++) {
                            int i2 = iArr[i];
                            if (i2 != this.numberOfExpectedBuffersPerChannel) {
                                throw new IllegalStateException("Received unexpected number of buffers on channel " + i + " (" + i2 + " instead of " + this.numberOfExpectedBuffersPerChannel + ").");
                            }
                        }
                        return null;
                    }
                    if (((BufferOrEvent) next.get()).isBuffer()) {
                        ((BufferOrEvent) next.get()).getBuffer().recycleBuffer();
                        int inputChannelIdx = ((BufferOrEvent) next.get()).getChannelInfo().getInputChannelIdx();
                        int i3 = iArr[inputChannelIdx] + 1;
                        iArr[inputChannelIdx] = i3;
                        if (i3 > this.numberOfExpectedBuffersPerChannel) {
                            throw new IllegalStateException("Received more buffers than expected on channel " + ((BufferOrEvent) next.get()).getChannelInfo() + ".");
                        }
                    }
                } finally {
                    this.inputGate.close();
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest$TestPartitionProducerBufferSource.class */
    private static class TestPartitionProducerBufferSource implements TestProducerSource {
        private final int bufferSize;
        private final List<Byte> channelIndexes;

        public TestPartitionProducerBufferSource(int i, int i2, int i3) {
            this.bufferSize = i2;
            this.channelIndexes = Lists.newArrayListWithCapacity(i * i3);
            byte b = 0;
            while (true) {
                byte b2 = b;
                if (b2 >= i) {
                    Collections.shuffle(this.channelIndexes);
                    return;
                }
                for (int i4 = 0; i4 < i3; i4++) {
                    this.channelIndexes.add(Byte.valueOf(b2));
                }
                b = (byte) (b2 + 1);
            }
        }

        @Override // org.apache.flink.runtime.io.network.util.TestProducerSource
        public TestProducerSource.BufferAndChannel getNextBuffer() throws Exception {
            if (this.channelIndexes.size() <= 0) {
                return null;
            }
            return new TestProducerSource.BufferAndChannel(new byte[this.bufferSize], this.channelIndexes.remove(0).byteValue());
        }
    }

    @Test
    public void testNoDataPersistedAfterReceivingAlignedBarrier() throws Exception {
        CheckpointBarrier checkpointBarrier = new CheckpointBarrier(1L, 0L, CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(), 123L));
        MemorySegment memorySegment = EventSerializer.toBuffer(checkpointBarrier, false).getMemorySegment();
        BufferConsumer bufferConsumer = new BufferConsumer(new NetworkBuffer(memorySegment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.EVENT_BUFFER), memorySegment.size());
        BufferConsumer createFilledFinishedBufferConsumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1);
        RecordingChannelStateWriter recordingChannelStateWriter = new RecordingChannelStateWriter();
        LocalInputChannel buildLocalChannel = InputChannelBuilder.newBuilder().setPartitionManager(new SingleInputGateTest.TestingResultPartitionManager(createResultSubpartitionView(bufferConsumer, createFilledFinishedBufferConsumer))).setStateWriter(recordingChannelStateWriter).buildLocalChannel(new SingleInputGateBuilder().build());
        buildLocalChannel.requestSubpartition(0);
        buildLocalChannel.getNextBuffer();
        recordingChannelStateWriter.start(checkpointBarrier.getId(), checkpointBarrier.getCheckpointOptions());
        buildLocalChannel.checkpointStarted(checkpointBarrier);
        buildLocalChannel.getNextBuffer();
        Assert.assertTrue("no data should be persisted after receiving a barrier", recordingChannelStateWriter.getAddedInput().isEmpty());
    }

    @Test
    public void testConcurrentConsumeMultiplePartitions() throws Exception {
        Preconditions.checkArgument(true);
        Preconditions.checkArgument(true);
        Preconditions.checkArgument(true);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(64);
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(2080, 32768);
        ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
        ResultPartitionID[] resultPartitionIDArr = new ResultPartitionID[32];
        TestPartitionProducer[] testPartitionProducerArr = new TestPartitionProducer[32];
        for (int i = 0; i < 32; i++) {
            resultPartitionIDArr[i] = new ResultPartitionID();
            BufferWritingResultPartition build = new ResultPartitionBuilder().setResultPartitionId(resultPartitionIDArr[i]).setNumberOfSubpartitions(32).setNumTargetKeyGroups(32).setResultPartitionManager(resultPartitionManager).setBufferPoolFactory(() -> {
                return networkBufferPool.createBufferPool(33, 33, 32, Integer.MAX_VALUE);
            }).build();
            build.setup();
            testPartitionProducerArr[i] = new TestPartitionProducer(build, false, new TestPartitionProducerBufferSource(32, 32768, 1024));
        }
        try {
            ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(33);
            for (int i2 = 0; i2 < 32; i2++) {
                TestPartitionProducer testPartitionProducer = testPartitionProducerArr[i2];
                testPartitionProducer.getClass();
                newArrayListWithCapacity.add(CompletableFuture.supplyAsync(CheckedSupplier.unchecked(testPartitionProducer::call), newFixedThreadPool));
            }
            for (int i3 = 0; i3 < 32; i3++) {
                TestLocalInputChannelConsumer testLocalInputChannelConsumer = new TestLocalInputChannelConsumer(i3, 32, 1024, networkBufferPool.createBufferPool(32, 32), resultPartitionManager, new TaskEventDispatcher(), resultPartitionIDArr);
                testLocalInputChannelConsumer.getClass();
                newArrayListWithCapacity.add(CompletableFuture.supplyAsync(CheckedSupplier.unchecked(testLocalInputChannelConsumer::call), newFixedThreadPool));
            }
            FutureUtils.waitForAll(newArrayListWithCapacity).get();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
            newFixedThreadPool.shutdown();
        } catch (Throwable th) {
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
            newFixedThreadPool.shutdown();
            throw th;
        }
    }

    @Test
    public void testPartitionRequestExponentialBackoff() throws Exception {
        SingleInputGate singleInputGate = (SingleInputGate) Mockito.mock(SingleInputGate.class);
        Mockito.when(singleInputGate.getBufferProvider()).thenReturn((BufferProvider) Mockito.mock(BufferProvider.class));
        ResultPartitionManager resultPartitionManager = (ResultPartitionManager) Mockito.mock(ResultPartitionManager.class);
        LocalInputChannel createLocalInputChannel = InputChannelTestUtils.createLocalInputChannel(singleInputGate, resultPartitionManager, 500, 3000);
        Mockito.when(resultPartitionManager.createSubpartitionView((ResultPartitionID) Matchers.eq(createLocalInputChannel.partitionId), Matchers.eq(0), (BufferAvailabilityListener) Matchers.any(BufferAvailabilityListener.class))).thenThrow(new Throwable[]{new PartitionNotFoundException(createLocalInputChannel.partitionId)});
        Timer timer = (Timer) Mockito.mock(Timer.class);
        ((Timer) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannelTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m144answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((TimerTask) invocationOnMock.getArguments()[0]).run();
                return null;
            }
        }).when(timer)).schedule((TimerTask) Matchers.any(TimerTask.class), Matchers.anyLong());
        createLocalInputChannel.requestSubpartition(0);
        ((ResultPartitionManager) Mockito.verify(resultPartitionManager)).createSubpartitionView((ResultPartitionID) Matchers.eq(createLocalInputChannel.partitionId), Matchers.eq(0), (BufferAvailabilityListener) Matchers.any(BufferAvailabilityListener.class));
        for (long j : new int[]{500, 1000, 2000, 3000}) {
            createLocalInputChannel.retriggerSubpartitionRequest(timer, 0);
            ((Timer) Mockito.verify(timer)).schedule((TimerTask) Matchers.any(TimerTask.class), Matchers.eq(j));
        }
        try {
            createLocalInputChannel.retriggerSubpartitionRequest(timer, 0);
            createLocalInputChannel.getNextBuffer();
            Assert.fail("Did not throw expected exception.");
        } catch (Exception e) {
        }
    }

    @Test(expected = CancelTaskException.class)
    public void testProducerFailedException() throws Exception {
        ResultSubpartitionView resultSubpartitionView = (ResultSubpartitionView) Mockito.mock(ResultSubpartitionView.class);
        Mockito.when(Boolean.valueOf(resultSubpartitionView.isReleased())).thenReturn(true);
        Mockito.when(resultSubpartitionView.getFailureCause()).thenReturn(new Exception("Expected test exception"));
        ResultPartitionManager resultPartitionManager = (ResultPartitionManager) Mockito.mock(ResultPartitionManager.class);
        Mockito.when(resultPartitionManager.createSubpartitionView((ResultPartitionID) Matchers.any(ResultPartitionID.class), Matchers.anyInt(), (BufferAvailabilityListener) Matchers.any(BufferAvailabilityListener.class))).thenReturn(resultSubpartitionView);
        SingleInputGate singleInputGate = (SingleInputGate) Mockito.mock(SingleInputGate.class);
        Mockito.when(singleInputGate.getBufferProvider()).thenReturn((BufferProvider) Mockito.mock(BufferProvider.class));
        LocalInputChannel createLocalInputChannel = InputChannelTestUtils.createLocalInputChannel(singleInputGate, resultPartitionManager);
        createLocalInputChannel.requestSubpartition(0);
        createLocalInputChannel.getNextBuffer();
    }

    @Test
    public void testPartitionNotFoundExceptionWhileRequestingPartition() throws Exception {
        LocalInputChannel createLocalInputChannel = InputChannelTestUtils.createLocalInputChannel(InputChannelTestUtils.createSingleInputGate(1), new ResultPartitionManager());
        try {
            createLocalInputChannel.requestSubpartition(0);
            Assert.fail("Should throw a PartitionNotFoundException.");
        } catch (PartitionNotFoundException e) {
            Assert.assertThat(createLocalInputChannel.getPartitionId(), org.hamcrest.Matchers.is(e.getPartitionId()));
        }
    }

    @Test
    public void testRetriggerPartitionRequestWhilePartitionNotFound() throws Exception {
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1);
        InputChannel createLocalInputChannel = InputChannelTestUtils.createLocalInputChannel(createSingleInputGate, new ResultPartitionManager(), 1, 1);
        createSingleInputGate.setInputChannels(new InputChannel[]{createLocalInputChannel});
        createLocalInputChannel.requestSubpartition(0);
        Assert.assertNotNull(createSingleInputGate.getRetriggerLocalRequestTimer());
    }

    @Test
    public void testChannelErrorWhileRetriggeringRequest() {
        final LocalInputChannel createLocalInputChannel = InputChannelTestUtils.createLocalInputChannel(InputChannelTestUtils.createSingleInputGate(1), new ResultPartitionManager());
        Timer timer = new Timer(true) { // from class: org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannelTest.2
            @Override // java.util.Timer
            public void schedule(TimerTask timerTask, long j) {
                timerTask.run();
                try {
                    createLocalInputChannel.checkError();
                    Assert.fail("Should throw a PartitionNotFoundException.");
                } catch (PartitionNotFoundException e) {
                    Assert.assertThat(createLocalInputChannel.partitionId, org.hamcrest.Matchers.is(e.getPartitionId()));
                } catch (IOException e2) {
                    Assert.fail("Should throw a PartitionNotFoundException.");
                }
            }
        };
        try {
            createLocalInputChannel.retriggerSubpartitionRequest(timer, 0);
            timer.cancel();
        } catch (Throwable th) {
            timer.cancel();
            throw th;
        }
    }

    @Test
    public void testConcurrentReleaseAndRetriggerPartitionRequest() throws Exception {
        final SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1);
        ResultPartitionManager resultPartitionManager = (ResultPartitionManager) Mockito.mock(ResultPartitionManager.class);
        Mockito.when(resultPartitionManager.createSubpartitionView((ResultPartitionID) Matchers.any(ResultPartitionID.class), Matchers.anyInt(), (BufferAvailabilityListener) Matchers.any(BufferAvailabilityListener.class))).thenAnswer(new Answer<ResultSubpartitionView>() { // from class: org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannelTest.3
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public ResultSubpartitionView m145answer(InvocationOnMock invocationOnMock) throws Throwable {
                Thread.sleep(100L);
                throw new PartitionNotFoundException(new ResultPartitionID());
            }
        });
        final LocalInputChannel createLocalInputChannel = InputChannelTestUtils.createLocalInputChannel(createSingleInputGate, resultPartitionManager, 1, 1);
        Thread thread = new Thread() { // from class: org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannelTest.4
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    createSingleInputGate.close();
                } catch (IOException e) {
                }
            }
        };
        Thread thread2 = new Thread() { // from class: org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannelTest.5
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    createLocalInputChannel.requestSubpartition(0);
                } catch (IOException e) {
                }
            }
        };
        thread2.start();
        thread.start();
        thread.join();
        thread2.join();
    }

    @Test
    public void testGetNextAfterPartitionReleased() throws Exception {
        ResultSubpartitionView createResultSubpartitionView = createResultSubpartitionView(false);
        LocalInputChannel createLocalInputChannel = InputChannelTestUtils.createLocalInputChannel(new SingleInputGateBuilder().build(), new SingleInputGateTest.TestingResultPartitionManager(createResultSubpartitionView));
        createLocalInputChannel.requestSubpartition(0);
        Assert.assertFalse(createLocalInputChannel.getNextBuffer().isPresent());
        createResultSubpartitionView.releaseAllResources();
        try {
            createLocalInputChannel.getNextBuffer();
            Assert.fail("Did not throw expected CancelTaskException");
        } catch (CancelTaskException e) {
        }
        createLocalInputChannel.releaseAllResources();
        Assert.assertFalse(createLocalInputChannel.getNextBuffer().isPresent());
    }

    @Test
    public void testGetBufferFromLocalChannelWhenCompressionEnabled() throws Exception {
        LocalInputChannel createLocalInputChannel = InputChannelTestUtils.createLocalInputChannel(new SingleInputGateBuilder().build(), new SingleInputGateTest.TestingResultPartitionManager(createResultSubpartitionView(true)));
        createLocalInputChannel.requestSubpartition(0);
        Optional nextBuffer = createLocalInputChannel.getNextBuffer();
        Assert.assertTrue(nextBuffer.isPresent());
        Assert.assertFalse(((InputChannel.BufferAndAvailability) nextBuffer.get()).buffer().isCompressed());
    }

    @Test(expected = IllegalStateException.class)
    public void testUnblockReleasedChannel() throws Exception {
        LocalInputChannel createLocalInputChannel = InputChannelTestUtils.createLocalInputChannel(InputChannelTestUtils.createSingleInputGate(1), new ResultPartitionManager());
        createLocalInputChannel.releaseAllResources();
        createLocalInputChannel.resumeConsumption();
    }

    @Test
    public void testEnqueueAvailableChannelWhenResuming() throws IOException, InterruptedException {
        ResultSubpartition resultSubpartition = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED, NoOpFileChannelManager.INSTANCE).getAllPartitions()[0];
        LocalInputChannel createLocalInputChannel = InputChannelTestUtils.createLocalInputChannel(new SingleInputGateBuilder().build(), new SingleInputGateTest.TestingResultPartitionManager(resultSubpartition.createReadView(() -> {
        })));
        createLocalInputChannel.requestSubpartition(0);
        resultSubpartition.add(EventSerializer.toBufferConsumer(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), false));
        Assert.assertTrue(createLocalInputChannel.getNextBuffer().isPresent());
        resultSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096));
        resultSubpartition.flush();
        Assert.assertFalse(createLocalInputChannel.inputGate.pollNext().isPresent());
        createLocalInputChannel.resumeConsumption();
        Optional pollNext = createLocalInputChannel.inputGate.pollNext();
        Assert.assertTrue(pollNext.isPresent());
        Assert.assertTrue(((BufferOrEvent) pollNext.get()).isBuffer());
    }

    @Test
    public void testCheckpointingInflightData() throws Exception {
        SingleInputGate build = new SingleInputGateBuilder().build();
        ResultSubpartition resultSubpartition = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED, NoOpFileChannelManager.INSTANCE).getAllPartitions()[0];
        SingleInputGateTest.TestingResultPartitionManager testingResultPartitionManager = new SingleInputGateTest.TestingResultPartitionManager(resultSubpartition.createReadView(() -> {
        }));
        RecordingChannelStateWriter recordingChannelStateWriter = new RecordingChannelStateWriter();
        InputChannel createLocalInputChannel = InputChannelTestUtils.createLocalInputChannel(build, testingResultPartitionManager, 0, 0, inputChannelBuilder -> {
            inputChannelBuilder.setStateWriter(recordingChannelStateWriter);
        });
        build.setInputChannels(new InputChannel[]{createLocalInputChannel});
        createLocalInputChannel.requestSubpartition(0);
        CheckpointOptions unaligned = CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault());
        recordingChannelStateWriter.start(0L, unaligned);
        CheckpointBarrier checkpointBarrier = new CheckpointBarrier(0L, 123L, unaligned);
        createLocalInputChannel.checkpointStarted(checkpointBarrier);
        resultSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1));
        Assert.assertTrue(createLocalInputChannel.getNextBuffer().isPresent());
        resultSubpartition.add(EventSerializer.toBufferConsumer(checkpointBarrier, true));
        Assert.assertTrue(createLocalInputChannel.getNextBuffer().isPresent());
        resultSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(2));
        Assert.assertTrue(createLocalInputChannel.getNextBuffer().isPresent());
        Assert.assertArrayEquals(recordingChannelStateWriter.getAddedInput().get(createLocalInputChannel.getChannelInfo()).stream().mapToInt((v0) -> {
            return v0.getSize();
        }).toArray(), new int[]{1});
    }

    private static ResultSubpartitionView createResultSubpartitionView(boolean z) throws IOException {
        return z ? createResultSubpartitionView(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096)) : createResultSubpartitionView(new BufferConsumer[0]);
    }

    private static ResultSubpartitionView createResultSubpartitionView(BufferConsumer... bufferConsumerArr) throws IOException {
        ResultSubpartition resultSubpartition = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED, (FileChannelManager) NoOpFileChannelManager.INSTANCE, true, 4096).getAllPartitions()[0];
        for (BufferConsumer bufferConsumer : bufferConsumerArr) {
            resultSubpartition.add(bufferConsumer);
        }
        return resultSubpartition.createReadView(() -> {
        });
    }
}
