/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.buffer.NoOpBufferPool;
import org.apache.flink.runtime.io.network.partition.AvailabilityUtil;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class RemoteInputChannelTest {
    // Checkpoint id constant shared by the tests in this class.
    private static final long CHECKPOINT_ID = 1L;
    // Unaligned checkpoint options using the default storage location reference.
    private static final CheckpointOptions UNALIGNED = CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault());
    // Aligned-with-timeout checkpoint options using the default storage location reference.
    // NOTE(review): the trailing 10L is presumably the alignment timeout - confirm the unit against CheckpointOptions.
    private static final CheckpointOptions ALIGNED_WITH_TIMEOUT = CheckpointOptions.alignedWithTimeout((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault(), (long)10L);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that the gate's priority-event future is completed after a checkpoint barrier that
     * was delivered as a regular buffer is converted to a priority event.
     *
     * <p>The barrier is enqueued via {@code onBuffer}, the gate is polled once, and the channel is
     * then asked to convert the entry with the same sequence number into a priority event.
     *
     * @throws IOException if the channel or the gate fails while processing the barrier
     * @throws InterruptedException if polling the gate is interrupted
     */
    @Test
    public void testGateNotifiedOnBarrierConversion() throws IOException, InterruptedException {
        // Sequence number shared by the enqueued barrier and the later conversion request.
        final int sequenceNumber = 0;
        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(1, 4096);
        try {
            final SingleInputGate inputGate = new SingleInputGateBuilder()
                    .setBufferPoolFactory(networkBufferPool.createBufferPool(1, 1))
                    .build();
            inputGate.setup();
            final RemoteInputChannel channel = InputChannelBuilder.newBuilder()
                    .setConnectionManager(new TestVerifyConnectionManager(new TestVerifyPartitionRequestClient()))
                    .buildRemoteChannel(inputGate);
            channel.requestSubpartition();

            final CheckpointBarrier barrier = new CheckpointBarrier(
                    1L,
                    123L,
                    CheckpointOptions.alignedWithTimeout(
                            CheckpointType.CHECKPOINT,
                            CheckpointStorageLocationReference.getDefault(),
                            Integer.MAX_VALUE));
            channel.onBuffer(EventSerializer.toBuffer(barrier, false), sequenceNumber, 0);
            inputGate.pollNext();

            channel.convertToPriorityEvent(sequenceNumber);
            Assert.assertTrue(inputGate.getPriorityEventAvailableFuture().isDone());
        } finally {
            // Always release the global pool, even when an assertion above fails.
            networkBufferPool.destroy();
        }
    }

    /**
     * Feeds the channel two buffers whose numbers (0, then 29) are not consecutive and expects
     * {@code getNextBuffer()} to throw. The buffer must stay un-recycled until the channel
     * releases all of its resources.
     */
    @Test
    public void testExceptionOnReordering() throws Exception {
        final SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        final RemoteInputChannel channel = createRemoteInputChannel(gate);
        final Buffer data = TestBufferFactory.createBuffer(32768);

        // First delivery holds an extra reference, the second one hands over the original.
        channel.onBuffer(data.retainBuffer(), 0, -1);
        channel.onBuffer(data, 29, -1);

        try {
            channel.getNextBuffer();
            Assert.fail("Did not throw expected exception after enqueuing an out-of-order buffer.");
        } catch (Exception expected) {
            // The channel still holds the buffer until it is released.
            Assert.assertFalse(data.isRecycled());
            channel.releaseAllResources();
            Assert.assertTrue(data.isRecycled());
        }
    }

    /**
     * Uses a channel state writer whose {@code addInputData} closes the supplied iterator and
     * then throws {@link ExpectedTestException}. After a checkpoint has been started on the
     * channel, {@code onBuffer} must surface that exception, and the buffer must only be recycled
     * once the channel releases all resources.
     */
    @Test
    public void testExceptionOnPersisting() throws Exception {
        final SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);

        // Writer that closes the handed-over data and then always fails.
        final ChannelStateWriter failingWriter = new ChannelStateWriter.NoOpChannelStateWriter() {

            public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> data) {
                try {
                    data.close();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                throw new ExpectedTestException();
            }
        };
        final RemoteInputChannel channel = InputChannelBuilder.newBuilder()
                .setStateWriter(failingWriter)
                .buildRemoteChannel(gate);

        final CheckpointBarrier barrier = new CheckpointBarrier(
                42L,
                System.currentTimeMillis(),
                CheckpointOptions.unaligned(
                        CheckpointType.CHECKPOINT,
                        CheckpointStorageLocationReference.getDefault()));
        channel.checkpointStarted(barrier);

        final Buffer payload = TestBufferFactory.createBuffer(32768);
        Assert.assertFalse(payload.isRecycled());

        try {
            channel.onBuffer(payload, 0, -1);
            Assert.fail("This should have failed");
        } catch (ExpectedTestException expected) {
            // Thrown by the failing writer above; this is the outcome under test.
        }

        // The buffer stays alive until the channel itself is released.
        Assert.assertFalse(payload.isRecycled());
        channel.releaseAllResources();
        Assert.assertTrue(payload.isRecycled());
    }

    /**
     * Runs the concurrent release scenario with {@code onBuffer} as the competing action,
     * repeated 8192 times. The action always reports {@code true}.
     */
    @Test
    public void testConcurrentOnBufferAndRelease() throws Exception {
        final TriFunction<RemoteInputChannel, Buffer, Integer, Boolean> enqueue =
                (channel, data, index) -> {
                    channel.onBuffer(data, index.intValue(), -1);
                    return true;
                };
        testConcurrentReleaseAndSomething(8192, enqueue);
    }

    /**
     * Runs the concurrent release scenario with the buffer manager's
     * {@code notifyBufferAvailable} as the competing action, repeated 1024 times. The action
     * reports whatever {@code notifyBufferAvailable} returns.
     */
    @Test
    public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception {
        final TriFunction<RemoteInputChannel, Buffer, Integer, Boolean> notifyAvailable =
                (channel, data, index) -> {
                    return channel.getBufferManager().notifyBufferAvailable(data);
                };
        testConcurrentReleaseAndSomething(1024, notifyAvailable);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Races {@code releaseAllResources()} against an arbitrary channel action on a two-thread
     * executor and checks that the channel has no queued buffers afterwards.
     *
     * <p>For every repetition a fresh channel is created. One task repeatedly applies
     * {@code function} to a retained copy of a shared buffer (128 times per round) until the
     * channel reports itself released; the other task releases the channel. When the function
     * returns {@code false}, the retained reference is recycled by this method.
     *
     * @param numberOfRepetitions how many times the race is repeated with a new channel
     * @param function action applied to (channel, retained buffer, loop index); its result tells
     *     whether the retained reference was consumed ({@code true}) or must be recycled here
     *     ({@code false})
     * @throws Exception if either task fails or an assertion does not hold
     */
    private void testConcurrentReleaseAndSomething(int numberOfRepetitions, TriFunction<RemoteInputChannel, Buffer, Integer, Boolean> function) throws Exception {
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        final Buffer buffer = TestBufferFactory.createBuffer(32768);
        try {
            final SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
            for (int i = 0; i < numberOfRepetitions; ++i) {
                final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);

                final Callable<Void> enqueueTask = () -> {
                    do {
                        for (int j = 0; j < 128; ++j) {
                            // A rejected buffer is still owned by us, so give the reference back.
                            if (!function.apply(inputChannel, buffer.retainBuffer(), j)) {
                                buffer.recycleBuffer();
                            }
                        }
                    } while (!inputChannel.isReleased());
                    return null;
                };
                final Callable<Void> releaseTask = () -> {
                    inputChannel.releaseAllResources();
                    return null;
                };

                final List<Future<Void>> results = new ArrayList<>(2);
                results.add(executor.submit(enqueueTask));
                results.add(executor.submit(releaseTask));

                // Wait for both tasks; get() propagates any failure from the task.
                for (Future<Void> result : results) {
                    result.get();
                }
                Assert.assertEquals("Resource leak during concurrent release and notifyBufferAvailable.", 0L, (long) inputChannel.getNumberOfQueuedBuffers());
            }
        } finally {
            executor.shutdown();
            // Only the initial reference may be left; dropping it must recycle the buffer.
            Assert.assertFalse(buffer.isRecycled());
            buffer.recycleBuffer();
            Assert.assertTrue(buffer.isRecycled());
        }
    }

    /**
     * Retriggering a subpartition request on a channel that never issued a request is expected
     * to throw {@link IllegalStateException}.
     */
    @Test(expected=IllegalStateException.class)
    public void testRetriggerWithoutPartitionRequest() throws Exception {
        final SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        // NOTE(review): 500 / 3000 are presumably the initial and maximum backoff - confirm against the helper.
        final RemoteInputChannel channel = createRemoteInputChannel(gate, 0, 500, 3000);

        channel.retriggerSubpartitionRequest();
    }

    /**
     * Requests a subpartition, then retriggers the request four times and verifies the client
     * saw the values 500, 1000, 2000 and 3000 in turn. A further retrigger followed by
     * {@code getNextBuffer()} is expected to throw.
     */
    @Test
    public void testPartitionRequestExponentialBackoff() throws Exception {
        final SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        final ResultPartitionID partitionId = new ResultPartitionID();
        final TestVerifyPartitionRequestClient requestClient = new TestVerifyPartitionRequestClient();
        final TestVerifyConnectionManager connectionManager = new TestVerifyConnectionManager(requestClient);
        // NOTE(review): 500 / 3000 are presumably the initial and maximum backoff - confirm against the helper.
        final RemoteInputChannel channel = createRemoteInputChannel(gate, connectionManager, partitionId, 500, 3000);

        // The initial request is verified with 0 as its last argument.
        channel.requestSubpartition();
        requestClient.verifyResult(partitionId, 0, 0);

        final int[] expectedDelays = {500, 1000, 2000, 3000};
        for (int attempt = 0; attempt < expectedDelays.length; ++attempt) {
            channel.retriggerSubpartitionRequest();
            requestClient.verifyResult(partitionId, 0, expectedDelays[attempt]);
        }

        try {
            channel.retriggerSubpartitionRequest();
            channel.getNextBuffer();
            Assert.fail("Did not throw expected exception.");
        } catch (Exception expected) {
            // Any exception here is the expected outcome of the extra retrigger.
        }
    }

    /**
     * Same scenario as the exponential case but with both configured values set to 500: one
     * retrigger is verified with 500, and the next retrigger followed by
     * {@code getNextBuffer()} is expected to throw.
     */
    @Test
    public void testPartitionRequestSingleBackoff() throws Exception {
        final SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        final ResultPartitionID partitionId = new ResultPartitionID();
        final TestVerifyPartitionRequestClient requestClient = new TestVerifyPartitionRequestClient();
        final TestVerifyConnectionManager connectionManager = new TestVerifyConnectionManager(requestClient);
        final RemoteInputChannel channel = createRemoteInputChannel(gate, connectionManager, partitionId, 500, 500);

        // Initial request.
        channel.requestSubpartition();
        requestClient.verifyResult(partitionId, 0, 0);

        // The single retrigger that is expected to succeed.
        channel.retriggerSubpartitionRequest();
        requestClient.verifyResult(partitionId, 0, 500);

        try {
            channel.retriggerSubpartitionRequest();
            channel.getNextBuffer();
            Assert.fail("Did not throw expected exception.");
        } catch (Exception expected) {
            // Any exception here is the expected outcome of the extra retrigger.
        }
    }

    /**
     * With both configured values set to 0, the initial request is verified and the very first
     * retrigger followed by {@code getNextBuffer()} is expected to throw.
     */
    @Test
    public void testPartitionRequestNoBackoff() throws Exception {
        final SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        final ResultPartitionID partitionId = new ResultPartitionID();
        final TestVerifyPartitionRequestClient requestClient = new TestVerifyPartitionRequestClient();
        final TestVerifyConnectionManager connectionManager = new TestVerifyConnectionManager(requestClient);
        final RemoteInputChannel channel = createRemoteInputChannel(gate, connectionManager, partitionId, 0, 0);

        // Initial request.
        channel.requestSubpartition();
        requestClient.verifyResult(partitionId, 0, 0);

        try {
            channel.retriggerSubpartitionRequest();
            channel.getNextBuffer();
            Assert.fail("Did not throw expected exception.");
        } catch (Exception expected) {
            // Any exception here is the expected outcome of the retrigger.
        }
    }

    /**
     * Calls {@code onFailedPartitionRequest()} on a channel whose gate was built with a test
     * producer-state provider and asserts that the provider reports having been invoked.
     */
    @Test
    public void testOnFailedPartitionRequest() throws Exception {
        final ResultPartitionID partitionId = new ResultPartitionID();
        final TestPartitionProducerStateProvider stateProvider = new TestPartitionProducerStateProvider(partitionId);

        final SingleInputGate gate = new SingleInputGateBuilder()
                .setPartitionProducerStateProvider(stateProvider)
                .build();
        final RemoteInputChannel channel = InputChannelBuilder.newBuilder()
                .setPartitionId(partitionId)
                .buildRemoteChannel(gate);

        channel.onFailedPartitionRequest();

        Assert.assertTrue(stateProvider.isInvoked());
    }

    /**
     * Reports a {@link ProducerFailedException} to the channel via {@code onError}, then requests
     * the subpartition and reads the next buffer. The test expects a
     * {@link CancelTaskException} to be thrown.
     */
    @Test(expected=CancelTaskException.class)
    public void testProducerFailedException() throws Exception {
        // Connection manager mock that hands out a mocked partition request client.
        final ConnectionManager connectionManager = Mockito.mock(ConnectionManager.class);
        final PartitionRequestClient requestClient = Mockito.mock(PartitionRequestClient.class);
        Mockito.when(connectionManager.createPartitionRequestClient(Matchers.any(ConnectionID.class)))
                .thenReturn(requestClient);

        final SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        final RemoteInputChannel channel = InputChannelTestUtils.createRemoteInputChannel(inputGate, 0, connectionManager);

        final Throwable producerFailure = new ProducerFailedException(new RuntimeException("Expected test exception."));
        channel.onError(producerFailure);

        channel.requestSubpartition();
        channel.getNextBuffer();
    }

    /**
     * Wires a channel to a {@code TestingExceptionConnectionManager}, lets the gate request its
     * partitions and reads the next buffer. The test expects a
     * {@link PartitionConnectionException} to be thrown.
     */
    @Test(expected=PartitionConnectionException.class)
    public void testPartitionConnectionException() throws IOException {
        final TestingExceptionConnectionManager failingConnectionManager = new TestingExceptionConnectionManager();
        final SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        final RemoteInputChannel channel = InputChannelTestUtils.createRemoteInputChannel(inputGate, 0, failingConnectionManager);
        final InputChannel[] channels = {channel};

        inputGate.setInputChannels(channels);
        inputGate.requestPartitions();

        channel.getNextBuffer();
    }

    /**
     * Walks a remote input channel through a sequence of sender-backlog announcements and buffer
     * recycles while the local pool cannot satisfy the channel's demand, verifying after each
     * step: how often the spied pool was asked for a buffer, how often the channel's buffer
     * manager was registered as a listener, the channel's available/required buffer counts, the
     * pool's free segment count, and whether the channel reports waiting for floating buffers.
     *
     * <p>Cleanup runs exactly once in a {@code finally} block and receives any throwable caught
     * from the test body. NOTE(review): {@code cleanup} is defined outside this view; presumably
     * it rethrows the throwable it is given - confirm.
     *
     * @throws Exception if cleanup fails or propagates a failure from the test body
     */
    @Test
    public void testAvailableBuffersLessThanRequiredBuffers() throws Exception {
        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
        final int numFloatingBuffers = 14;
        final SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider) networkBufferPool);
        final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        Throwable thrown = null;
        try {
            final BufferPool bufferPool = Mockito.spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartition();

            // Take one buffer from the channel and two directly from the local pool up front.
            final Buffer exclusiveBuffer = inputChannel.requestBuffer();
            Assert.assertNotNull(exclusiveBuffer);
            final int numRecycleFloatingBuffers = 2;
            final ArrayDeque<Buffer> floatingBufferQueue = new ArrayDeque<>(numRecycleFloatingBuffers);
            for (int i = 0; i < numRecycleFloatingBuffers; ++i) {
                Buffer floatingBuffer = bufferPool.requestBuffer();
                Assert.assertNotNull(floatingBuffer);
                floatingBufferQueue.add(floatingBuffer);
            }
            Mockito.verify(bufferPool, Mockito.times(numRecycleFloatingBuffers)).requestBuffer();

            // Step 1: backlog of 14.
            inputChannel.onSenderBacklog(14);
            Mockito.verify(bufferPool, Mockito.times(15)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(1)).addBufferListener(inputChannel.getBufferManager());
            Assert.assertEquals("There should be 13 buffers available in the channel", 13L, (long) inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 16 buffers required in the channel", 16L, (long) inputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, (long) bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertTrue(inputChannel.isWaitingForFloatingBuffers());

            // Step 2: backlog grows to 16; no further pool requests or listener registrations.
            inputChannel.onSenderBacklog(16);
            Mockito.verify(bufferPool, Mockito.times(15)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(1)).addBufferListener(inputChannel.getBufferManager());
            Assert.assertEquals("There should be 13 buffers available in the channel", 13L, (long) inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 18 buffers required in the channel", 18L, (long) inputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, (long) bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertTrue(inputChannel.isWaitingForFloatingBuffers());

            // Step 3: recycle the buffer taken from the channel.
            exclusiveBuffer.recycleBuffer();
            Mockito.verify(bufferPool, Mockito.times(15)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(1)).addBufferListener(inputChannel.getBufferManager());
            Assert.assertEquals("There should be 14 buffers available in the channel", 14L, (long) inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 18 buffers required in the channel", 18L, (long) inputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, (long) bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertTrue(inputChannel.isWaitingForFloatingBuffers());

            // Step 4: recycle the first pool buffer.
            floatingBufferQueue.poll().recycleBuffer();
            Mockito.verify(bufferPool, Mockito.times(16)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(2)).addBufferListener(inputChannel.getBufferManager());
            Assert.assertEquals("There should be 15 buffers available in the channel", 15L, (long) inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 18 buffers required in the channel", 18L, (long) inputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, (long) bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertTrue(inputChannel.isWaitingForFloatingBuffers());

            // Step 5: backlog shrinks to 13.
            inputChannel.onSenderBacklog(13);
            Mockito.verify(bufferPool, Mockito.times(16)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(2)).addBufferListener(inputChannel.getBufferManager());
            Assert.assertEquals("There should be 15 buffers available in the channel", 15L, (long) inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 15 buffers required in the channel", 15L, (long) inputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, (long) bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertTrue(inputChannel.isWaitingForFloatingBuffers());

            // Step 6: recycle the second pool buffer; it stays in the local pool.
            floatingBufferQueue.poll().recycleBuffer();
            Mockito.verify(bufferPool, Mockito.times(16)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(2)).addBufferListener(inputChannel.getBufferManager());
            Assert.assertEquals("There should be 15 buffers available in the channel", 15L, (long) inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 15 buffers required in the channel", 15L, (long) inputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 1 buffers available in local pool", 1L, (long) bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertFalse(inputChannel.isWaitingForFloatingBuffers());

            // Step 7: backlog grows to 15 again.
            inputChannel.onSenderBacklog(15);
            Mockito.verify(bufferPool, Mockito.times(18)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(3)).addBufferListener(inputChannel.getBufferManager());
            Assert.assertEquals("There should be 16 buffers available in the channel", 16L, (long) inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 17 buffers required in the channel", 17L, (long) inputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, (long) bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertTrue(inputChannel.isWaitingForFloatingBuffers());
        } catch (Throwable t) {
            // Remember the failure so that cleanup can take it into account.
            thrown = t;
        } finally {
            // Runs exactly once on both the success and the failure path.
            RemoteInputChannelTest.cleanup(networkBufferPool, null, null, thrown, new InputChannel[]{inputChannel});
        }
    }

    /**
     * Announces a sender backlog of 12 to a remote input channel and then recycles a buffer taken
     * from the local pool and a buffer taken from the channel. After each step it verifies: how
     * often the spied pool was asked for a buffer, that the channel's buffer manager was never
     * registered as a listener, the channel's available/required buffer counts (14 each
     * throughout), and the pool's free segment count.
     *
     * <p>Cleanup runs exactly once in a {@code finally} block and receives any throwable caught
     * from the test body. NOTE(review): {@code cleanup} is defined outside this view; presumably
     * it rethrows the throwable it is given - confirm.
     *
     * @throws Exception if cleanup fails or propagates a failure from the test body
     */
    @Test
    public void testAvailableBuffersEqualToRequiredBuffers() throws Exception {
        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
        final int numFloatingBuffers = 14;
        final SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider) networkBufferPool);
        final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        Throwable thrown = null;
        try {
            final BufferPool bufferPool = Mockito.spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartition();

            // Take one buffer from the channel and one directly from the local pool up front.
            final Buffer exclusiveBuffer = inputChannel.requestBuffer();
            Assert.assertNotNull(exclusiveBuffer);
            final Buffer floatingBuffer = bufferPool.requestBuffer();
            Assert.assertNotNull(floatingBuffer);
            Mockito.verify(bufferPool, Mockito.times(1)).requestBuffer();

            // Step 1: backlog of 12.
            inputChannel.onSenderBacklog(12);
            Mockito.verify(bufferPool, Mockito.times(14)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(0)).addBufferListener(inputChannel.getBufferManager());
            Assert.assertEquals("There should be 14 buffers available in the channel", 14L, (long) inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 14 buffers required in the channel", 14L, (long) inputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, (long) bufferPool.getNumberOfAvailableMemorySegments());

            // Step 2: recycle the pool buffer; it goes back to the local pool.
            floatingBuffer.recycleBuffer();
            Mockito.verify(bufferPool, Mockito.times(14)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(0)).addBufferListener(inputChannel.getBufferManager());
            Assert.assertEquals("There should be 14 buffers available in the channel", 14L, (long) inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 14 buffers required in the channel", 14L, (long) inputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 1 buffer available in local pool", 1L, (long) bufferPool.getNumberOfAvailableMemorySegments());

            // Step 3: recycle the buffer taken from the channel; the pool's free count becomes 2.
            exclusiveBuffer.recycleBuffer();
            Mockito.verify(bufferPool, Mockito.times(14)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(0)).addBufferListener(inputChannel.getBufferManager());
            Assert.assertEquals("There should be 14 buffers available in the channel", 14L, (long) inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 14 buffers required in the channel", 14L, (long) inputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 2 buffers available in local pool", 2L, (long) bufferPool.getNumberOfAvailableMemorySegments());
        } catch (Throwable t) {
            // Remember the failure so that cleanup can take it into account.
            thrown = t;
        } finally {
            // Runs exactly once on both the success and the failure path.
            RemoteInputChannelTest.cleanup(networkBufferPool, null, null, thrown, new InputChannel[]{inputChannel});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAvailableBuffersMoreThanRequiredBuffers() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
        int numFloatingBuffers = 14;
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = this.createRemoteInputChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        Throwable thrown = null;
        try {
            BufferPool bufferPool = (BufferPool)Mockito.spy((Object)networkBufferPool.createBufferPool(14, 14));
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartition();
            Buffer exclusiveBuffer = inputChannel.requestBuffer();
            Assert.assertNotNull((Object)exclusiveBuffer);
            Buffer floatingBuffer = bufferPool.requestBuffer();
            Assert.assertNotNull((Object)floatingBuffer);
            ((BufferPool)Mockito.verify((Object)bufferPool, (VerificationMode)Mockito.times((int)1))).requestBuffer();
            inputChannel.onSenderBacklog(12);
            ((BufferPool)Mockito.verify((Object)bufferPool, (VerificationMode)Mockito.times((int)14))).requestBuffer();
            ((BufferPool)Mockito.verify((Object)bufferPool, (VerificationMode)Mockito.times((int)0))).addBufferListener((BufferListener)inputChannel.getBufferManager());
            Assert.assertEquals((String)"There should be 14 buffers available in the channel", (long)14L, (long)inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals((String)"There should be 14 buffers required in the channel", (long)14L, (long)inputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals((String)"There should be 0 buffers available in local pool", (long)0L, (long)bufferPool.getNumberOfAvailableMemorySegments());
            inputChannel.onSenderBacklog(10);
            ((BufferPool)Mockito.verify((Object)bufferPool, (VerificationMode)Mockito.times((int)14))).requestBuffer();
            ((BufferPool)Mockito.verify((Object)bufferPool, (VerificationMode)Mockito.times((int)0))).addBufferListener((BufferListener)inputChannel.getBufferManager());
            Assert.assertEquals((String)"There should be 14 buffers available in the channel", (long)14L, (long)inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals((String)"There should be 12 buffers required in the channel", (long)12L, (long)inputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals((String)"There should be 0 buffers available in local pool", (long)0L, (long)bufferPool.getNumberOfAvailableMemorySegments());
            exclusiveBuffer.recycleBuffer();
            ((BufferPool)Mockito.verify((Object)bufferPool, (VerificationMode)Mockito.times((int)14))).requestBuffer();
            ((BufferPool)Mockito.verify((Object)bufferPool, (VerificationMode)Mockito.times((int)0))).addBufferListener((BufferListener)inputChannel.getBufferManager());
            Assert.assertEquals((String)"There should be 14 buffers available in the channel", (long)14L, (long)inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals((String)"There should be 12 buffers required in the channel", (long)12L, (long)inputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals((String)"There should be 1 buffer available in local pool", (long)1L, (long)bufferPool.getNumberOfAvailableMemorySegments());
            floatingBuffer.recycleBuffer();
            ((BufferPool)Mockito.verify((Object)bufferPool, (VerificationMode)Mockito.times((int)14))).requestBuffer();
            ((BufferPool)Mockito.verify((Object)bufferPool, (VerificationMode)Mockito.times((int)0))).addBufferListener((BufferListener)inputChannel.getBufferManager());
            Assert.assertEquals((String)"There should be 14 buffers available in the channel", (long)14L, (long)inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals((String)"There should be 12 buffers required in the channel", (long)12L, (long)inputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals((String)"There should be 2 buffers available in local pool", (long)2L, (long)bufferPool.getNumberOfAvailableMemorySegments());
        }
        catch (Throwable t) {
            try {
                thrown = t;
            }
            catch (Throwable throwable) {
                RemoteInputChannelTest.cleanup(networkBufferPool, null, null, thrown, new InputChannel[]{inputChannel});
                throw throwable;
            }
            RemoteInputChannelTest.cleanup(networkBufferPool, null, null, thrown, new InputChannel[]{inputChannel});
        }
        RemoteInputChannelTest.cleanup(networkBufferPool, null, null, thrown, new InputChannel[]{inputChannel});
    }

    /**
     * Verifies how recycled floating buffers are handed out when several channels wait for them.
     *
     * <p>All buffers of the local pool are taken up front, so every channel that then receives a
     * backlog of 8 is left with only 2 available buffers and registers its buffer manager as a
     * listener on the pool exactly once. Once the taken buffers are recycled, each of the three
     * channels is expected to hold 3 available buffers and 1 unannounced credit.
     */
    @Test
    public void testFairDistributionFloatingBuffers() throws Exception {
        final int numFloatingBuffers = 3;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(3, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel[] inputChannels = new RemoteInputChannel[]{
            this.createRemoteInputChannel(inputGate),
            this.createRemoteInputChannel(inputGate),
            this.createRemoteInputChannel(inputGate)
        };
        inputGate.setInputChannels((InputChannel[])inputChannels);
        Throwable thrown = null;
        try {
            BufferPool bufferPool = Mockito.spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputGate.requestPartitions();
            for (RemoteInputChannel inputChannel : inputChannels) {
                inputChannel.requestSubpartition();
            }

            // Drain the local pool so that the channels cannot obtain floating buffers yet.
            List<Buffer> floatingBuffers = new ArrayList<>(numFloatingBuffers);
            for (int i = 0; i < numFloatingBuffers; ++i) {
                Buffer buffer = bufferPool.requestBuffer();
                Assert.assertNotNull(buffer);
                floatingBuffers.add(buffer);
            }

            // The loop variable must be typed as RemoteInputChannel: the decompiled code declared
            // it as Object, on which none of the methods below exist.
            for (RemoteInputChannel inputChannel : inputChannels) {
                inputChannel.onSenderBacklog(8);
                Mockito.verify(bufferPool, Mockito.times(1)).addBufferListener(inputChannel.getBufferManager());
                Assert.assertEquals("There should be 2 buffers available in the channel", 2L, (long)inputChannel.getNumberOfAvailableBuffers());
            }

            // Hand the drained buffers back to the pool.
            for (Buffer buffer : floatingBuffers) {
                buffer.recycleBuffer();
            }

            for (RemoteInputChannel inputChannel : inputChannels) {
                Assert.assertEquals("There should be 3 buffers available in the channel", 3L, (long)inputChannel.getNumberOfAvailableBuffers());
                Assert.assertEquals("There should be 1 unannounced credits in the channel", 1L, (long)inputChannel.getUnannouncedCredit());
            }
        } catch (Throwable t) {
            thrown = t;
        } finally {
            // NOTE(review): cleanup receives the recorded failure, presumably to rethrow it after
            // releasing the resources - confirm against its implementation.
            RemoteInputChannelTest.cleanup(networkBufferPool, null, null, thrown, (InputChannel[])inputChannels);
        }
    }

    /**
     * Checks that a failure raised while one channel is notified about an available buffer does
     * not prevent another channel from obtaining that buffer later.
     *
     * <p>{@code failingRemoteIC} never requests its subpartition while {@code successfulRemoteIC}
     * does. Both announce a backlog while the only buffer of the local pool is taken. After that
     * buffer is recycled, {@code failingRemoteIC.checkError()} must throw an {@link IOException}
     * whose cause is an {@link IllegalStateException}. The buffer only becomes obtainable by
     * {@code successfulRemoteIC} after {@code failingRemoteIC} has released all its resources.
     */
    @Test
    public void testFailureInNotifyBufferAvailable() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(2, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        RemoteInputChannel successfulRemoteIC = this.createRemoteInputChannel(inputGate);
        successfulRemoteIC.requestSubpartition();
        RemoteInputChannel failingRemoteIC = this.createRemoteInputChannel(inputGate);
        Buffer buffer = null;
        Throwable thrown = null;
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(1, 1);
            inputGate.setBufferPool(bufferPool);

            // Take the only buffer so that both channels have to wait for it.
            buffer = Preconditions.checkNotNull(bufferPool.requestBuffer());
            failingRemoteIC.onSenderBacklog(1);
            successfulRemoteIC.onSenderBacklog(2);

            // Recycling hands the buffer to a waiting channel; reset the local reference so that
            // cleanup does not see a buffer this test no longer holds.
            buffer.recycleBuffer();
            buffer = null;
            try {
                failingRemoteIC.checkError();
                Assert.fail("The input channel should have an error based on the failure in RemoteInputChannel#notifyBufferAvailable()");
            } catch (IOException e) {
                MatcherAssert.assertThat((Object)e, (Matcher)org.hamcrest.Matchers.hasProperty("cause", (Matcher)org.hamcrest.Matchers.isA(IllegalStateException.class)));
            }

            Assert.assertEquals(0L, (long)bufferPool.getNumberOfAvailableMemorySegments());
            buffer = successfulRemoteIC.requestBuffer();
            Assert.assertNull("buffer should still remain in failingRemoteIC", buffer);

            failingRemoteIC.releaseAllResources();
            Assert.assertEquals(0L, (long)bufferPool.getNumberOfAvailableMemorySegments());
            buffer = successfulRemoteIC.requestBuffer();
            Assert.assertNotNull("no buffer given to successfulRemoteIC", buffer);
        } catch (Throwable t) {
            thrown = t;
        } finally {
            // Single cleanup on every path. NOTE(review): cleanup presumably rethrows the
            // recorded failure after releasing the resources - confirm.
            RemoteInputChannelTest.cleanup(networkBufferPool, null, buffer, thrown, new InputChannel[]{failingRemoteIC, successfulRemoteIC});
        }
    }

    /**
     * Runs {@code onSenderBacklog} and {@code releaseAllResources} on the same channel from two
     * concurrent tasks and checks the buffer accounting afterwards.
     *
     * <p>After both tasks have finished the channel must report no available buffers, and the
     * local and the network buffer pool together must report 130 available memory segments.
     */
    @Test
    public void testConcurrentOnSenderBacklogAndRelease() throws Exception {
        final int numFloatingBuffers = 128;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(130, 32);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        final RemoteInputChannel inputChannel = this.createRemoteInputChannel(inputGate);
        Throwable thrown = null;
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartition();

            // Keeps announcing backlogs 1..128 until the other task has released the channel.
            Callable<Void> requestBufferTask = () -> {
                do {
                    for (int j = 1; j <= 128; ++j) {
                        inputChannel.onSenderBacklog(j);
                    }
                } while (!inputChannel.isReleased());
                return null;
            };
            Callable<Void> releaseTask = () -> {
                inputChannel.releaseAllResources();
                return null;
            };

            RemoteInputChannelTest.submitTasksAndWaitForResults(executor, new Callable[]{requestBufferTask, releaseTask});

            Assert.assertEquals("There should be no buffers available in the channel.", 0L, (long)inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 130 buffers available in local pool.", 130L, (long)(bufferPool.getNumberOfAvailableMemorySegments() + networkBufferPool.getNumberOfAvailableMemorySegments()));
        } catch (Throwable t) {
            thrown = t;
        } finally {
            // Single cleanup on every path. NOTE(review): cleanup presumably rethrows the
            // recorded failure after releasing the resources - confirm.
            RemoteInputChannelTest.cleanup(networkBufferPool, executor, null, thrown, new InputChannel[]{inputChannel});
        }
    }

    /**
     * Runs {@code onSenderBacklog} concurrently with the task produced by
     * {@code recycleBufferTask} and checks the buffer accounting afterwards.
     *
     * <p>After both tasks have finished, the number of available buffers in the channel must equal
     * the number of required buffers, and the local pool must have no available memory segments.
     */
    @Test
    public void testConcurrentOnSenderBacklogAndRecycle() throws Exception {
        final int numExclusiveSegments = 120;
        final int numFloatingBuffers = 128;
        final int backlog = 128;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32);
        ExecutorService executor = Executors.newFixedThreadPool(3);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        final RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, numExclusiveSegments);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        Throwable thrown = null;
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartition();

            // Announces every backlog from 1 up to the maximum exactly once.
            Callable<Void> requestBufferTask = () -> {
                for (int j = 1; j <= backlog; ++j) {
                    inputChannel.onSenderBacklog(j);
                }
                return null;
            };

            RemoteInputChannelTest.submitTasksAndWaitForResults(executor, new Callable[]{this.recycleBufferTask(inputChannel, bufferPool, numExclusiveSegments, numFloatingBuffers), requestBufferTask});

            Assert.assertEquals("There should be " + inputChannel.getNumberOfRequiredBuffers() + " buffers available in channel.", (long)inputChannel.getNumberOfRequiredBuffers(), (long)inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be no buffers available in local pool.", 0L, (long)bufferPool.getNumberOfAvailableMemorySegments());
        } catch (Throwable t) {
            thrown = t;
        } finally {
            // Single cleanup on every path. NOTE(review): cleanup presumably rethrows the
            // recorded failure after releasing the resources - confirm.
            RemoteInputChannelTest.cleanup(networkBufferPool, executor, null, thrown, new InputChannel[]{inputChannel});
        }
    }

    /**
     * Runs the task produced by {@code recycleBufferTask} concurrently with
     * {@code releaseAllResources} and checks the buffer accounting afterwards.
     *
     * <p>After both tasks have finished the channel must report no available buffers, the local
     * pool 128 available memory segments and the network buffer pool 120.
     */
    @Test
    public void testConcurrentRecycleAndRelease() throws Exception {
        final int numExclusiveSegments = 120;
        final int numFloatingBuffers = 128;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32);
        ExecutorService executor = Executors.newFixedThreadPool(3);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        final RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, numExclusiveSegments);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        Throwable thrown = null;
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartition();

            Callable<Void> releaseTask = () -> {
                inputChannel.releaseAllResources();
                return null;
            };

            RemoteInputChannelTest.submitTasksAndWaitForResults(executor, new Callable[]{this.recycleBufferTask(inputChannel, bufferPool, numExclusiveSegments, numFloatingBuffers), releaseTask});

            Assert.assertEquals("There should be no buffers available in the channel.", 0L, (long)inputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 128 buffers available in local pool.", 128L, (long)bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertEquals("There should be 120 buffers available in global pool.", 120L, (long)networkBufferPool.getNumberOfAvailableMemorySegments());
        } catch (Throwable t) {
            thrown = t;
        } finally {
            // Single cleanup on every path. NOTE(review): cleanup presumably rethrows the
            // recorded failure after releasing the resources - confirm.
            RemoteInputChannelTest.cleanup(networkBufferPool, executor, null, thrown, new InputChannel[]{inputChannel});
        }
    }

    /**
     * Stresses buffer recycling by letting one task request and recycle buffers directly on the
     * local pool while a second task requests buffers through the channel, recycles them and
     * finally releases the channel.
     *
     * <p>In every round the channel task expects to have collected exactly 2 buffers whose
     * recycler is the channel's buffer manager. The test passes when both tasks finish without
     * an exception.
     */
    @Test
    public void testConcurrentRecycleAndRelease2() throws Exception {
        final int retries = 1000;
        final int numExclusiveBuffers = 2;
        final int numFloatingBuffers = 2;
        final int numTotalBuffers = 4;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(numTotalBuffers, 32);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = this.createRemoteInputChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        Throwable thrown = null;
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartition();

            // Repeatedly takes a buffer straight from the local pool and gives it back.
            Callable<Void> bufferPoolInteractionsTask = () -> {
                for (int i = 0; i < retries; ++i) {
                    try (BufferBuilder bufferBuilder = bufferPool.requestBufferBuilderBlocking()) {
                        Buffer buffer = BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder);
                        buffer.recycleBuffer();
                    }
                }
                return null;
            };

            Callable<Void> channelInteractionsTask = () -> {
                List<Buffer> exclusiveBuffers = new ArrayList<>(numExclusiveBuffers);
                List<Buffer> floatingBuffers = new ArrayList<>(numFloatingBuffers);
                try {
                    for (int i = 0; i < retries; ++i) {
                        // Request up to numTotalBuffers buffers, stopping at the first miss.
                        for (int j = 0; j < numTotalBuffers; ++j) {
                            Buffer buffer = inputChannel.requestBuffer();
                            if (buffer == null) {
                                break;
                            }
                            // Buffers recycled by the channel's buffer manager are sorted
                            // separately from all other buffers.
                            if (buffer.getRecycler() == inputChannel.getBufferManager()) {
                                exclusiveBuffers.add(buffer);
                            } else {
                                floatingBuffers.add(buffer);
                            }
                        }
                        floatingBuffers.forEach(Buffer::recycleBuffer);
                        floatingBuffers.clear();
                        Assert.assertEquals((long)numExclusiveBuffers, (long)exclusiveBuffers.size());
                        inputChannel.onSenderBacklog(0);
                        exclusiveBuffers.forEach(Buffer::recycleBuffer);
                        exclusiveBuffers.clear();
                    }
                } finally {
                    inputChannel.releaseAllResources();
                }
                return null;
            };

            RemoteInputChannelTest.submitTasksAndWaitForResults(executor, new Callable[]{bufferPoolInteractionsTask, channelInteractionsTask});
        } catch (Throwable t) {
            thrown = t;
        } finally {
            // Single cleanup on every path. NOTE(review): cleanup presumably rethrows the
            // recorded failure after releasing the resources - confirm.
            RemoteInputChannelTest.cleanup(networkBufferPool, executor, null, thrown, new InputChannel[]{inputChannel});
        }
    }

    /**
     * Runs {@code getNextBuffer} concurrently with {@code releaseAllResources} on a channel that
     * was filled with 1000 buffers beforehand.
     *
     * <p>The polling task tolerates exceptions only when the channel has already been released;
     * any other failure is reported as an {@link AssertionError}.
     */
    @Test
    public void testConcurrentGetNextBufferAndRelease() throws Exception {
        final int numTotalBuffers = 1000;
        final int numFloatingBuffers = 998;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(numTotalBuffers, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = this.createRemoteInputChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Throwable thrown = null;
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartition();

            // Feed every requested buffer back into the channel with an increasing sequence number.
            for (int i = 0; i < numTotalBuffers; ++i) {
                Buffer buffer = inputChannel.requestBuffer();
                inputChannel.onBuffer(buffer, i, 0);
            }

            Callable<Void> getNextBufferTask = () -> {
                try {
                    for (int i = 0; i < numTotalBuffers; ++i) {
                        // Chained directly on the returned Optional: the decompiled raw Optional
                        // local erased the element type needed by the lambda.
                        inputChannel.getNextBuffer().ifPresent(next -> next.buffer().recycleBuffer());
                    }
                } catch (Throwable t) {
                    if (!inputChannel.isReleased()) {
                        throw new AssertionError("Exceptions are expected here only if the input channel was released", t);
                    }
                }
                return null;
            };
            Callable<Void> releaseTask = () -> {
                inputChannel.releaseAllResources();
                return null;
            };

            RemoteInputChannelTest.submitTasksAndWaitForResults(executor, new Callable[]{getNextBufferTask, releaseTask});
        } catch (Throwable t) {
            thrown = t;
        } finally {
            // Single cleanup on every path. NOTE(review): cleanup presumably rethrows the
            // recorded failure after releasing the resources - confirm.
            RemoteInputChannelTest.cleanup(networkBufferPool, executor, null, thrown, new InputChannel[]{inputChannel});
        }
    }

    /**
     * Expects {@code checkError()} to throw a {@code PartitionNotFoundException} carrying the
     * channel's partition id after the subpartition request has been retriggered on a channel
     * that uses a {@code TestingConnectionManager}.
     */
    @Test
    public void testPartitionNotFoundExceptionWhileRetriggeringRequest() throws Exception {
        RemoteInputChannel channel = InputChannelTestUtils.createRemoteInputChannel(
                InputChannelTestUtils.createSingleInputGate(1), 0, new TestingConnectionManager());

        channel.requestSubpartition();
        channel.retriggerSubpartitionRequest();

        try {
            channel.checkError();
            Assert.fail("Should throw a PartitionNotFoundException.");
        } catch (PartitionNotFoundException expected) {
            // The exception has to refer to the very partition this channel consumes.
            MatcherAssert.assertThat((Object)channel.getPartitionId(), (Matcher)org.hamcrest.Matchers.is((Object)expected.getPartitionId()));
        }
    }

    /**
     * Expects {@code requestSubpartition()} to throw a {@code PartitionConnectionException}
     * carrying the channel's partition id when the channel uses a
     * {@code TestingExceptionConnectionManager}.
     */
    @Test
    public void testPartitionConnectionExceptionWhileRequestingPartition() throws Exception {
        RemoteInputChannel channel = InputChannelTestUtils.createRemoteInputChannel(
                InputChannelTestUtils.createSingleInputGate(1), 0, new TestingExceptionConnectionManager());

        try {
            channel.requestSubpartition();
            Assert.fail("Expected PartitionConnectionException.");
        } catch (PartitionConnectionException expected) {
            // The exception has to refer to the very partition this channel consumes.
            MatcherAssert.assertThat((Object)channel.getPartitionId(), (Matcher)org.hamcrest.Matchers.is((Object)expected.getPartitionId()));
        }
    }

    /**
     * Expects an {@link IllegalStateException} when consumption is resumed on a channel whose
     * resources have already been released.
     */
    @Test(expected=IllegalStateException.class)
    public void testUnblockReleasedChannel() throws Exception {
        RemoteInputChannel releasedChannel = this.createRemoteInputChannel(InputChannelTestUtils.createSingleInputGate(1));
        releasedChannel.releaseAllResources();

        releasedChannel.resumeConsumption();
    }

    /**
     * Expects an {@link IllegalStateException} when a buffer size is announced on a channel whose
     * resources have already been released.
     */
    @Test(expected=IllegalStateException.class)
    public void testReleasedChannelAnnounceBufferSize() throws Exception {
        RemoteInputChannel releasedChannel = this.createRemoteInputChannel(InputChannelTestUtils.createSingleInputGate(1));
        releasedChannel.releaseAllResources();

        releasedChannel.announceBufferSize(10);
    }

    /**
     * Tracks available buffers and unannounced credit of two channels across a checkpoint
     * barrier and the subsequent {@code resumeConsumption()} call.
     *
     * <p>The two channels differ only in the last argument passed to
     * {@code createRemoteInputChannel} (2 versus 0). NOTE(review): the observed buffer counts
     * suggest this is the number of exclusive buffers per channel - confirm against the helper.
     */
    @Test
    public void testOnUpstreamBlockedAndResumed() throws Exception {
        TestBufferPool pool = new TestBufferPool();
        SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(pool);
        RemoteInputChannel firstChannel = this.createRemoteInputChannel(gate, 0, 2);
        RemoteInputChannel secondChannel = this.createRemoteInputChannel(gate, 1, 0);
        gate.setup();
        firstChannel.requestSubpartition();
        secondChannel.requestSubpartition();

        // Initial backlog of 2 on both channels.
        firstChannel.onSenderBacklog(2);
        secondChannel.onSenderBacklog(2);
        Assert.assertEquals((long)4L, (long)firstChannel.getNumberOfAvailableBuffers());
        Assert.assertEquals((long)2L, (long)secondChannel.getNumberOfAvailableBuffers());

        // Deliver the same aligned-with-timeout checkpoint barrier to both channels.
        CheckpointBarrier checkpointBarrier = new CheckpointBarrier(1L, 123L, CheckpointOptions.alignedWithTimeout((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault(), (long)Integer.MAX_VALUE));
        Buffer barrierBuffer = EventSerializer.toBuffer((AbstractEvent)checkpointBarrier, (boolean)false);
        firstChannel.onBuffer(barrierBuffer, 0, 0);
        secondChannel.onBuffer(barrierBuffer, 0, 0);
        Assert.assertEquals((long)4L, (long)firstChannel.getNumberOfAvailableBuffers());
        Assert.assertEquals((long)0L, (long)secondChannel.getNumberOfAvailableBuffers());

        // Resume consumption and check the credit that is still to be announced.
        firstChannel.resumeConsumption();
        secondChannel.resumeConsumption();
        Assert.assertEquals((long)4L, (long)firstChannel.getUnannouncedCredit());
        Assert.assertEquals((long)0L, (long)secondChannel.getUnannouncedCredit());

        // A larger backlog after resuming.
        firstChannel.onSenderBacklog(4);
        secondChannel.onSenderBacklog(4);
        Assert.assertEquals((long)6L, (long)firstChannel.getNumberOfAvailableBuffers());
        Assert.assertEquals((long)4L, (long)secondChannel.getNumberOfAvailableBuffers());
        Assert.assertEquals((long)6L, (long)firstChannel.getUnannouncedCredit());
        Assert.assertEquals((long)4L, (long)secondChannel.getUnannouncedCredit());
    }

    /**
     * Checks that the number of required buffers reported by a channel counts down by one with
     * every {@code requestBuffer()} call: from 4 to 0 on the first channel and from 2 to 0 on
     * the second one, after both have received a backlog of 2.
     */
    @Test
    public void testRequestBuffer() throws Exception {
        TestBufferPool pool = new TestBufferPool();
        SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(pool);
        RemoteInputChannel firstChannel = this.createRemoteInputChannel(gate, 0, 2);
        RemoteInputChannel secondChannel = this.createRemoteInputChannel(gate, 1, 0);
        gate.setup();
        firstChannel.requestSubpartition();
        secondChannel.requestSubpartition();
        firstChannel.onSenderBacklog(2);
        secondChannel.onSenderBacklog(2);

        for (int expectedRequired = 4; expectedRequired >= 0; --expectedRequired) {
            Assert.assertEquals((long)expectedRequired, (long)firstChannel.getNumberOfRequiredBuffers());
            firstChannel.requestBuffer();
        }

        for (int expectedRequired = 2; expectedRequired >= 0; --expectedRequired) {
            Assert.assertEquals((long)expectedRequired, (long)secondChannel.getNumberOfRequiredBuffers());
            secondChannel.requestBuffer();
        }
    }

    /**
     * Sends two buffers, an {@code UNALIGNED} barrier and two more buffers (sequence numbers 0
     * to 4) and expects the barrier with sequence number 2 to be returned first, followed by the
     * buffers in their original order.
     */
    @Test
    public void testPrioritySequenceNumbers() throws Exception {
        RemoteInputChannel channel = this.buildInputGateAndGetChannel(0);

        // Arguments after the channel: sequence number, then buffer size.
        this.sendBuffer(channel, 0, 1);
        this.sendBuffer(channel, 1, 2);
        this.sendBarrier(channel, 2, UNALIGNED);
        this.sendBuffer(channel, 3, 3);
        this.sendBuffer(channel, 4, 4);

        this.assertGetNextBufferSequenceNumbers(channel, 2, 0, 1, 3, 4);
    }

    /**
     * Sends two buffers, an {@code UNALIGNED} barrier and two more buffers and expects the
     * inflight buffers to be the ones of size 1 and 2, i.e. those sent before the barrier.
     */
    @Test
    public void testGetInflightBuffers() throws Exception {
        RemoteInputChannel channel = this.buildInputGateAndGetChannel(0);

        // Arguments after the channel: sequence number, then buffer size.
        this.sendBuffer(channel, 0, 1);
        this.sendBuffer(channel, 1, 2);
        this.sendBarrier(channel, 2, UNALIGNED);
        this.sendBuffer(channel, 3, 3);
        this.sendBuffer(channel, 4, 4);

        this.assertInflightBufferSizes(channel, 1, 2);
    }

    /**
     * Sends four buffers of sizes 1 to 4 without any barrier, starting at sequence number
     * {@code Integer.MAX_VALUE - 2} so that the last sequence number wraps around, and expects
     * all four buffers to be reported as inflight.
     */
    @Test
    public void testGetAllInflightBuffers() throws Exception {
        int nextSequenceNumber = Integer.MAX_VALUE - 2;
        RemoteInputChannel channel = this.buildInputGateAndGetChannel(nextSequenceNumber);

        for (int size = 1; size <= 4; ++size) {
            this.sendBuffer(channel, nextSequenceNumber++, size);
        }

        this.assertInflightBufferSizes(channel, 1, 2, 3, 4);
    }

    /**
     * Repeats the inflight-buffer scenario (two buffers, an {@code UNALIGNED} barrier, two more
     * buffers) for every starting sequence number from {@code Integer.MAX_VALUE - 10} up to and
     * including {@code Integer.MIN_VALUE + 1}, so that the sequence numbers overflow at every
     * possible position. Each run expects the buffers of size 1 and 2 as inflight buffers.
     */
    @Test
    public void testGetInflightBuffersOverflow() throws Exception {
        final int firstStart = Integer.MAX_VALUE - 10;
        final int endExclusive = Integer.MIN_VALUE + 2;

        // "!=" rather than "<": the counter is meant to wrap around from MAX_VALUE to MIN_VALUE.
        for (int startingSequence = firstStart; startingSequence != endExclusive; ++startingSequence) {
            RemoteInputChannel channel = this.buildInputGateAndGetChannel(startingSequence);

            // Arguments after the channel: sequence number, then buffer size.
            this.sendBuffer(channel, startingSequence, 1);
            this.sendBuffer(channel, startingSequence + 1, 2);
            this.sendBarrier(channel, startingSequence + 2, UNALIGNED);
            this.sendBuffer(channel, startingSequence + 3, 3);
            this.sendBuffer(channel, startingSequence + 4, 4);

            List<Integer> inflightSizes = RemoteInputChannelTest.toBufferSizes(channel.getInflightBuffers(1L));
            MatcherAssert.assertThat((String)("For starting sequence " + startingSequence), inflightSizes, (Matcher)org.hamcrest.Matchers.contains((Object[])new Integer[]{1, 2}));
        }
    }

    /**
     * Sends two buffers, an {@code UNALIGNED} barrier and two more buffers, polls the barrier
     * (sequence number 2) and the first buffer (sequence number 0), and expects only the buffer
     * of size 2 to remain inflight.
     */
    @Test
    public void testGetInflightBuffersAfterPollingBuffer() throws Exception {
        RemoteInputChannel channel = this.buildInputGateAndGetChannel(0);

        // Arguments after the channel: sequence number, then buffer size.
        this.sendBuffer(channel, 0, 1);
        this.sendBuffer(channel, 1, 2);
        this.sendBarrier(channel, 2, UNALIGNED);
        this.sendBuffer(channel, 3, 3);
        this.sendBuffer(channel, 4, 4);

        this.assertGetNextBufferSequenceNumbers(channel, 2, 0);
        this.assertInflightBufferSizes(channel, 2);
    }

    /**
     * Maps every buffer to its size, preserving the order of the given list.
     *
     * @param inflightBuffers buffers to inspect
     * @return the sizes of the given buffers, in the same order
     */
    private static List<Integer> toBufferSizes(List<Buffer> inflightBuffers) {
        return inflightBuffers.stream()
                .map(Buffer::getSize)
                .collect(Collectors.toList());
    }

    /**
     * Sends two buffers, an {@code ALIGNED_WITH_TIMEOUT} barrier and two more buffers and checks
     * the polling order: first a {@code PRIORITIZED_EVENT_BUFFER} with sequence number 2, then
     * the buffers with sequence numbers 0 and 1, then sequence number 2 again as a
     * {@code TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER}, and finally sequence number 3.
     */
    @Test
    public void testRequiresAnnouncement() throws Exception {
        RemoteInputChannel channel = this.buildInputGateAndGetChannel(0);

        // Arguments after the channel: sequence number, then buffer size.
        this.sendBuffer(channel, 0, 1);
        this.sendBuffer(channel, 1, 2);
        this.sendBarrier(channel, 2, ALIGNED_WITH_TIMEOUT);
        this.sendBuffer(channel, 3, 3);
        this.sendBuffer(channel, 4, 4);

        // First poll: prioritized event buffer carrying the barrier's sequence number.
        InputChannel.BufferAndAvailability polled = (InputChannel.BufferAndAvailability)channel.getNextBuffer().get();
        Assert.assertEquals((long)2L, (long)polled.getSequenceNumber());
        Assert.assertFalse((boolean)polled.morePriorityEvents());
        Assert.assertTrue((boolean)polled.moreAvailable());
        Assert.assertEquals((Object)Buffer.DataType.PRIORITIZED_EVENT_BUFFER, (Object)polled.buffer().getDataType());

        // The two buffers sent before the barrier follow in their original order.
        this.assertGetNextBufferSequenceNumbers(channel, 0, 1);

        // Sequence number 2 shows up a second time, now typed as the barrier itself.
        polled = (InputChannel.BufferAndAvailability)channel.getNextBuffer().get();
        Assert.assertEquals((long)2L, (long)polled.getSequenceNumber());
        Assert.assertEquals((Object)Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER, (Object)polled.buffer().getDataType());

        InputChannel.BufferAndAvailability afterBarrier = (InputChannel.BufferAndAvailability)channel.getNextBuffer().get();
        Assert.assertEquals((long)3L, (long)afterBarrier.getSequenceNumber());
    }

    /**
     * Sends two buffers, an {@code ALIGNED_WITH_TIMEOUT} barrier and two more buffers and, without
     * polling anything, expects the buffers of size 1 and 2 to be reported as inflight.
     */
    @Test
    public void testGetInflightBuffersBeforeProcessingAnnouncement() throws Exception {
        RemoteInputChannel channel = this.buildInputGateAndGetChannel(0);

        // Arguments after the channel: sequence number, then buffer size.
        this.sendBuffer(channel, 0, 1);
        this.sendBuffer(channel, 1, 2);
        this.sendBarrier(channel, 2, ALIGNED_WITH_TIMEOUT);
        this.sendBuffer(channel, 3, 3);
        this.sendBuffer(channel, 4, 4);

        this.assertInflightBufferSizes(channel, 1, 2);
    }

    /**
     * Same buffer layout as the "before processing announcement" case, but the first element
     * (expected to carry sequence number 2) is polled before the inflight buffers are queried.
     * The inflight buffers for checkpoint 1 must still be the buffers of sizes 1 and 2.
     */
    @Test
    public void testGetInflightBuffersAfterProcessingAnnouncement() throws Exception {
        int nextBufferSize = 1;
        int nextSequenceNumber = 0;
        final RemoteInputChannel inputChannel = buildInputGateAndGetChannel(nextSequenceNumber);

        sendBuffer(inputChannel, nextSequenceNumber++, nextBufferSize++);
        sendBuffer(inputChannel, nextSequenceNumber++, nextBufferSize++);
        sendBarrier(inputChannel, nextSequenceNumber++, ALIGNED_WITH_TIMEOUT);
        sendBuffer(inputChannel, nextSequenceNumber++, nextBufferSize++);
        sendBuffer(inputChannel, nextSequenceNumber++, nextBufferSize++);

        // Consume only the element that carries the barrier's sequence number.
        assertGetNextBufferSequenceNumbers(inputChannel, 2);
        assertInflightBufferSizes(inputChannel, 1, 2);
    }

    /**
     * Same buffer layout as the other inflight-buffer tests. Here the element with sequence
     * number 2 and the first data buffer (sequence number 0) are polled first, so only the
     * buffer of size 2 may remain in the inflight buffers reported for checkpoint 1.
     */
    @Test
    public void testGetInflightBuffersAfterProcessingAnnouncementAndBuffer() throws Exception {
        int nextBufferSize = 1;
        int nextSequenceNumber = 0;
        final RemoteInputChannel inputChannel = buildInputGateAndGetChannel(nextSequenceNumber);

        sendBuffer(inputChannel, nextSequenceNumber++, nextBufferSize++);
        sendBuffer(inputChannel, nextSequenceNumber++, nextBufferSize++);
        sendBarrier(inputChannel, nextSequenceNumber++, ALIGNED_WITH_TIMEOUT);
        sendBuffer(inputChannel, nextSequenceNumber++, nextBufferSize++);
        sendBuffer(inputChannel, nextSequenceNumber++, nextBufferSize++);

        // Consume the element with the barrier's sequence number plus the first data buffer.
        assertGetNextBufferSequenceNumbers(inputChannel, 2, 0);
        assertInflightBufferSizes(inputChannel, 2);
    }

    /**
     * Verifies that {@code unsynchronizedGetSizeOfQueuedBuffers()} follows the bytes queued in
     * the channel: it must grow by the size of every received data buffer and of the serialized
     * barrier event, shrink by the size of every polled buffer, and be zero once everything has
     * been polled.
     */
    @Test
    public void testSizeOfQueuedBuffers() throws Exception {
        int sequenceNumber = 0;
        int bufferSize = 1;
        int queueSize = 0;
        RemoteInputChannel channel = this.buildInputGateAndGetChannel(sequenceNumber);
        Assert.assertEquals(0L, (long) channel.unsynchronizedGetSizeOfQueuedBuffers());

        // Receive a couple of data buffers.
        for (int i = 0; i < 2; ++i) {
            // Account for the buffer BEFORE the send call post-increments bufferSize; otherwise
            // the expectation would be one byte larger than the buffer that was actually queued.
            queueSize += bufferSize;
            this.sendBuffer(channel, sequenceNumber++, bufferSize++);
            Assert.assertEquals((long) queueSize, (long) channel.unsynchronizedGetSizeOfQueuedBuffers());
        }

        // Receive the barrier event; it contributes its serialized size.
        queueSize += EventSerializer.toSerializedEvent((AbstractEvent) new CheckpointBarrier(1L, 123L, UNALIGNED)).remaining();
        this.sendBarrier(channel, sequenceNumber++, UNALIGNED);
        Assert.assertEquals((long) queueSize, (long) channel.unsynchronizedGetSizeOfQueuedBuffers());

        // Poll all three queued elements; each poll releases the size of the polled buffer.
        for (int i = 0; i < 3; ++i) {
            Optional<InputChannel.BufferAndAvailability> nextBuffer = channel.getNextBuffer();
            queueSize -= nextBuffer.get().buffer().getSize();
            Assert.assertEquals((long) queueSize, (long) channel.unsynchronizedGetSizeOfQueuedBuffers());
        }

        Assert.assertEquals(0L, (long) channel.unsynchronizedGetSizeOfQueuedBuffers());
    }

    /**
     * Serializes a {@link CheckpointBarrier} (id 1, timestamp 123) with the given options into a
     * buffer and delivers it to the channel under the given sequence number.
     *
     * @param channel the channel that receives the barrier
     * @param sequenceNumber the sequence number to deliver the barrier with
     * @param checkpointOptions the options of the barrier; their unaligned flag is forwarded to
     *     {@code EventSerializer.toBuffer}
     * @throws IOException if delivering the buffer or the subsequent error check fails
     */
    private void sendBarrier(RemoteInputChannel channel, int sequenceNumber, CheckpointOptions checkpointOptions) throws IOException {
        AbstractEvent barrier = new CheckpointBarrier(1L, 123L, checkpointOptions);
        boolean unaligned = checkpointOptions.isUnalignedCheckpoint();
        send(channel, sequenceNumber, EventSerializer.toBuffer(barrier, unaligned));
    }

    /**
     * Creates a test buffer via {@code TestBufferFactory.createBuffer(dataSize)} and delivers it
     * to the channel under the given sequence number.
     *
     * @param channel the channel that receives the buffer
     * @param sequenceNumber the sequence number to deliver the buffer with
     * @param dataSize the size argument handed to the test buffer factory
     * @throws IOException if delivering the buffer or the subsequent error check fails
     */
    private void sendBuffer(RemoteInputChannel channel, int sequenceNumber, int dataSize) throws IOException {
        Buffer dataBuffer = TestBufferFactory.createBuffer(dataSize);
        send(channel, sequenceNumber, dataBuffer);
    }

    /**
     * Hands the buffer to {@code channel.onBuffer} with a backlog of 0 and then calls
     * {@code channel.checkError()} so that an error recorded on the channel surfaces right away.
     *
     * @param channel the channel that receives the buffer
     * @param sequenceNumber the sequence number to deliver the buffer with
     * @param buffer the buffer to deliver
     * @throws IOException if {@code onBuffer} or {@code checkError} throws
     */
    private void send(RemoteInputChannel channel, int sequenceNumber, Buffer buffer) throws IOException {
        final int backlog = 0;
        channel.onBuffer(buffer, sequenceNumber, backlog);
        channel.checkError();
    }

    /**
     * Asserts that the sizes of the buffers returned by {@code channel.getInflightBuffers(1L)}
     * equal the given sizes, in order.
     *
     * @param channel the channel to query
     * @param bufferSizes the expected buffer sizes
     * @throws CheckpointException if querying the inflight buffers fails
     */
    private void assertInflightBufferSizes(RemoteInputChannel channel, Integer ... bufferSizes) throws CheckpointException {
        List<Integer> expectedSizes = Arrays.asList(bufferSizes);
        Assert.assertEquals(expectedSizes, RemoteInputChannelTest.toBufferSizes(channel.getInflightBuffers(1L)));
    }

    /**
     * Polls the channel once per expected value and asserts that the sequence numbers of the
     * polled elements match the expected ones, in order. A poll that yields nothing contributes
     * no sequence number.
     *
     * @param channel the channel to poll
     * @param sequenceNumbers the expected sequence numbers
     * @throws IOException if polling the channel fails
     */
    private void assertGetNextBufferSequenceNumbers(RemoteInputChannel channel, Integer ... sequenceNumbers) throws IOException {
        List<Integer> polledSequenceNumbers = new ArrayList<>();
        for (Integer ignored : sequenceNumbers) {
            Optional<Integer> polled = channel.getNextBuffer().map(InputChannel.BufferAndAvailability::getSequenceNumber);
            polled.ifPresent(polledSequenceNumbers::add);
        }
        MatcherAssert.assertThat(polledSequenceNumbers, (Matcher)org.hamcrest.Matchers.contains((Object[])sequenceNumbers));
    }

    /**
     * Creates a remote channel for the given gate, using subpartition index 0 and an initial
     * and maximum backoff of 0.
     *
     * @param inputGate the gate the channel is built for
     * @return the newly built remote input channel
     */
    private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) {
        final int subpartitionIndex = 0;
        final int initialBackoff = 0;
        final int maxBackoff = 0;
        return createRemoteInputChannel(inputGate, subpartitionIndex, initialBackoff, maxBackoff);
    }

    /**
     * Creates a remote channel for the given gate with the given consumed subpartition index;
     * {@code initialCredits} is passed to the builder as the number of network buffers per
     * channel.
     *
     * @param inputGate the gate the channel is built for
     * @param consumedSubpartitionIndex the subpartition index set on the builder
     * @param initialCredits the value set as network buffers per channel
     * @return the newly built remote input channel
     */
    private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, int consumedSubpartitionIndex, int initialCredits) {
        return InputChannelBuilder.newBuilder()
                .setConsumedSubpartitionIndex(consumedSubpartitionIndex)
                .setNetworkBuffersPerChannel(initialCredits)
                .buildRemoteChannel(inputGate);
    }

    /**
     * Creates a remote channel for the given gate with the given consumed subpartition index
     * and backoff settings.
     *
     * @param inputGate the gate the channel is built for
     * @param consumedSubpartitionIndex the subpartition index set on the builder
     * @param initialBackoff the initial backoff set on the builder
     * @param maxBackoff the maximum backoff set on the builder
     * @return the newly built remote input channel
     */
    private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, int consumedSubpartitionIndex, int initialBackoff, int maxBackoff) {
        return InputChannelBuilder.newBuilder()
                .setConsumedSubpartitionIndex(consumedSubpartitionIndex)
                .setInitialBackoff(initialBackoff)
                .setMaxBackoff(maxBackoff)
                .buildRemoteChannel(inputGate);
    }

    /**
     * Creates a remote channel for the given gate that uses the given connection manager,
     * partition id and backoff settings.
     *
     * @param inputGate the gate the channel is built for
     * @param connectionManager the connection manager set on the builder
     * @param partitionId the partition id set on the builder
     * @param initialBackoff the initial backoff set on the builder
     * @param maxBackoff the maximum backoff set on the builder
     * @return the newly built remote input channel
     */
    private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, ConnectionManager connectionManager, ResultPartitionID partitionId, int initialBackoff, int maxBackoff) {
        return InputChannelBuilder.newBuilder()
                .setInitialBackoff(initialBackoff)
                .setMaxBackoff(maxBackoff)
                .setPartitionId(partitionId)
                .setConnectionManager(connectionManager)
                .buildRemoteChannel(inputGate);
    }

    /**
     * Builds a fresh input gate, takes its first channel and sets the expected sequence number
     * on that channel.
     *
     * @param expectedSequenceNumber the value passed to {@code setExpectedSequenceNumber}
     * @return the prepared remote input channel
     * @throws IOException if building the gate fails
     */
    private RemoteInputChannel buildInputGateAndGetChannel(int expectedSequenceNumber) throws IOException {
        final RemoteInputChannel preparedChannel = buildInputGateAndGetChannel();
        preparedChannel.setExpectedSequenceNumber(expectedSequenceNumber);
        return preparedChannel;
    }

    /**
     * Builds a fresh input gate via {@code buildInputGate()} and returns its channel at index 0
     * as a {@link RemoteInputChannel}.
     *
     * @return the first channel of a newly built gate
     * @throws IOException if building the gate fails
     */
    private RemoteInputChannel buildInputGateAndGetChannel() throws IOException {
        final SingleInputGate gate = buildInputGate();
        return (RemoteInputChannel) gate.getChannel(0);
    }

    /**
     * Builds a single input gate whose channels are remote channels, backed by a new
     * {@link NetworkBufferPool} created with the arguments (4, 4096). The gate is set up and its
     * partitions are requested before it is returned.
     *
     * @return the ready-to-use input gate
     * @throws IOException if creating the buffer pool, setting up the gate or requesting the
     *     partitions fails
     */
    private SingleInputGate buildInputGate() throws IOException {
        final NetworkBufferPool globalPool = new NetworkBufferPool(4, 4096);
        final SingleInputGate gate = new SingleInputGateBuilder()
                .setChannelFactory(InputChannelBuilder::buildRemoteChannel)
                .setBufferPoolFactory(globalPool.createBufferPool(1, 4))
                .setSegmentProvider((MemorySegmentProvider) globalPool)
                .build();
        gate.setup();
        gate.requestPartitions();
        return gate;
    }

    /**
     * Checks that {@code onFailedPartitionRequest()} returns while another thread is held up
     * inside {@code createPartitionRequestClient}.
     *
     * <p>The connection manager used here signals {@code ready} and then waits on
     * {@code blocker} for at most 30 seconds. A separate "network" thread waits for
     * {@code ready}, calls {@code onFailedPartitionRequest()} and only afterwards releases
     * {@code blocker}. If that call were to block, the wait would time out and the flag checked
     * at the end would be set.
     */
    @Test
    public void testOnFailedPartitionRequestDoesNotBlockNetworkThreads() throws Exception {
        final long testBlockedWaitTimeoutMillis = 30000L;

        PartitionProducerStateChecker producerStateChecker = (jobId, intermediateDataSetId, resultPartitionId) -> CompletableFuture.completedFuture(ExecutionState.RUNNING);
        NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
        Task task = new TestTaskBuilder((ShuffleEnvironment<?, ?>)shuffleEnvironment)
                .setPartitionProducerStateChecker(producerStateChecker)
                .build();
        SingleInputGate inputGate = new SingleInputGateBuilder()
                .setPartitionProducerStateProvider((PartitionProducerStateProvider)task)
                .build();
        TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);

        final OneShotLatch ready = new OneShotLatch();
        final OneShotLatch blocker = new OneShotLatch();
        final AtomicBoolean timedOutOrInterrupted = new AtomicBoolean(false);

        TestingConnectionManager blockingConnectionManager = new TestingConnectionManager(){

            @Override
            public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
                // Tell the simulated network thread that this call is now in progress ...
                ready.trigger();
                try {
                    // ... and stay in here until that thread releases us or the wait times out.
                    blocker.await(testBlockedWaitTimeoutMillis, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException | TimeoutException e) {
                    timedOutOrInterrupted.set(true);
                }
                return new TestingPartitionRequestClient();
            }
        };

        RemoteInputChannel remoteInputChannel = InputChannelBuilder.newBuilder()
                .setConnectionManager(blockingConnectionManager)
                .buildRemoteChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{remoteInputChannel});

        Thread simulatedNetworkThread = new Thread(() -> {
            try {
                ready.await();
                // This call must return without waiting for the blocked client creation.
                remoteInputChannel.onFailedPartitionRequest();
                // Release the thread that is waiting inside createPartitionRequestClient.
                blocker.trigger();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        simulatedNetworkThread.start();

        // Presumably this is what ends up in createPartitionRequestClient on the test thread.
        inputGate.requestPartitions();
        simulatedNetworkThread.join();

        Assert.assertFalse("Test ended by timeout or interruption - this indicates that the network thread was blocked.", timedOutOrInterrupted.get());
    }

    /**
     * Delivers two checkpoint barriers to a remote channel, the first serialized without the
     * priority flag and the second with it, and checks the gate's availability and priority
     * availability around each delivery through {@code AvailabilityUtil}.
     *
     * <p>NOTE(review): the two boolean arguments of the {@code AvailabilityUtil} helpers look
     * like the expected state before and after running the action - confirm against the helper.
     */
    @Test
    public void testNotifyOnPriority() throws IOException {
        SingleInputGate inputGate = new SingleInputGateBuilder().build();
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, 0);
        CheckpointOptions checkpointOptions = new CheckpointOptions((SnapshotType)CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());

        // Barrier 1 is serialized with the priority flag off and delivered as sequence number 0.
        AvailabilityUtil.assertPriorityAvailability(
                (InputGate)inputGate,
                false,
                false,
                () -> AvailabilityUtil.assertAvailability(
                        (AvailabilityProvider)inputGate,
                        false,
                        true,
                        () -> inputChannel.onBuffer(EventSerializer.toBuffer((AbstractEvent)new CheckpointBarrier(1L, 123L, checkpointOptions), false), 0, 0)));

        // Barrier 2 is serialized with the priority flag on and delivered as sequence number 1.
        AvailabilityUtil.assertPriorityAvailability(
                (InputGate)inputGate,
                false,
                true,
                () -> AvailabilityUtil.assertAvailability(
                        (AvailabilityProvider)inputGate,
                        true,
                        true,
                        () -> inputChannel.onBuffer(EventSerializer.toBuffer((AbstractEvent)new CheckpointBarrier(2L, 123L, checkpointOptions), true), 1, 0)));
    }

    /**
     * Checks {@code getBuffersInUseCount()} while buffers are received with a backlog and then
     * polled. The expected values used below are 2 and 5 after the two deliveries (backlogs 1
     * and 3), then 4 and 3 after the first two polls, and still 3 after a third poll.
     */
    @Test
    public void testBuffersInUseCount() throws Exception {
        final RemoteInputChannel inputChannel = buildInputGateAndGetChannel();
        final Buffer sharedBuffer = TestBufferFactory.createBuffer(32768);

        // First delivery: sequence number 0, backlog 1.
        inputChannel.onBuffer(sharedBuffer.retainBuffer(), 0, 1);
        Assert.assertEquals(2L, (long) inputChannel.getBuffersInUseCount());

        // Second delivery: sequence number 1, backlog 3.
        inputChannel.onBuffer(sharedBuffer.retainBuffer(), 1, 3);
        Assert.assertEquals(5L, (long) inputChannel.getBuffersInUseCount());

        // Each of the first two polls lowers the count by one.
        inputChannel.getNextBuffer();
        Assert.assertEquals(4L, (long) inputChannel.getBuffersInUseCount());
        inputChannel.getNextBuffer();
        Assert.assertEquals(3L, (long) inputChannel.getBuffersInUseCount());

        // A third poll, after both delivered buffers were taken, leaves the count unchanged.
        inputChannel.getNextBuffer();
        Assert.assertEquals(3L, (long) inputChannel.getBuffersInUseCount());
    }

    /**
     * Eagerly requests {@code numExclusiveSegments} buffers from the channel and
     * {@code numFloatingBuffers} buffers from the pool (each must be non-null) and returns a
     * task that recycles all of them.
     *
     * <p>While both kinds of buffers remain, the task picks at random which kind to recycle
     * next; afterwards it recycles whatever is left of either kind.
     *
     * @param inputChannel the channel the first group of buffers is requested from
     * @param bufferPool the pool the second group of buffers is requested from
     * @param numExclusiveSegments how many buffers to request from the channel
     * @param numFloatingBuffers how many buffers to request from the pool
     * @return a task that recycles every requested buffer and returns {@code null}
     * @throws Exception if requesting a buffer fails
     */
    private Callable<Void> recycleBufferTask(RemoteInputChannel inputChannel, BufferPool bufferPool, int numExclusiveSegments, int numFloatingBuffers) throws Exception {
        final ArrayDeque<Buffer> exclusiveBuffers = new ArrayDeque<>(numExclusiveSegments);
        for (int requested = 0; requested < numExclusiveSegments; requested++) {
            Buffer exclusive = inputChannel.requestBuffer();
            Assert.assertNotNull(exclusive);
            exclusiveBuffers.add(exclusive);
        }

        final ArrayDeque<Buffer> floatingBuffers = new ArrayDeque<>(numFloatingBuffers);
        for (int requested = 0; requested < numFloatingBuffers; requested++) {
            Buffer floating = bufferPool.requestBuffer();
            Assert.assertNotNull(floating);
            floatingBuffers.add(floating);
        }

        return () -> {
            Random random = new Random();
            // Interleave the two kinds in random order for as long as both are available.
            while (!exclusiveBuffers.isEmpty() && !floatingBuffers.isEmpty()) {
                ArrayDeque<Buffer> chosen = random.nextBoolean() ? exclusiveBuffers : floatingBuffers;
                chosen.poll().recycleBuffer();
            }
            // Recycle the remainder of whichever kind is left over.
            while (!exclusiveBuffers.isEmpty()) {
                exclusiveBuffers.poll().recycleBuffer();
            }
            while (!floatingBuffers.isEmpty()) {
                floatingBuffers.poll().recycleBuffer();
            }
            return null;
        };
    }

    /**
     * Submits every task to the executor and then waits for each of them to finish, in
     * submission order.
     *
     * @param executor the executor the tasks are submitted to
     * @param tasks the tasks to run; an empty array results in no work
     * @throws Exception whatever {@link Future#get()} throws for the first task that did not
     *     complete normally, e.g. an {@code ExecutionException} wrapping the task's failure
     */
    static void submitTasksAndWaitForResults(ExecutorService executor, Callable[] tasks) throws Exception {
        // A typed list keeps the second loop type-safe; a raw list would only yield Object.
        List<Future<?>> pendingResults = new ArrayList<>(tasks.length);
        for (Callable<?> task : tasks) {
            pendingResults.add(executor.submit(task));
        }
        // All tasks are submitted before the first wait so that they can run concurrently.
        for (Future<?> pendingResult : pendingResults) {
            pendingResult.get();
        }
    }

    /**
     * Releases the resources of a test: all given channels, the optional buffer, the buffer
     * pools and the network buffer pool itself, and finally shuts down the optional executor.
     *
     * <p>Failures while releasing a channel or destroying the pools do not stop the remaining
     * steps; they are combined with the incoming {@code throwable} through
     * {@code ExceptionUtils.firstOrSuppressed} and the combined result is rethrown at the end.
     *
     * @param networkBufferPool the pool whose buffer pools and which itself are destroyed
     * @param executor an executor to shut down, or {@code null}
     * @param buffer a buffer to recycle if it is not recycled yet, or {@code null}
     * @param throwable a failure that already occurred in the test, or {@code null}
     * @param inputChannels the channels whose resources are released
     * @throws Exception if {@code throwable} was given or any step collected a failure
     */
    public static void cleanup(NetworkBufferPool networkBufferPool, @Nullable ExecutorService executor, @Nullable Buffer buffer, @Nullable Throwable throwable, InputChannel ... inputChannels) throws Exception {
        // 1. Release every channel, remembering failures instead of aborting.
        for (InputChannel channelToRelease : inputChannels) {
            try {
                channelToRelease.releaseAllResources();
            }
            catch (Throwable releaseFailure) {
                throwable = ExceptionUtils.firstOrSuppressed(releaseFailure, throwable);
            }
        }

        // 2. Give back the buffer the test may still be holding.
        if (buffer != null && !buffer.isRecycled()) {
            buffer.recycleBuffer();
        }

        // 3. Tear down the buffer pools first, then the network buffer pool itself.
        try {
            networkBufferPool.destroyAllBufferPools();
        }
        catch (Throwable destroyPoolsFailure) {
            throwable = ExceptionUtils.firstOrSuppressed(destroyPoolsFailure, throwable);
        }
        try {
            networkBufferPool.destroy();
        }
        catch (Throwable destroyFailure) {
            throwable = ExceptionUtils.firstOrSuppressed(destroyFailure, throwable);
        }

        // 4. Stop the executor, if the test used one.
        if (executor != null) {
            executor.shutdown();
        }

        // 5. Surface the collected failure, if any.
        if (throwable != null) {
            ExceptionUtils.rethrowException(throwable);
        }
    }

    /**
     * A {@link NoOpBufferPool} whose {@code requestBuffer()} always returns a new buffer backed
     * by a freshly allocated unpooled segment of 1024 bytes.
     */
    private static final class TestBufferPool
    extends NoOpBufferPool {
        private TestBufferPool() {
        }

        /**
         * Allocates a new unpooled 1024-byte segment and wraps it in a {@link NetworkBuffer}
         * that uses {@code FreeingBufferRecycler.INSTANCE} as its recycler.
         */
        @Override
        public Buffer requestBuffer() {
            return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE);
        }
    }

    /**
     * A partition request client that only records the arguments of the most recent
     * {@code requestSubpartition} call so that a test can verify them afterwards.
     */
    private static final class TestVerifyPartitionRequestClient
    extends TestingPartitionRequestClient {
        // Arguments of the last requestSubpartition call; defaults until the first call.
        private ResultPartitionID partitionId;
        private int subpartitionIndex;
        private int delayMs;

        private TestVerifyPartitionRequestClient() {
        }

        /** Records the partition id, subpartition index and delay; the channel is not used. */
        @Override
        public void requestSubpartition(ResultPartitionID partitionId, int subpartitionIndex, RemoteInputChannel channel, int delayMs) {
            this.delayMs = delayMs;
            this.subpartitionIndex = subpartitionIndex;
            this.partitionId = partitionId;
        }

        /**
         * Asserts that the recorded request matches the expectation.
         *
         * @param expectedId the expected partition id
         * @param expectedSubpartitionIndex the expected subpartition index
         * @param expectedDelayMs the expected delay in milliseconds
         */
        void verifyResult(ResultPartitionID expectedId, int expectedSubpartitionIndex, int expectedDelayMs) {
            Assert.assertEquals(expectedId, this.partitionId);
            Assert.assertEquals((long) expectedSubpartitionIndex, (long) this.subpartitionIndex);
            Assert.assertEquals((long) expectedDelayMs, (long) this.delayMs);
        }
    }

    /**
     * A connection manager that hands out the same, pre-built partition request client for
     * every connection id.
     */
    private static final class TestVerifyConnectionManager
    extends TestingConnectionManager {
        private final PartitionRequestClient client;

        /**
         * @param client the client returned by every {@code createPartitionRequestClient} call;
         *     must not be {@code null}
         */
        TestVerifyConnectionManager(TestingPartitionRequestClient client) {
            this.client = Preconditions.checkNotNull(client);
        }

        /** Returns the client given at construction time; the connection id is ignored. */
        @Override
        public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
            return this.client;
        }
    }

    private static final class TestPartitionProducerStateProvider
    implements PartitionProducerStateProvider {
        private boolean isInvoked;
        private final ResultPartitionID partitionId;

        TestPartitionProducerStateProvider(ResultPartitionID partitionId) {
            this.partitionId = (ResultPartitionID)Preconditions.checkNotNull((Object)partitionId);
        }

        public void requestPartitionProducerState(IntermediateDataSetID intermediateDataSetId, ResultPartitionID resultPartitionId, Consumer<? super PartitionProducerStateProvider.ResponseHandle> responseConsumer) {
            Assert.assertEquals((Object)this.partitionId, (Object)resultPartitionId);
            this.isInvoked = true;
        }

        boolean isInvoked() {
            return this.isInvoked;
        }
    }

    /**
     * A connection manager whose {@code createPartitionRequestClient} always fails with an
     * {@link IOException} that has an empty message.
     */
    private static final class TestingExceptionConnectionManager
    extends TestingConnectionManager {
        private TestingExceptionConnectionManager() {
        }

        /**
         * Never returns a client.
         *
         * @throws IOException always
         */
        @Override
        public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException {
            final String emptyMessage = "";
            throw new IOException(emptyMessage);
        }
    }

    private static interface TriFunction<T, U, V, R> {
        public R apply(T var1, U var2, V var3) throws Exception;
    }
}

