/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.InputGateFairnessTest;
import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionTest;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateID;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateTestBase;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.LocalRecoveredInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteRecoveredInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.UnknownInputChannel;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class SingleInputGateTest
extends InputGateTestBase {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSetupLogic() throws Exception {
        NettyShuffleEnvironment environment = this.createNettyShuffleEnvironment();
        SingleInputGate inputGate = this.createInputGate(environment);
        try {
            Assert.assertNull((Object)inputGate.getBufferPool());
            for (InputChannel inputChannel : inputGate.getInputChannels().values()) {
                Assert.assertTrue((inputChannel instanceof RecoveredInputChannel || inputChannel instanceof UnknownInputChannel ? 1 : 0) != 0);
                if (!(inputChannel instanceof RecoveredInputChannel)) continue;
                Assert.assertEquals((long)0L, (long)((RecoveredInputChannel)inputChannel).bufferManager.getNumberOfAvailableBuffers());
            }
            inputGate.setup();
            Assert.assertNotNull((Object)inputGate.getBufferPool());
            Assert.assertEquals((long)1L, (long)inputGate.getBufferPool().getNumberOfRequiredMemorySegments());
            for (InputChannel inputChannel : inputGate.getInputChannels().values()) {
                if (inputChannel instanceof RemoteRecoveredInputChannel) {
                    Assert.assertEquals((long)2L, (long)((RemoteRecoveredInputChannel)inputChannel).bufferManager.getNumberOfAvailableBuffers());
                    continue;
                }
                if (!(inputChannel instanceof LocalRecoveredInputChannel)) continue;
                Assert.assertEquals((long)0L, (long)((LocalRecoveredInputChannel)inputChannel).bufferManager.getNumberOfAvailableBuffers());
            }
        }
        finally {
            inputGate.close();
            environment.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReadRecoveredState() throws Exception {
        int totalStates = 5;
        int[] states = new int[]{1, 2, 3, 4};
        ResultPartitionTest.FiniteChannelStateReader stateReader = new ResultPartitionTest.FiniteChannelStateReader(5, states);
        int totalBuffers = 3;
        NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().setBufferSize(states.length * 4).setNumNetworkBuffers(3).build();
        SingleInputGate inputGate = this.createInputGate(environment);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            inputGate.setup();
            CompletableFuture future = inputGate.readRecoveredState(executor, (ChannelStateReader)stateReader);
            int numConsumedBuffers = 0;
            while (!future.isDone() || numConsumedBuffers != 10) {
                if (this.getNextBufferAndVerify(inputGate, states)) {
                    ++numConsumedBuffers;
                    continue;
                }
                Thread.sleep(3L);
            }
            inputGate.close();
            Assert.assertEquals((long)3L, (long)environment.getNetworkBufferPool().getNumberOfAvailableMemorySegments());
        }
        finally {
            executor.shutdown();
            environment.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentReadStateAndProcessAndClose() throws Exception {
        int totalStates = 5;
        int[] states = new int[]{1, 2, 3, 4};
        ResultPartitionTest.FiniteChannelStateReader stateReader = new ResultPartitionTest.FiniteChannelStateReader(5, states);
        int totalBuffers = 3;
        NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().setBufferSize(states.length * 4).setNumNetworkBuffers(3).build();
        SingleInputGate inputGate = this.createInputGate(environment);
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            inputGate.setup();
            Callable<Void> closeTask = () -> {
                inputGate.close();
                return null;
            };
            Callable<Void> readRecoveredStateTask = () -> {
                inputGate.readRecoveredState(executor, stateReader);
                return null;
            };
            Callable<Void> processStateTask = () -> {
                try {
                    while (true) {
                        if (this.getNextBufferAndVerify(inputGate, states)) {
                            continue;
                        }
                        Thread.sleep(1L);
                    }
                }
                catch (Throwable t) {
                    return null;
                }
            };
            RemoteInputChannelTest.submitTasksAndWaitForResults(executor, new Callable[]{closeTask, readRecoveredStateTask, processStateTask});
        }
        finally {
            executor.shutdown();
            executor.awaitTermination(60L, TimeUnit.SECONDS);
            Assert.assertEquals((long)3L, (long)environment.getNetworkBufferPool().getNumberOfAvailableMemorySegments());
            Assert.assertTrue((boolean)inputGate.getCloseFuture().isDone());
            environment.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPartitionRequestLogic() throws Exception {
        NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().build();
        SingleInputGate gate = this.createInputGate(environment);
        try {
            gate.requestPartitions();
            gate.pollNext();
            Collection channels = gate.getInputChannels().values();
            for (InputChannel channel : channels) {
                if (channel.getChannelIndex() == 0) {
                    MatcherAssert.assertThat((Object)channel, (Matcher)Matchers.instanceOf(RemoteInputChannel.class));
                    Assert.assertNotNull((Object)((RemoteInputChannel)channel).getPartitionRequestClient());
                    Assert.assertEquals((long)2L, (long)((RemoteInputChannel)channel).getInitialCredit());
                    continue;
                }
                if (channel.getChannelIndex() == 1) {
                    MatcherAssert.assertThat((Object)channel, (Matcher)Matchers.instanceOf(LocalInputChannel.class));
                    Assert.assertNotNull((Object)((LocalInputChannel)channel).getSubpartitionView());
                    continue;
                }
                if (channel.getChannelIndex() != 2) continue;
                MatcherAssert.assertThat((Object)channel, (Matcher)Matchers.instanceOf(UnknownInputChannel.class));
            }
        }
        finally {
            gate.close();
            environment.close();
        }
    }

    @Test
    public void testBasicGetNextLogic() throws Exception {
        SingleInputGate inputGate = this.createInputGate();
        InputChannel[] inputChannels = new TestInputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)};
        inputGate.setInputChannels(inputChannels);
        inputChannels[0].readBuffer();
        inputChannels[0].readBuffer();
        inputChannels[1].readBuffer();
        inputChannels[1].readEndOfPartitionEvent();
        inputChannels[0].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty(inputChannels[0]);
        inputGate.notifyChannelNonEmpty(inputChannels[1]);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 0, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 1, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 0, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, false, 1, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, false, 0, false);
        Assert.assertTrue((boolean)inputGate.isFinished());
        for (InputChannel ic : inputChannels) {
            ic.assertReturnedEventsAreRecycled();
        }
    }

    @Test
    public void testGetCompressedBuffer() throws Exception {
        int bufferSize = 1024;
        String compressionCodec = "LZ4";
        BufferCompressor compressor = new BufferCompressor(bufferSize, compressionCodec);
        BufferDecompressor decompressor = new BufferDecompressor(bufferSize, compressionCodec);
        try (SingleInputGate inputGate = new SingleInputGateBuilder().setBufferDecompressor(decompressor).build();){
            TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize);
            for (int i = 0; i < bufferSize; i += 8) {
                segment.putLongLittleEndian(i, (long)i);
            }
            NetworkBuffer uncompressedBuffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
            uncompressedBuffer.setSize(bufferSize);
            Buffer compressedBuffer = compressor.compressToOriginalBuffer((Buffer)uncompressedBuffer);
            Assert.assertTrue((boolean)compressedBuffer.isCompressed());
            inputChannel.read(compressedBuffer);
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            inputGate.notifyChannelNonEmpty((InputChannel)inputChannel);
            Optional bufferOrEvent = inputGate.getNext();
            Assert.assertTrue((boolean)bufferOrEvent.isPresent());
            Assert.assertTrue((boolean)((BufferOrEvent)bufferOrEvent.get()).isBuffer());
            ByteBuffer buffer = ((BufferOrEvent)bufferOrEvent.get()).getBuffer().getNioBufferReadable().order(ByteOrder.LITTLE_ENDIAN);
            for (int i = 0; i < bufferSize; i += 8) {
                Assert.assertEquals((long)i, (long)buffer.getLong());
            }
        }
    }

    @Test
    public void testIsAvailable() throws Exception {
        SingleInputGate inputGate = this.createInputGate(1);
        TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        this.testIsAvailable((InputGate)inputGate, inputGate, inputChannel);
    }

    @Test
    public void testIsAvailableAfterFinished() throws Exception {
        SingleInputGate inputGate = this.createInputGate(1);
        TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        this.testIsAvailableAfterFinished((InputGate)inputGate, () -> {
            inputChannel.readEndOfPartitionEvent();
            inputGate.notifyChannelNonEmpty((InputChannel)inputChannel);
        });
    }

    @Test
    public void testIsMoreAvailableReadingFromSingleInputChannel() throws Exception {
        SingleInputGate inputGate = this.createInputGate();
        InputChannel[] inputChannels = new TestInputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)};
        inputGate.setInputChannels(inputChannels);
        inputChannels[0].readBuffer();
        inputChannels[0].readBuffer(false);
        inputGate.notifyChannelNonEmpty(inputChannels[0]);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 0, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 0, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBackwardsEventWithUninitializedChannel() throws Exception {
        TestingTaskEventPublisher taskEventPublisher = new TestingTaskEventPublisher();
        TestingResultPartitionManager partitionManager = new TestingResultPartitionManager((ResultSubpartitionView)new NoOpResultSubpartitionView());
        NettyShuffleEnvironment environment = this.createNettyShuffleEnvironment();
        SingleInputGate inputGate = this.createInputGate(environment, 2, ResultPartitionType.PIPELINED);
        InputChannel[] inputChannels = new InputChannel[2];
        try {
            ResultPartitionID localPartitionId = new ResultPartitionID();
            inputChannels[0] = InputChannelBuilder.newBuilder().setPartitionId(localPartitionId).setPartitionManager(partitionManager).setTaskEventPublisher(taskEventPublisher).buildLocalChannel(inputGate);
            ResultPartitionID unknownPartitionId = new ResultPartitionID();
            inputChannels[1] = InputChannelBuilder.newBuilder().setChannelIndex(1).setPartitionId(unknownPartitionId).setPartitionManager(partitionManager).setTaskEventPublisher(taskEventPublisher).buildUnknownChannel(inputGate);
            InputGateFairnessTest.setupInputGate(inputGate, inputChannels);
            Assert.assertEquals((long)1L, (long)partitionManager.counter);
            TestTaskEvent event = new TestTaskEvent();
            inputGate.sendTaskEvent((TaskEvent)event);
            Assert.assertEquals((long)1L, (long)taskEventPublisher.counter);
            ResourceID location = ResourceID.generate();
            inputGate.updateInputChannel(location, NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(unknownPartitionId.getPartitionId(), location));
            Assert.assertEquals((long)2L, (long)partitionManager.counter);
            Assert.assertEquals((long)2L, (long)taskEventPublisher.counter);
        }
        finally {
            inputGate.close();
            environment.close();
        }
    }

    @Test
    public void testUpdateChannelBeforeRequest() throws Exception {
        SingleInputGate inputGate = this.createInputGate(1);
        TestingResultPartitionManager partitionManager = new TestingResultPartitionManager((ResultSubpartitionView)new NoOpResultSubpartitionView());
        UnknownInputChannel unknown = InputChannelBuilder.newBuilder().setPartitionManager(partitionManager).buildUnknownChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{unknown});
        ResultPartitionID resultPartitionID = unknown.getPartitionId();
        ResourceID location = ResourceID.generate();
        inputGate.updateInputChannel(location, NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(resultPartitionID.getPartitionId(), location));
        Assert.assertEquals((long)0L, (long)partitionManager.counter);
    }

    @Test
    public void testReleaseWhilePollingChannel() throws Exception {
        final AtomicReference asyncException = new AtomicReference();
        final SingleInputGate inputGate = this.createInputGate(1);
        UnknownInputChannel inputChannel = InputChannelBuilder.newBuilder().buildUnknownChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        Thread asyncConsumer = new Thread(){

            @Override
            public void run() {
                try {
                    inputGate.getNext();
                }
                catch (Exception e) {
                    asyncException.set(e);
                }
            }
        };
        asyncConsumer.start();
        boolean success = false;
        for (int i = 0; i < 50; ++i) {
            if (asyncConsumer.isAlive()) {
                boolean bl = success = asyncConsumer.getState() == Thread.State.WAITING;
            }
            if (success) break;
            Thread.sleep(100L);
        }
        Assert.assertTrue((String)"Did not trigger blocking buffer request.", (boolean)success);
        inputGate.close();
        asyncConsumer.join();
        Assert.assertNotNull(asyncException.get());
        Assert.assertEquals(IllegalStateException.class, ((Exception)asyncException.get()).getClass());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestBackoffConfiguration() throws Exception {
        IntermediateResultPartitionID[] partitionIds = new IntermediateResultPartitionID[]{new IntermediateResultPartitionID(), new IntermediateResultPartitionID(), new IntermediateResultPartitionID()};
        ResourceID localLocation = ResourceID.generate();
        ShuffleDescriptor[] channelDescs = new ShuffleDescriptor[]{NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(partitionIds[0], localLocation), NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(partitionIds[1], ResourceID.generate()), new UnknownShuffleDescriptor(new ResultPartitionID(partitionIds[2], new ExecutionAttemptID()))};
        InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, channelDescs);
        int initialBackoff = 137;
        int maxBackoff = 1001;
        NettyShuffleEnvironment netEnv = new NettyShuffleEnvironmentBuilder().setPartitionRequestInitialBackoff(initialBackoff).setPartitionRequestMaxBackoff(maxBackoff).build();
        SingleInputGate gate = new SingleInputGateFactory(localLocation, netEnv.getConfiguration(), netEnv.getConnectionManager(), netEnv.getResultPartitionManager(), (TaskEventPublisher)new TaskEventDispatcher(), netEnv.getNetworkBufferPool()).create("TestTask", 0, gateDesc, SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, InputChannelTestUtils.newUnregisteredInputChannelMetrics());
        gate.convertRecoveredInputChannels();
        try {
            InputChannel[] channels;
            Assert.assertEquals((Object)gateDesc.getConsumedPartitionType(), (Object)gate.getConsumedPartitionType());
            Map channelMap = gate.getInputChannels();
            Assert.assertEquals((long)3L, (long)channelMap.size());
            InputChannel localChannel = (InputChannel)channelMap.get(partitionIds[0]);
            Assert.assertEquals(LocalInputChannel.class, localChannel.getClass());
            InputChannel remoteChannel = (InputChannel)channelMap.get(partitionIds[1]);
            Assert.assertEquals(RemoteInputChannel.class, remoteChannel.getClass());
            InputChannel unknownChannel = (InputChannel)channelMap.get(partitionIds[2]);
            Assert.assertEquals(UnknownInputChannel.class, unknownChannel.getClass());
            for (InputChannel ch : channels = new InputChannel[]{localChannel, remoteChannel, unknownChannel}) {
                Assert.assertEquals((long)0L, (long)ch.getCurrentBackoff());
                Assert.assertTrue((boolean)ch.increaseBackoff());
                Assert.assertEquals((long)initialBackoff, (long)ch.getCurrentBackoff());
                Assert.assertTrue((boolean)ch.increaseBackoff());
                Assert.assertEquals((long)(initialBackoff * 2), (long)ch.getCurrentBackoff());
                Assert.assertTrue((boolean)ch.increaseBackoff());
                Assert.assertEquals((long)(initialBackoff * 2 * 2), (long)ch.getCurrentBackoff());
                Assert.assertTrue((boolean)ch.increaseBackoff());
                Assert.assertEquals((long)maxBackoff, (long)ch.getCurrentBackoff());
                Assert.assertFalse((boolean)ch.increaseBackoff());
            }
        }
        finally {
            gate.close();
            netEnv.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestBuffersWithRemoteInputChannel() throws Exception {
        NettyShuffleEnvironment network = this.createNettyShuffleEnvironment();
        SingleInputGate inputGate = this.createInputGate(network, 1, ResultPartitionType.PIPELINED_BOUNDED);
        int buffersPerChannel = 2;
        int extraNetworkBuffersPerGate = 8;
        try {
            RemoteInputChannel remote = InputChannelBuilder.newBuilder().setupFromNettyShuffleEnvironment(network).setConnectionManager(new TestingConnectionManager()).buildRemoteChannel(inputGate);
            inputGate.setInputChannels(new InputChannel[]{remote});
            inputGate.setup();
            NetworkBufferPool bufferPool = network.getNetworkBufferPool();
            Assert.assertEquals((long)buffersPerChannel, (long)remote.getNumberOfAvailableBuffers());
            Assert.assertEquals((long)(bufferPool.getTotalNumberOfMemorySegments() - buffersPerChannel), (long)bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertEquals((long)extraNetworkBuffersPerGate, (long)bufferPool.countBuffers());
        }
        finally {
            inputGate.close();
            network.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestBuffersWithUnknownInputChannel() throws Exception {
        NettyShuffleEnvironment network = this.createNettyShuffleEnvironment();
        SingleInputGate inputGate = this.createInputGate(network, 1, ResultPartitionType.PIPELINED_BOUNDED);
        int buffersPerChannel = 2;
        int extraNetworkBuffersPerGate = 8;
        try {
            ResultPartitionID resultPartitionId = new ResultPartitionID();
            InputChannel inputChannel = this.buildUnknownInputChannel(network, inputGate, resultPartitionId, 0);
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            inputGate.setup();
            NetworkBufferPool bufferPool = network.getNetworkBufferPool();
            Assert.assertEquals((long)bufferPool.getTotalNumberOfMemorySegments(), (long)bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertEquals((long)extraNetworkBuffersPerGate, (long)bufferPool.countBuffers());
            inputGate.updateInputChannel(ResourceID.generate(), NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(resultPartitionId.getPartitionId(), ResourceID.generate()));
            RemoteInputChannel remote = (RemoteInputChannel)inputGate.getInputChannels().get(resultPartitionId.getPartitionId());
            Assert.assertEquals((long)buffersPerChannel, (long)remote.getNumberOfAvailableBuffers());
            Assert.assertEquals((long)(bufferPool.getTotalNumberOfMemorySegments() - buffersPerChannel), (long)bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertEquals((long)extraNetworkBuffersPerGate, (long)bufferPool.countBuffers());
        }
        finally {
            inputGate.close();
            network.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUpdateUnknownInputChannel() throws Exception {
        NettyShuffleEnvironment network = this.createNettyShuffleEnvironment();
        ResultPartition localResultPartition = new ResultPartitionBuilder().setResultPartitionManager(network.getResultPartitionManager()).setupBufferPoolFactoryFromNettyShuffleEnvironment(network).build();
        ResultPartition remoteResultPartition = new ResultPartitionBuilder().setResultPartitionManager(network.getResultPartitionManager()).setupBufferPoolFactoryFromNettyShuffleEnvironment(network).build();
        localResultPartition.setup();
        remoteResultPartition.setup();
        SingleInputGate inputGate = this.createInputGate(network, 2, ResultPartitionType.PIPELINED);
        InputChannel[] inputChannels = new InputChannel[2];
        try {
            ResultPartitionID localResultPartitionId = localResultPartition.getPartitionId();
            inputChannels[0] = this.buildUnknownInputChannel(network, inputGate, localResultPartitionId, 0);
            ResultPartitionID remoteResultPartitionId = remoteResultPartition.getPartitionId();
            inputChannels[1] = this.buildUnknownInputChannel(network, inputGate, remoteResultPartitionId, 1);
            inputGate.setInputChannels(inputChannels);
            inputGate.setup();
            MatcherAssert.assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()), (Matcher)Matchers.is((Matcher)Matchers.instanceOf(UnknownInputChannel.class)));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()), (Matcher)Matchers.is((Matcher)Matchers.instanceOf(UnknownInputChannel.class)));
            ResourceID localLocation = ResourceID.generate();
            inputGate.updateInputChannel(localLocation, NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(remoteResultPartitionId.getPartitionId(), ResourceID.generate()));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()), (Matcher)Matchers.is((Matcher)Matchers.instanceOf(RemoteInputChannel.class)));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()), (Matcher)Matchers.is((Matcher)Matchers.instanceOf(UnknownInputChannel.class)));
            inputGate.updateInputChannel(localLocation, NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(localResultPartitionId.getPartitionId(), localLocation));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()), (Matcher)Matchers.is((Matcher)Matchers.instanceOf(RemoteInputChannel.class)));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()), (Matcher)Matchers.is((Matcher)Matchers.instanceOf(LocalInputChannel.class)));
        }
        finally {
            inputGate.close();
            network.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testQueuedBuffers() throws Exception {
        NettyShuffleEnvironment network = this.createNettyShuffleEnvironment();
        ResultPartition resultPartition = new ResultPartitionBuilder().setResultPartitionManager(network.getResultPartitionManager()).setupBufferPoolFactoryFromNettyShuffleEnvironment(network).build();
        SingleInputGate inputGate = this.createInputGate(network, 2, ResultPartitionType.PIPELINED);
        ResultPartitionID localResultPartitionId = resultPartition.getPartitionId();
        InputChannel[] inputChannels = new InputChannel[2];
        RemoteInputChannel remoteInputChannel = InputChannelBuilder.newBuilder().setChannelIndex(1).setupFromNettyShuffleEnvironment(network).setConnectionManager(new TestingConnectionManager()).buildRemoteChannel(inputGate);
        inputChannels[0] = remoteInputChannel;
        inputChannels[1] = InputChannelBuilder.newBuilder().setChannelIndex(0).setPartitionId(localResultPartitionId).setupFromNettyShuffleEnvironment(network).setConnectionManager(new TestingConnectionManager()).buildLocalChannel(inputGate);
        try {
            resultPartition.setup();
            InputGateFairnessTest.setupInputGate(inputGate, inputChannels);
            remoteInputChannel.onBuffer(TestBufferFactory.createBuffer(1), 0, 0);
            Assert.assertEquals((long)1L, (long)inputGate.getNumberOfQueuedBuffers());
            resultPartition.addBufferConsumer(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1), 0);
            Assert.assertEquals((long)2L, (long)inputGate.getNumberOfQueuedBuffers());
        }
        finally {
            resultPartition.release();
            inputGate.close();
            network.close();
        }
    }

    @Test
    public void testBufferReceivedListener() throws Exception {
        NettyShuffleEnvironment network = this.createNettyShuffleEnvironment();
        SingleInputGate inputGate = this.createInputGate(network, 2, ResultPartitionType.PIPELINED);
        RemoteInputChannel remoteInputChannel1 = InputChannelBuilder.newBuilder().setChannelIndex(0).setupFromNettyShuffleEnvironment(network).setConnectionManager(new TestingConnectionManager()).buildRemoteChannel(inputGate);
        RemoteInputChannel remoteInputChannel2 = InputChannelBuilder.newBuilder().setChannelIndex(1).setupFromNettyShuffleEnvironment(network).setConnectionManager(new TestingConnectionManager()).buildRemoteChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{remoteInputChannel1, remoteInputChannel2});
        final ArrayList<BufferOrEvent> notifications = new ArrayList<BufferOrEvent>();
        inputGate.registerBufferReceivedListener(new BufferReceivedListener(){

            public void notifyBufferReceived(Buffer buffer, InputChannelInfo channelInfo) {
                notifications.add(new BufferOrEvent(buffer, channelInfo));
            }

            public void notifyBarrierReceived(CheckpointBarrier barrier, InputChannelInfo channelInfo) {
                notifications.add(new BufferOrEvent((AbstractEvent)barrier, channelInfo));
            }
        });
        InputGateFairnessTest.setupInputGate(inputGate, new InputChannel[]{remoteInputChannel1, remoteInputChannel2});
        CheckpointOptions options = new CheckpointOptions(CheckpointType.CHECKPOINT, new CheckpointStorageLocationReference(new byte[]{0, 1, 2}));
        remoteInputChannel1.onBuffer(TestBufferFactory.createBuffer(1), 0, 0);
        remoteInputChannel2.onBuffer(EventSerializer.toBuffer((AbstractEvent)new CheckpointBarrier(0L, 0L, options)), 0, 0);
        remoteInputChannel1.spillInflightBuffers(0L, ChannelStateWriter.NO_OP);
        remoteInputChannel2.spillInflightBuffers(0L, ChannelStateWriter.NO_OP);
        remoteInputChannel1.onBuffer(TestBufferFactory.createBuffer(11), 1, 0);
        remoteInputChannel2.onBuffer(TestBufferFactory.createBuffer(12), 1, 0);
        remoteInputChannel1.onBuffer(EventSerializer.toBuffer((AbstractEvent)new CheckpointBarrier(1L, 0L, options)), 2, 0);
        remoteInputChannel1.spillInflightBuffers(1L, ChannelStateWriter.NO_OP);
        remoteInputChannel2.spillInflightBuffers(1L, ChannelStateWriter.NO_OP);
        remoteInputChannel1.onBuffer(TestBufferFactory.createBuffer(21), 3, 0);
        remoteInputChannel2.onBuffer(TestBufferFactory.createBuffer(22), 2, 0);
        inputGate.notifyChannelNonEmpty((InputChannel)remoteInputChannel1);
        inputGate.notifyChannelNonEmpty((InputChannel)remoteInputChannel2);
        while (inputGate.pollNext().isPresent()) {
        }
        Assert.assertEquals(this.getIds(Arrays.asList(new BufferOrEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, options), remoteInputChannel2.getChannelInfo()), new BufferOrEvent(TestBufferFactory.createBuffer(11), remoteInputChannel1.getChannelInfo()), new BufferOrEvent((AbstractEvent)new CheckpointBarrier(1L, 0L, options), remoteInputChannel1.getChannelInfo()), new BufferOrEvent(TestBufferFactory.createBuffer(22), remoteInputChannel2.getChannelInfo()))), this.getIds(notifications));
    }

    private List<Object> getIds(Collection<BufferOrEvent> buffers) {
        return buffers.stream().map(boe -> boe.isBuffer() ? Integer.valueOf(boe.getSize()) : boe.getEvent()).collect(Collectors.toList());
    }

    @Test
    public void testPartitionNotFoundExceptionWhileGetNextBuffer() throws Exception {
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(inputGate, new ResultPartitionManager());
        ResultPartitionID partitionId = localChannel.getPartitionId();
        inputGate.setInputChannels(new InputChannel[]{localChannel});
        localChannel.setError((Throwable)new PartitionNotFoundException(partitionId));
        try {
            inputGate.getNext();
            Assert.fail((String)"Should throw a PartitionNotFoundException.");
        }
        catch (PartitionNotFoundException notFound) {
            MatcherAssert.assertThat((Object)partitionId, (Matcher)Matchers.is((Object)notFound.getPartitionId()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testInputGateRemovalFromNettyShuffleEnvironment() throws Exception {
        try (NettyShuffleEnvironment network = this.createNettyShuffleEnvironment();){
            int numberOfGates = 10;
            Map<InputGateID, SingleInputGate> createdInputGatesById = SingleInputGateTest.createInputGateWithLocalChannels(network, numberOfGates, 1);
            Assert.assertEquals((long)numberOfGates, (long)createdInputGatesById.size());
            for (InputGateID id : createdInputGatesById.keySet()) {
                MatcherAssert.assertThat((Object)network.getInputGate(id).isPresent(), (Matcher)Matchers.is((Object)true));
                createdInputGatesById.get(id).close();
                MatcherAssert.assertThat((Object)network.getInputGate(id).isPresent(), (Matcher)Matchers.is((Object)false));
            }
        }
    }

    @Test
    public void testSingleInputGateInfo() {
        int numSingleInputGates = 2;
        int numInputChannels = 3;
        for (int i = 0; i < 2; ++i) {
            SingleInputGate gate = new SingleInputGateBuilder().setSingleInputGateIndex(i).setNumberOfChannels(3).build();
            int channelCounter = 0;
            for (InputChannel inputChannel : gate.getInputChannels().values()) {
                InputChannelInfo channelInfo = inputChannel.getChannelInfo();
                Assert.assertEquals((long)i, (long)channelInfo.getGateIdx());
                Assert.assertEquals((long)channelCounter++, (long)channelInfo.getInputChannelIdx());
            }
        }
    }

    @Test
    public void testConcurrentReceiveBuffersAndSpillInflightBuffers() throws Exception {
        NettyShuffleEnvironment network = this.createNettyShuffleEnvironment();
        SingleInputGate inputGate = this.createInputGate(network, 1, ResultPartitionType.PIPELINED);
        RemoteInputChannel inputChannel = InputChannelBuilder.newBuilder().setChannelIndex(0).setupFromNettyShuffleEnvironment(network).setConnectionManager(new TestingConnectionManager()).buildRemoteChannel(inputGate);
        final ArrayList inflightBuffers = new ArrayList();
        inputGate.registerBufferReceivedListener(new BufferReceivedListener(){

            public void notifyBufferReceived(Buffer buffer, InputChannelInfo channelInfo) {
                inflightBuffers.add(buffer);
            }

            public void notifyBarrierReceived(CheckpointBarrier barrier, InputChannelInfo channelInfo) {
            }
        });
        ArrayList<Buffer> buffers = new ArrayList<Buffer>();
        for (int i = 0; i < 1024; ++i) {
            buffers.add(BufferBuilderTestUtils.buildSomeBuffer(1024));
        }
        CheckpointBarrier barrier = new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation());
        Thread bufferReceiver = new Thread(() -> {
            try {
                for (int i = 0; i < buffers.size(); ++i) {
                    inputChannel.onBuffer((Buffer)buffers.get(i), i, 0);
                }
                inputChannel.onBuffer(EventSerializer.toBuffer((AbstractEvent)barrier), buffers.size(), 0);
                inputChannel.onBuffer(BufferBuilderTestUtils.buildSomeBuffer(1024), buffers.size() + 1, 0);
            }
            catch (IOException e) {
                ExceptionUtils.rethrow((Throwable)e);
            }
        });
        bufferReceiver.start();
        inputChannel.spillInflightBuffers(0L, (ChannelStateWriter)new ChannelStateWriter.NoOpChannelStateWriter(){

            public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
                ArrayList list = new ArrayList();
                iterator.forEachRemaining(list::add);
                inflightBuffers.addAll(list);
                try {
                    iterator.close();
                }
                catch (Exception e) {
                    ExceptionUtils.rethrow((Throwable)e);
                }
            }
        });
        bufferReceiver.join();
        Assert.assertArrayEquals((Object[])buffers.toArray(), (Object[])inflightBuffers.toArray());
    }

    private static Map<InputGateID, SingleInputGate> createInputGateWithLocalChannels(NettyShuffleEnvironment network, int numberOfGates, int numberOfLocalChannels) {
        NettyShuffleDescriptor[] channelDescs = new NettyShuffleDescriptor[numberOfLocalChannels];
        for (int i = 0; i < numberOfLocalChannels; ++i) {
            channelDescs[i] = NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), ResourceID.generate());
        }
        InputGateDeploymentDescriptor[] gateDescs = new InputGateDeploymentDescriptor[numberOfGates];
        IntermediateDataSetID[] ids = new IntermediateDataSetID[numberOfGates];
        for (int i = 0; i < numberOfGates; ++i) {
            ids[i] = new IntermediateDataSetID();
            gateDescs[i] = new InputGateDeploymentDescriptor(ids[i], ResultPartitionType.PIPELINED, 0, (ShuffleDescriptor[])channelDescs);
        }
        ExecutionAttemptID consumerID = new ExecutionAttemptID();
        SingleInputGate[] gates = network.createInputGates(network.createShuffleIOOwnerContext("", consumerID, (MetricGroup)new UnregisteredMetricsGroup()), SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, Arrays.asList(gateDescs)).toArray(new SingleInputGate[0]);
        HashMap<InputGateID, SingleInputGate> inputGatesById = new HashMap<InputGateID, SingleInputGate>();
        for (int i = 0; i < numberOfGates; ++i) {
            inputGatesById.put(new InputGateID(ids[i], consumerID), gates[i]);
        }
        return inputGatesById;
    }

    private InputChannel buildUnknownInputChannel(NettyShuffleEnvironment network, SingleInputGate inputGate, ResultPartitionID partitionId, int channelIndex) {
        return InputChannelBuilder.newBuilder().setChannelIndex(channelIndex).setPartitionId(partitionId).setupFromNettyShuffleEnvironment(network).setConnectionManager(new TestingConnectionManager()).buildUnknownChannel(inputGate);
    }

    private NettyShuffleEnvironment createNettyShuffleEnvironment() {
        return new NettyShuffleEnvironmentBuilder().build();
    }

    static void verifyBufferOrEvent(InputGate inputGate, boolean expectedIsBuffer, int expectedChannelIndex, boolean expectedMoreAvailable) throws IOException, InterruptedException {
        Optional bufferOrEvent = inputGate.getNext();
        Assert.assertTrue((boolean)bufferOrEvent.isPresent());
        Assert.assertEquals((Object)expectedIsBuffer, (Object)((BufferOrEvent)bufferOrEvent.get()).isBuffer());
        Assert.assertEquals((Object)inputGate.getChannel(expectedChannelIndex).getChannelInfo(), (Object)((BufferOrEvent)bufferOrEvent.get()).getChannelInfo());
        Assert.assertEquals((Object)expectedMoreAvailable, (Object)((BufferOrEvent)bufferOrEvent.get()).moreAvailable());
        if (!expectedMoreAvailable) {
            Assert.assertFalse((boolean)inputGate.pollNext().isPresent());
        }
    }

    private SingleInputGate createInputGate(NettyShuffleEnvironment environment) {
        SingleInputGate inputGate = this.createInputGate(environment, 3, ResultPartitionType.PIPELINED);
        RemoteRecoveredInputChannel remoteChannel = new InputChannelBuilder().setChannelIndex(0).buildRemoteRecoveredChannel(inputGate);
        LocalRecoveredInputChannel localChannel = new InputChannelBuilder().setChannelIndex(1).buildLocalRecoveredChannel(inputGate);
        UnknownInputChannel unknownChannel = new InputChannelBuilder().setChannelIndex(2).buildUnknownChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{remoteChannel, localChannel, unknownChannel});
        return inputGate;
    }

    private boolean getNextBufferAndVerify(SingleInputGate inputGate, int[] states) throws Exception {
        Optional bufferOrEvent = inputGate.pollNext();
        if (bufferOrEvent.isPresent()) {
            Assert.assertTrue((boolean)((BufferOrEvent)bufferOrEvent.get()).isBuffer());
            Buffer buffer = ((BufferOrEvent)bufferOrEvent.get()).getBuffer();
            BufferBuilderAndConsumerTest.assertContent(buffer, null, states);
            buffer.recycleBuffer();
            return true;
        }
        return false;
    }

    private static class TestingTaskEventPublisher
    implements TaskEventPublisher {
        private int counter = 0;

        private TestingTaskEventPublisher() {
        }

        public boolean publish(ResultPartitionID partitionId, TaskEvent event) {
            ++this.counter;
            return true;
        }
    }

    public static class TestingResultPartitionManager
    extends ResultPartitionManager {
        private int counter = 0;
        private final ResultSubpartitionView subpartitionView;

        public TestingResultPartitionManager(ResultSubpartitionView subpartitionView) {
            this.subpartitionView = subpartitionView;
        }

        public ResultSubpartitionView createSubpartitionView(ResultPartitionID partitionId, int subpartitionIndex, BufferAvailabilityListener availabilityListener) throws IOException {
            ++this.counter;
            return this.subpartitionView;
        }
    }
}

