package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.lang.Thread;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.SubpartitionIndexRange;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.PullingAsyncDataInput;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.InputGateFairnessTest;
import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
import org.apache.flink.util.CompressedSerializedValue;
import org.apache.flink.util.Preconditions;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.class */
public class SingleInputGateTest extends InputGateTestBase {

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest$TestingResultPartitionManager.class */
    /**
     * {@link ResultPartitionManager} test double that returns one fixed {@link
     * ResultSubpartitionView} for every request and counts how often a view was requested.
     */
    public static class TestingResultPartitionManager extends ResultPartitionManager {
        /** The view handed out by every {@code createSubpartitionView} call. */
        private final ResultSubpartitionView subpartitionView;

        /** Number of {@code createSubpartitionView} calls observed so far. */
        private int counter;

        public TestingResultPartitionManager(ResultSubpartitionView resultSubpartitionView) {
            subpartitionView = resultSubpartitionView;
        }

        /**
         * Records the request and returns the preconfigured view; all three arguments are
         * ignored.
         */
        public ResultSubpartitionView createSubpartitionView(ResultPartitionID resultPartitionID, int i, BufferAvailabilityListener bufferAvailabilityListener) throws IOException {
            counter += 1;
            return subpartitionView;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest$TestingTaskEventPublisher.class */
    /** {@link TaskEventPublisher} test double that accepts every event and counts them. */
    private static class TestingTaskEventPublisher implements TaskEventPublisher {
        /** Number of {@code publish} calls observed so far. */
        private int counter = 0;

        private TestingTaskEventPublisher() {}

        /** Counts the call and always reports success; both arguments are ignored. */
        public boolean publish(ResultPartitionID resultPartitionID, TaskEvent taskEvent) {
            ++counter;
            return true;
        }
    }

    /**
     * Expects {@code checkpointStarted} to throw a {@link CheckpointException} when the gate's
     * only channel is still an unknown channel.
     */
    @Test(expected = CheckpointException.class)
    public void testCheckpointsDeclinedUnlessAllChannelsAreKnown() throws CheckpointException {
        final SingleInputGate gate =
                createInputGate(createNettyShuffleEnvironment(), 1, ResultPartitionType.PIPELINED);
        final InputChannel unknownChannel =
                new InputChannelBuilder().setChannelIndex(0).buildUnknownChannel(gate);
        gate.setInputChannels(new InputChannel[] {unknownChannel});

        final CheckpointOptions options =
                CheckpointOptions.alignedNoTimeout(
                        CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
        gate.checkpointStarted(new CheckpointBarrier(1L, 1L, options));
    }

    /**
     * Expects {@code checkpointStarted} to throw a {@link CheckpointException} while the gate's
     * state-consumed future has not completed yet.
     */
    @Test(expected = CheckpointException.class)
    public void testCheckpointsDeclinedUnlessStateConsumed() throws CheckpointException {
        final SingleInputGate gate = createInputGate(createNettyShuffleEnvironment());

        // Precondition of the scenario: recovered state must still be pending.
        final boolean stateAlreadyConsumed = gate.getStateConsumedFuture().isDone();
        Preconditions.checkState(!stateAlreadyConsumed);

        final CheckpointOptions options =
                CheckpointOptions.alignedNoTimeout(
                        CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
        gate.checkpointStarted(new CheckpointBarrier(1L, 1L, options));
    }

    /**
     * Checks the gate's buffer pool and the per-channel available-buffer counts at three points:
     * before {@code setup()}, after {@code setup()}, and after {@code
     * convertRecoveredInputChannels()}.
     *
     * <p>The environment and the gate are registered with a {@link Closer} so both are closed when
     * the test ends, whether it passes or fails.
     */
    @Test
    public void testSetupLogic() throws Exception {
        final NettyShuffleEnvironment environment = createNettyShuffleEnvironment();
        final SingleInputGate gate = createInputGate(environment);
        try (Closer closer = Closer.create()) {
            closer.register(environment::close);
            closer.register(gate::close);

            // Before setup: no buffer pool, and recovered channels hold no buffers.
            Assert.assertNull(gate.getBufferPool());
            for (InputChannel channel : gate.getInputChannels().values()) {
                Assert.assertTrue(
                        channel instanceof RecoveredInputChannel
                                || channel instanceof UnknownInputChannel);
                if (channel instanceof RecoveredInputChannel) {
                    Assert.assertEquals(
                            0L,
                            ((RecoveredInputChannel) channel)
                                    .bufferManager.getNumberOfAvailableBuffers());
                }
            }

            gate.setup();

            // After setup: the pool exists, recovered channels still hold no buffers.
            Assert.assertNotNull(gate.getBufferPool());
            Assert.assertEquals(1L, gate.getBufferPool().getNumberOfRequiredMemorySegments());
            for (InputChannel channel : gate.getInputChannels().values()) {
                if (channel instanceof RemoteRecoveredInputChannel) {
                    Assert.assertEquals(
                            0L,
                            ((RemoteRecoveredInputChannel) channel)
                                    .bufferManager.getNumberOfAvailableBuffers());
                } else if (channel instanceof LocalRecoveredInputChannel) {
                    Assert.assertEquals(
                            0L,
                            ((LocalRecoveredInputChannel) channel)
                                    .bufferManager.getNumberOfAvailableBuffers());
                }
            }

            gate.convertRecoveredInputChannels();

            // After conversion: every remote channel reports two available buffers.
            Assert.assertNotNull(gate.getBufferPool());
            Assert.assertEquals(1L, gate.getBufferPool().getNumberOfRequiredMemorySegments());
            for (InputChannel channel : gate.getInputChannels().values()) {
                if (channel instanceof RemoteInputChannel) {
                    Assert.assertEquals(
                            2L, ((RemoteInputChannel) channel).getNumberOfAvailableBuffers());
                }
            }
        }
    }

    /**
     * After recovered state has been consumed and partitions have been requested, checks the
     * concrete type of the gate's three channels: index 0 is remote (with a partition request
     * client and an initial credit of 2), index 1 is local (with a subpartition view), and index 2
     * is still unknown.
     *
     * <p>Channels are held as plain {@link InputChannel} until the {@code instanceOf} assertion has
     * run, so a wrong channel type fails the assertion rather than the assignment.
     */
    @Test
    public void testPartitionRequestLogic() throws Exception {
        final NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().build();
        final SingleInputGate gate = createInputGate(environment);
        try (Closer closer = Closer.create()) {
            closer.register(environment::close);
            closer.register(gate::close);

            // Drain the recovered state before requesting partitions.
            gate.finishReadRecoveredState();
            while (!gate.getStateConsumedFuture().isDone()) {
                gate.pollNext();
            }
            gate.requestPartitions();
            gate.pollNext();

            final InputChannel remoteChannel = gate.getChannel(0);
            MatcherAssert.assertThat(remoteChannel, Matchers.instanceOf(RemoteInputChannel.class));
            Assert.assertNotNull(((RemoteInputChannel) remoteChannel).getPartitionRequestClient());
            Assert.assertEquals(2L, ((RemoteInputChannel) remoteChannel).getInitialCredit());

            final InputChannel localChannel = gate.getChannel(1);
            MatcherAssert.assertThat(localChannel, Matchers.instanceOf(LocalInputChannel.class));
            Assert.assertNotNull(((LocalInputChannel) localChannel).getSubpartitionView());

            MatcherAssert.assertThat(
                    gate.getChannel(2), Matchers.instanceOf(UnknownInputChannel.class));
        }
    }

    /**
     * Feeds buffers, end-of-data and end-of-partition events into two test channels and verifies
     * the sequence delivered by the gate, together with its end-of-data status and finished flag
     * along the way.
     */
    @Test
    public void testBasicGetNextLogic() throws Exception {
        final SingleInputGate gate = createInputGate();
        final TestInputChannel channel0 = new TestInputChannel(gate, 0);
        final TestInputChannel channel1 = new TestInputChannel(gate, 1);
        final TestInputChannel[] channels = {channel0, channel1};
        gate.setInputChannels(channels);

        // Queue the data: two buffers on channel 0, one on channel 1, then the terminal events.
        channel0.readBuffer();
        channel0.readBuffer();
        channel1.readBuffer();
        channel1.readEndOfData();
        channel0.readEndOfData();
        channel1.readEndOfPartitionEvent();
        channel0.readEndOfPartitionEvent();

        gate.notifyChannelNonEmpty(channel0);
        gate.notifyChannelNonEmpty(channel1);

        // NOTE(review): the boolean/int arguments are presumably (isBuffer, channelIndex,
        // moreAvailable) - confirm against verifyBufferOrEvent.
        verifyBufferOrEvent(gate, true, 0, true);
        verifyBufferOrEvent(gate, true, 1, true);
        verifyBufferOrEvent(gate, true, 0, true);
        verifyBufferOrEvent(gate, false, 1, true);
        Assert.assertEquals(
                PullingAsyncDataInput.EndOfDataStatus.NOT_END_OF_DATA, gate.hasReceivedEndOfData());

        verifyBufferOrEvent(gate, false, 0, true);
        Assert.assertFalse(gate.isFinished());
        Assert.assertEquals(
                PullingAsyncDataInput.EndOfDataStatus.DRAINED, gate.hasReceivedEndOfData());

        verifyBufferOrEvent(gate, false, 1, true);
        verifyBufferOrEvent(gate, false, 0, false);
        Assert.assertEquals(
                PullingAsyncDataInput.EndOfDataStatus.DRAINED, gate.hasReceivedEndOfData());
        Assert.assertTrue(gate.isFinished());

        for (int index = 0; index < channels.length; index++) {
            channels[index].assertReturnedEventsAreRecycled();
        }
    }

    /**
     * Verifies the end-of-data status of two gates: one whose channels end with a mix of {@code
     * DRAIN} and {@code NO_DRAIN} (expected {@code STOPPED}) and one whose channels both end with
     * {@code DRAIN} (expected {@code DRAINED}). Each gate must report {@code NOT_END_OF_DATA} after
     * only its first event has been consumed.
     */
    @Test
    public void testDrainFlagComputation() throws Exception {
        final SingleInputGate mixedGate = createInputGate();
        final SingleInputGate drainOnlyGate = createInputGate();

        final TestInputChannel mixed0 = new TestInputChannel(mixedGate, 0);
        final TestInputChannel mixed1 = new TestInputChannel(mixedGate, 1);
        mixedGate.setInputChannels(new TestInputChannel[] {mixed0, mixed1});

        final TestInputChannel drain0 = new TestInputChannel(drainOnlyGate, 0);
        final TestInputChannel drain1 = new TestInputChannel(drainOnlyGate, 1);
        drainOnlyGate.setInputChannels(new TestInputChannel[] {drain0, drain1});

        mixed1.readEndOfData(StopMode.DRAIN);
        mixed0.readEndOfData(StopMode.NO_DRAIN);
        drain1.readEndOfData(StopMode.DRAIN);
        drain0.readEndOfData(StopMode.DRAIN);

        mixedGate.notifyChannelNonEmpty(mixed0);
        mixedGate.notifyChannelNonEmpty(mixed1);
        drainOnlyGate.notifyChannelNonEmpty(drain0);
        drainOnlyGate.notifyChannelNonEmpty(drain1);

        // Gate with one NO_DRAIN channel.
        verifyBufferOrEvent(mixedGate, false, 0, true);
        Assert.assertEquals(
                PullingAsyncDataInput.EndOfDataStatus.NOT_END_OF_DATA,
                mixedGate.hasReceivedEndOfData());
        verifyBufferOrEvent(mixedGate, false, 1, true);
        Assert.assertEquals(
                PullingAsyncDataInput.EndOfDataStatus.STOPPED, mixedGate.hasReceivedEndOfData());

        // Gate where every channel drains.
        verifyBufferOrEvent(drainOnlyGate, false, 0, true);
        Assert.assertEquals(
                PullingAsyncDataInput.EndOfDataStatus.NOT_END_OF_DATA,
                drainOnlyGate.hasReceivedEndOfData());
        verifyBufferOrEvent(drainOnlyGate, false, 1, true);
        Assert.assertEquals(
                PullingAsyncDataInput.EndOfDataStatus.DRAINED,
                drainOnlyGate.hasReceivedEndOfData());
    }

    /**
     * Writes a known pattern of little-endian longs into a buffer, compresses it with LZ4, feeds
     * it through a gate configured with a matching decompressor, and checks that the buffer
     * returned by {@code getNext()} contains the original pattern.
     *
     * <p>The gate is managed by try-with-resources so it is closed on success and on failure.
     */
    @Test
    public void testGetCompressedBuffer() throws Exception {
        final int bufferSize = 1024;
        final String compressionCodec = "LZ4";
        final BufferCompressor compressor = new BufferCompressor(bufferSize, compressionCodec);
        final BufferDecompressor decompressor =
                new BufferDecompressor(bufferSize, compressionCodec);

        try (SingleInputGate gate =
                new SingleInputGateBuilder().setBufferDecompressor(decompressor).build()) {
            final TestInputChannel channel = new TestInputChannel(gate, 0);

            // Fill the segment with each long's own offset: 0, 8, 16, ...
            final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
            for (int offset = 0; offset < bufferSize; offset += Long.BYTES) {
                segment.putLongLittleEndian(offset, offset);
            }
            final NetworkBuffer uncompressed =
                    new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
            uncompressed.setSize(bufferSize);

            final Buffer compressed = compressor.compressToOriginalBuffer(uncompressed);
            Assert.assertTrue(compressed.isCompressed());

            channel.read(compressed);
            gate.setInputChannels(new InputChannel[] {channel});
            gate.notifyChannelNonEmpty(channel);

            final Optional<BufferOrEvent> received = gate.getNext();
            Assert.assertTrue(received.isPresent());
            Assert.assertTrue(received.get().isBuffer());

            final ByteBuffer payload =
                    received.get()
                            .getBuffer()
                            .getNioBufferReadable()
                            .order(ByteOrder.LITTLE_ENDIAN);
            for (int offset = 0; offset < bufferSize; offset += Long.BYTES) {
                Assert.assertEquals(offset, payload.getLong());
            }
        }
    }

    /**
     * Verifies that a channel notification arriving after the channel's end-of-partition event
     * has been consumed does not make the gate return anything further.
     */
    @Test
    public void testNotifyAfterEndOfPartition() throws Exception {
        final SingleInputGate gate = createInputGate(2);
        final TestInputChannel finishedChannel = new TestInputChannel(gate, 0);
        final TestInputChannel otherChannel = new TestInputChannel(gate, 1);
        gate.setInputChannels(new InputChannel[] {finishedChannel, otherChannel});

        finishedChannel.readEndOfPartitionEvent();
        finishedChannel.notifyChannelNonEmpty();
        final BufferOrEvent firstPoll = (BufferOrEvent) gate.pollNext().get();
        Assert.assertEquals(EndOfPartitionEvent.INSTANCE, firstPoll.getEvent());

        // A second notification from the finished channel must yield nothing.
        finishedChannel.notifyChannelNonEmpty();
        final boolean secondPollHasData = gate.pollNext().isPresent();
        Assert.assertFalse(secondPollHasData);
    }

    /** Runs the shared {@code testIsAvailable} helper against a gate with one test channel. */
    @Test
    public void testIsAvailable() throws Exception {
        final SingleInputGate gate = createInputGate(1);
        final TestInputChannel onlyChannel = new TestInputChannel(gate, 0);
        final InputChannel[] channels = {onlyChannel};
        gate.setInputChannels(channels);

        // The same gate is supplied for both of the helper's first two arguments.
        testIsAvailable(gate, gate, onlyChannel);
    }

    /**
     * Runs the shared {@code testIsAvailableAfterFinished} helper against a single-channel gate,
     * supplying a callback that queues an end-of-partition event and notifies the gate.
     */
    @Test
    public void testIsAvailableAfterFinished() throws Exception {
        final SingleInputGate gate = createInputGate(1);
        final TestInputChannel onlyChannel = new TestInputChannel(gate, 0);
        final InputChannel[] channels = {onlyChannel};
        gate.setInputChannels(channels);

        testIsAvailableAfterFinished(
                gate,
                () -> {
                    onlyChannel.readEndOfPartitionEvent();
                    gate.notifyChannelNonEmpty(onlyChannel);
                });
    }

    /**
     * With two channels of which only channel 0 carries data (one buffer followed by
     * end-of-partition), verifies the two items the gate delivers via {@code
     * verifyBufferOrEvent}.
     */
    @Test
    public void testIsMoreAvailableReadingFromSingleInputChannel() throws Exception {
        final SingleInputGate gate = createInputGate();
        final TestInputChannel activeChannel = new TestInputChannel(gate, 0);
        final TestInputChannel silentChannel = new TestInputChannel(gate, 1);
        gate.setInputChannels(new TestInputChannel[] {activeChannel, silentChannel});

        activeChannel.readBuffer();
        activeChannel.readEndOfPartitionEvent();
        gate.notifyChannelNonEmpty(activeChannel);

        verifyBufferOrEvent(gate, true, 0, true);
        verifyBufferOrEvent(gate, false, 0, false);
    }

    /**
     * Sets up a gate with one local and one unknown channel and tracks, via the counting test
     * doubles, how many subpartition views are created and how many task events are published:
     * one of each before the unknown channel is updated, two of each afterwards.
     *
     * <p>The environment and the gate are registered with a {@link Closer} so both are closed when
     * the test ends, whether it passes or fails.
     */
    @Test
    public void testBackwardsEventWithUninitializedChannel() throws Exception {
        final TestingTaskEventPublisher eventPublisher = new TestingTaskEventPublisher();
        final TestingResultPartitionManager partitionManager =
                new TestingResultPartitionManager(new NoOpResultSubpartitionView());
        final NettyShuffleEnvironment environment = createNettyShuffleEnvironment();
        final SingleInputGate gate =
                createInputGate(environment, 2, ResultPartitionType.PIPELINED);

        try (Closer closer = Closer.create()) {
            closer.register(environment::close);
            closer.register(gate::close);

            final InputChannel[] channels = new InputChannel[2];
            channels[0] =
                    InputChannelBuilder.newBuilder()
                            .setPartitionId(new ResultPartitionID())
                            .setPartitionManager(partitionManager)
                            .setTaskEventPublisher(eventPublisher)
                            .buildLocalChannel(gate);

            final ResultPartitionID unknownPartitionId = new ResultPartitionID();
            channels[1] =
                    InputChannelBuilder.newBuilder()
                            .setChannelIndex(1)
                            .setPartitionId(unknownPartitionId)
                            .setPartitionManager(partitionManager)
                            .setTaskEventPublisher(eventPublisher)
                            .buildUnknownChannel(gate);

            InputGateFairnessTest.setupInputGate(gate, channels);

            // Exactly one view request so far.
            Assert.assertEquals(1L, partitionManager.counter);

            gate.sendTaskEvent(new TestTaskEvent());

            // Exactly one publication so far.
            Assert.assertEquals(1L, eventPublisher.counter);

            // Resolve the unknown channel; both counters must advance by one.
            final ResourceID location = ResourceID.generate();
            gate.updateInputChannel(
                    location,
                    NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                            unknownPartitionId.getPartitionId(), location));

            Assert.assertEquals(2L, partitionManager.counter);
            Assert.assertEquals(2L, eventPublisher.counter);
        }
    }

    /**
     * Verifies that updating an unknown channel before any partition request was issued does not
     * create a subpartition view: the counting partition manager must still report zero requests.
     */
    @Test
    public void testUpdateChannelBeforeRequest() throws Exception {
        final SingleInputGate gate = createInputGate(1);

        // Keep a handle on the manager so its request counter can be inspected afterwards.
        final TestingResultPartitionManager partitionManager =
                new TestingResultPartitionManager(new NoOpResultSubpartitionView());
        final InputChannel unknownChannel =
                InputChannelBuilder.newBuilder()
                        .setPartitionManager(partitionManager)
                        .buildUnknownChannel(gate);
        gate.setInputChannels(new InputChannel[] {unknownChannel});

        final ResultPartitionID partitionId = unknownChannel.getPartitionId();
        final ResourceID location = ResourceID.generate();
        gate.updateInputChannel(
                location,
                NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                        partitionId.getPartitionId(), location));

        Assert.assertEquals(0L, partitionManager.counter);
    }

    /**
     * Starts a consumer thread that calls {@code getNext()} on a gate with a single unknown
     * channel, waits until that thread is observed in the {@code WAITING} state, closes the gate,
     * and expects the consumer to have failed with an {@link IllegalStateException}.
     */
    @Test
    public void testReleaseWhilePollingChannel() throws Exception {
        final AtomicReference<Exception> consumerFailure = new AtomicReference<>();
        final SingleInputGate gate = createInputGate(1);
        final InputChannel unknownChannel = InputChannelBuilder.newBuilder().buildUnknownChannel(gate);
        gate.setInputChannels(new InputChannel[] {unknownChannel});

        final Thread consumer =
                new Thread(
                        () -> {
                            try {
                                gate.getNext();
                            } catch (Exception e) {
                                consumerFailure.set(e);
                            }
                        });
        consumer.start();

        // Poll up to 50 times, 100 ms apart, for the consumer to reach WAITING.
        boolean consumerIsWaiting = false;
        int attempt = 0;
        while (attempt < 50) {
            if (consumer.isAlive()) {
                consumerIsWaiting = consumer.getState() == Thread.State.WAITING;
            }
            if (consumerIsWaiting) {
                break;
            }
            Thread.sleep(100L);
            attempt++;
        }
        Assert.assertTrue("Did not trigger blocking buffer request.", consumerIsWaiting);

        gate.close();
        consumer.join();

        final Exception failure = consumerFailure.get();
        Assert.assertNotNull(failure);
        Assert.assertEquals(IllegalStateException.class, failure.getClass());
    }

    /**
     * Builds an environment with a partition-request initial backoff of 137 and a maximum of
     * 1001, creates a gate over three partitions, and verifies for its local, remote and unknown
     * channel that the backoff starts at 0, doubles from the initial value on each increase, is
     * capped at the maximum, and that a further increase is refused.
     *
     * <p>The environment and the gate are registered with the {@link Closer} before the recovery
     * steps run, so they are closed even if those steps throw. An {@link java.io.IOException}
     * from {@code checkError()} propagates directly and fails the test.
     */
    @Test
    public void testRequestBackoffConfiguration() throws Exception {
        final int initialBackoff = 137;
        final int maxBackoff = 1001;
        final IntermediateResultPartitionID[] partitionIds = {
            new IntermediateResultPartitionID(),
            new IntermediateResultPartitionID(),
            new IntermediateResultPartitionID()
        };

        final NettyShuffleEnvironment environment =
                new NettyShuffleEnvironmentBuilder()
                        .setPartitionRequestInitialBackoff(initialBackoff)
                        .setPartitionRequestMaxBackoff(maxBackoff)
                        .build();
        final SingleInputGate gate =
                createSingleInputGate(partitionIds, ResultPartitionType.PIPELINED, environment);

        try (Closer closer = Closer.create()) {
            closer.register(environment::close);
            closer.register(gate::close);

            // Consume the recovered state, then convert the recovered channels.
            gate.setChannelStateWriter(ChannelStateWriter.NO_OP);
            gate.finishReadRecoveredState();
            while (!gate.getStateConsumedFuture().isDone()) {
                gate.pollNext();
            }
            gate.convertRecoveredInputChannels();

            Assert.assertEquals(ResultPartitionType.PIPELINED, gate.getConsumedPartitionType());

            final Map<?, InputChannel> channelsByPartition = gate.getInputChannels();
            Assert.assertEquals(3L, channelsByPartition.size());
            for (InputChannel channel : channelsByPartition.values()) {
                channel.checkError();
            }

            final InputChannel localChannel =
                    channelsByPartition.get(createSubpartitionInfo(partitionIds[0]));
            Assert.assertEquals(LocalInputChannel.class, localChannel.getClass());

            final InputChannel remoteChannel =
                    channelsByPartition.get(createSubpartitionInfo(partitionIds[1]));
            Assert.assertEquals(RemoteInputChannel.class, remoteChannel.getClass());

            final InputChannel unknownChannel =
                    channelsByPartition.get(createSubpartitionInfo(partitionIds[2]));
            Assert.assertEquals(UnknownInputChannel.class, unknownChannel.getClass());

            for (InputChannel channel :
                    new InputChannel[] {localChannel, remoteChannel, unknownChannel}) {
                Assert.assertEquals(0L, channel.getCurrentBackoff());

                Assert.assertTrue(channel.increaseBackoff());
                Assert.assertEquals(initialBackoff, channel.getCurrentBackoff());

                Assert.assertTrue(channel.increaseBackoff());
                Assert.assertEquals(initialBackoff * 2, channel.getCurrentBackoff());

                Assert.assertTrue(channel.increaseBackoff());
                Assert.assertEquals(initialBackoff * 2 * 2, channel.getCurrentBackoff());

                // The next doubling would exceed the maximum, so the value is capped.
                Assert.assertTrue(channel.increaseBackoff());
                Assert.assertEquals(maxBackoff, channel.getCurrentBackoff());

                Assert.assertFalse(channel.increaseBackoff());
            }
        }
    }

    /**
     * Sets up a {@code PIPELINED_BOUNDED} gate with one remote channel and verifies the buffer
     * accounting after {@code setup()}: the channel reports 2 available buffers, the network
     * buffer pool has its total minus those 2 minus 1 segments available, and the pool counts 8
     * buffers.
     *
     * <p>The environment and the gate are registered with a {@link Closer} so both are closed when
     * the test ends, whether it passes or fails.
     */
    @Test
    public void testRequestBuffersWithRemoteInputChannel() throws Exception {
        final NettyShuffleEnvironment environment = createNettyShuffleEnvironment();
        final SingleInputGate gate =
                createInputGate(environment, 1, ResultPartitionType.PIPELINED_BOUNDED);
        // NOTE(review): presumably the per-channel exclusive buffers and the per-gate floating
        // buffers of the default test configuration - confirm against the environment builder.
        final int buffersPerChannel = 2;
        final int expectedPoolBufferCount = 8;

        try (Closer closer = Closer.create()) {
            closer.register(environment::close);
            closer.register(gate::close);

            // Keep the channel in a local so its buffer count can be asserted below.
            final RemoteInputChannel remoteChannel =
                    InputChannelBuilder.newBuilder()
                            .setupFromNettyShuffleEnvironment(environment)
                            .setConnectionManager(new TestingConnectionManager())
                            .buildRemoteChannel(gate);
            gate.setInputChannels(new InputChannel[] {remoteChannel});
            gate.setup();

            final NetworkBufferPool bufferPool = environment.getNetworkBufferPool();
            Assert.assertEquals(buffersPerChannel, remoteChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals(
                    (bufferPool.getTotalNumberOfMemorySegments() - buffersPerChannel) - 1,
                    bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertEquals(expectedPoolBufferCount, bufferPool.countBuffers());
        }
    }

    /**
     * Checks the buffer accounting of a gate whose only channel starts out as an unknown channel
     * and is then updated with a remote shuffle descriptor.
     *
     * <p>Before the update the test expects exactly one segment of the global pool to be
     * unavailable. After the update it expects the channel stored in the gate to be a
     * {@link RemoteInputChannel} with two available buffers and the pool to have two further
     * segments unavailable. {@code countBuffers()} is expected to report eight both times.
     */
    @Test
    public void testRequestBuffersWithUnknownInputChannel() throws Exception {
        final NettyShuffleEnvironment network = createNettyShuffleEnvironment();
        final SingleInputGate inputGate =
                createInputGate(network, 1, ResultPartitionType.PIPELINED_BOUNDED);
        final int buffersPerChannel = 2;
        final int expectedPoolBuffers = 8;

        // The closer releases everything registered with it, also when an assertion fails.
        try (Closer closer = Closer.create()) {
            closer.register(network::close);
            closer.register(inputGate::close);

            final ResultPartitionID resultPartitionId = new ResultPartitionID();
            inputGate.setInputChannels(
                    new InputChannel[] {
                        buildUnknownInputChannel(network, inputGate, resultPartitionId, 0)
                    });
            inputGate.setup();

            final NetworkBufferPool bufferPool = network.getNetworkBufferPool();
            Assert.assertEquals(
                    bufferPool.getTotalNumberOfMemorySegments() - 1,
                    bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertEquals(expectedPoolBuffers, bufferPool.countBuffers());

            // Swap the unknown channel for one described by a remote shuffle descriptor.
            inputGate.updateInputChannel(
                    ResourceID.generate(),
                    NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                            resultPartitionId.getPartitionId(), ResourceID.generate()));

            final RemoteInputChannel remote =
                    (RemoteInputChannel)
                            inputGate
                                    .getInputChannels()
                                    .get(createSubpartitionInfo(resultPartitionId.getPartitionId()));
            Assert.assertEquals(buffersPerChannel, remote.getNumberOfAvailableBuffers());
            Assert.assertEquals(
                    bufferPool.getTotalNumberOfMemorySegments() - buffersPerChannel - 1,
                    bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertEquals(expectedPoolBuffers, bufferPool.countBuffers());
        }
    }

    /**
     * Updates the two unknown channels of a gate one after the other and checks the resulting
     * channel types after every step.
     *
     * <p>The first update carries a descriptor whose location is a freshly generated
     * {@link ResourceID}; the test expects a {@link RemoteInputChannel} for that partition. The
     * second update passes the same {@link ResourceID} both as the first argument of
     * {@code updateInputChannel} and as the descriptor location; the test expects a
     * {@link LocalInputChannel} for that partition.
     */
    @Test
    public void testUpdateUnknownInputChannel() throws Exception {
        final NettyShuffleEnvironment network = createNettyShuffleEnvironment();

        final ResultPartition localResultPartition =
                new ResultPartitionBuilder()
                        .setResultPartitionManager(network.getResultPartitionManager())
                        .setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
                        .build();
        final ResultPartition remoteResultPartition =
                new ResultPartitionBuilder()
                        .setResultPartitionManager(network.getResultPartitionManager())
                        .setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
                        .build();
        localResultPartition.setup();
        remoteResultPartition.setup();

        final SingleInputGate inputGate =
                createInputGate(network, 2, ResultPartitionType.PIPELINED);
        final InputChannel[] inputChannels = new InputChannel[2];

        // The closer releases everything registered with it, also when an assertion fails.
        try (Closer closer = Closer.create()) {
            closer.register(network::close);
            closer.register(inputGate::close);

            final ResultPartitionID localPartitionId = localResultPartition.getPartitionId();
            inputChannels[0] = buildUnknownInputChannel(network, inputGate, localPartitionId, 0);
            final ResultPartitionID remotePartitionId = remoteResultPartition.getPartitionId();
            inputChannels[1] = buildUnknownInputChannel(network, inputGate, remotePartitionId, 1);
            inputGate.setInputChannels(inputChannels);
            inputGate.setup();

            final SingleInputGate.SubpartitionInfo localInfo =
                    createSubpartitionInfo(localPartitionId.getPartitionId());
            final SingleInputGate.SubpartitionInfo remoteInfo =
                    createSubpartitionInfo(remotePartitionId.getPartitionId());

            // Initially both channels are unknown.
            MatcherAssert.assertThat(
                    inputGate.getInputChannels().get(remoteInfo),
                    Matchers.is(Matchers.instanceOf(UnknownInputChannel.class)));
            MatcherAssert.assertThat(
                    inputGate.getInputChannels().get(localInfo),
                    Matchers.is(Matchers.instanceOf(UnknownInputChannel.class)));

            final ResourceID localLocation = ResourceID.generate();

            // First update: the descriptor location differs from the location passed to the gate.
            inputGate.updateInputChannel(
                    localLocation,
                    NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                            remotePartitionId.getPartitionId(), ResourceID.generate()));
            MatcherAssert.assertThat(
                    inputGate.getInputChannels().get(remoteInfo),
                    Matchers.is(Matchers.instanceOf(RemoteInputChannel.class)));
            MatcherAssert.assertThat(
                    inputGate.getInputChannels().get(localInfo),
                    Matchers.is(Matchers.instanceOf(UnknownInputChannel.class)));

            // Second update: the descriptor location equals the location passed to the gate.
            inputGate.updateInputChannel(
                    localLocation,
                    NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                            localPartitionId.getPartitionId(), localLocation));
            MatcherAssert.assertThat(
                    inputGate.getInputChannels().get(remoteInfo),
                    Matchers.is(Matchers.instanceOf(RemoteInputChannel.class)));
            MatcherAssert.assertThat(
                    inputGate.getInputChannels().get(localInfo),
                    Matchers.is(Matchers.instanceOf(LocalInputChannel.class)));
        }
    }

    /**
     * Builds a gate over three partitions with the subpartition index range {@code [0, 1]} and
     * follows its six channels through state recovery, partition requesting and a channel update.
     *
     * <p>The test expects one channel per (partition, subpartition) pair, each consuming the
     * subpartition index it is keyed by. Channel types are asserted before recovery finishes,
     * after partitions have been requested, and after the third partition has been updated with a
     * descriptor located at the gate's own {@link ResourceID}.
     */
    @Test
    public void testSingleInputGateWithSubpartitionIndexRange() throws IOException, InterruptedException {
        final IntermediateResultPartitionID[] partitionIds = {
            new IntermediateResultPartitionID(),
            new IntermediateResultPartitionID(),
            new IntermediateResultPartitionID()
        };
        final SubpartitionIndexRange subpartitionIndexRange = new SubpartitionIndexRange(0, 1);
        final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder().build();
        final ResourceID localLocation = ResourceID.generate();

        final SingleInputGate gate =
                createSingleInputGate(
                        partitionIds,
                        ResultPartitionType.BLOCKING,
                        subpartitionIndexRange,
                        network,
                        localLocation,
                        new TestingConnectionManager(),
                        new TestingResultPartitionManager(new NoOpResultSubpartitionView()));

        // Iterate as InputChannel and narrow only after the instanceof check has succeeded.
        for (InputChannel channel : gate.getInputChannels().values()) {
            if (channel instanceof ChannelStateHolder) {
                ((ChannelStateHolder) channel).setChannelStateWriter(ChannelStateWriter.NO_OP);
            }
        }

        final SingleInputGate.SubpartitionInfo info1 = createSubpartitionInfo(partitionIds[0], 0);
        final SingleInputGate.SubpartitionInfo info2 = createSubpartitionInfo(partitionIds[0], 1);
        final SingleInputGate.SubpartitionInfo info3 = createSubpartitionInfo(partitionIds[1], 0);
        final SingleInputGate.SubpartitionInfo info4 = createSubpartitionInfo(partitionIds[1], 1);
        final SingleInputGate.SubpartitionInfo info5 = createSubpartitionInfo(partitionIds[2], 0);
        final SingleInputGate.SubpartitionInfo info6 = createSubpartitionInfo(partitionIds[2], 1);

        MatcherAssert.assertThat(gate.getInputChannels().size(), Matchers.is(6));
        MatcherAssert.assertThat(
                gate.getInputChannels().get(info1).getConsumedSubpartitionIndex(), Matchers.is(0));
        MatcherAssert.assertThat(
                gate.getInputChannels().get(info2).getConsumedSubpartitionIndex(), Matchers.is(1));
        MatcherAssert.assertThat(
                gate.getInputChannels().get(info3).getConsumedSubpartitionIndex(), Matchers.is(0));
        MatcherAssert.assertThat(
                gate.getInputChannels().get(info4).getConsumedSubpartitionIndex(), Matchers.is(1));
        MatcherAssert.assertThat(
                gate.getInputChannels().get(info5).getConsumedSubpartitionIndex(), Matchers.is(0));
        MatcherAssert.assertThat(
                gate.getInputChannels().get(info6).getConsumedSubpartitionIndex(), Matchers.is(1));

        // Channel types right after creation.
        assertChannelsType(gate, LocalRecoveredInputChannel.class, Arrays.asList(info1, info2));
        assertChannelsType(gate, RemoteRecoveredInputChannel.class, Arrays.asList(info3, info4));
        assertChannelsType(gate, UnknownInputChannel.class, Arrays.asList(info5, info6));

        gate.setup();
        Assert.assertNotNull(gate.getBufferPool());
        Assert.assertEquals(1L, gate.getBufferPool().getNumberOfRequiredMemorySegments());

        // Poll until the gate reports that the recovered state has been consumed.
        gate.finishReadRecoveredState();
        while (!gate.getStateConsumedFuture().isDone()) {
            gate.pollNext();
        }

        gate.requestPartitions();
        gate.pollNext();

        // Channel types after partitions have been requested.
        assertChannelsType(gate, LocalInputChannel.class, Arrays.asList(info1, info2));
        assertChannelsType(gate, RemoteInputChannel.class, Arrays.asList(info3, info4));
        assertChannelsType(gate, UnknownInputChannel.class, Arrays.asList(info5, info6));

        for (InputChannel channel : gate.getInputChannels().values()) {
            if (channel instanceof RemoteInputChannel) {
                final RemoteInputChannel remote = (RemoteInputChannel) channel;
                Assert.assertNotNull(remote.getPartitionRequestClient());
                Assert.assertEquals(2L, remote.getInitialCredit());
            } else if (channel instanceof LocalInputChannel) {
                Assert.assertNotNull(((LocalInputChannel) channel).getSubpartitionView());
            }
        }

        // Update the third partition with a descriptor located at the gate's own resource id.
        gate.updateInputChannel(
                localLocation,
                NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                        partitionIds[2], localLocation));
        assertChannelsType(gate, LocalInputChannel.class, Arrays.asList(info1, info2));
        assertChannelsType(gate, RemoteInputChannel.class, Arrays.asList(info3, info4));
        assertChannelsType(gate, LocalInputChannel.class, Arrays.asList(info5, info6));
    }

    /**
     * Asserts that the gate's channel for every listed subpartition is an instance of the given
     * type.
     *
     * @param singleInputGate gate whose channel map is looked up
     * @param cls type every looked-up channel has to be an instance of
     * @param list keys of the channels to check
     */
    private void assertChannelsType(SingleInputGate singleInputGate, Class<?> cls, List<SingleInputGate.SubpartitionInfo> list) {
        for (SingleInputGate.SubpartitionInfo subpartitionInfo : list) {
            MatcherAssert.assertThat(
                    singleInputGate.getInputChannels().get(subpartitionInfo),
                    Matchers.instanceOf(cls));
        }
    }

    /**
     * Checks that {@code getNumberOfQueuedBuffers()} of a gate with one remote and one local
     * channel grows as data arrives on either channel.
     *
     * <p>The test expects a count of one after a buffer has been handed to the remote channel and
     * a count of two after a record has additionally been emitted into the result partition that
     * the local channel is built for.
     */
    @Test
    public void testQueuedBuffers() throws Exception {
        final NettyShuffleEnvironment network = createNettyShuffleEnvironment();

        // NOTE(review): explicit narrowing in case build() is declared to return ResultPartition.
        final BufferWritingResultPartition resultPartition =
                (BufferWritingResultPartition)
                        new ResultPartitionBuilder()
                                .setResultPartitionManager(network.getResultPartitionManager())
                                .setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
                                .build();

        final SingleInputGate inputGate =
                createInputGate(network, 2, ResultPartitionType.PIPELINED);
        final ResultPartitionID localResultPartitionId = resultPartition.getPartitionId();

        final RemoteInputChannel remoteInputChannel =
                InputChannelBuilder.newBuilder()
                        .setChannelIndex(1)
                        .setupFromNettyShuffleEnvironment(network)
                        .setConnectionManager(new TestingConnectionManager())
                        .buildRemoteChannel(inputGate);
        final InputChannel localInputChannel =
                InputChannelBuilder.newBuilder()
                        .setChannelIndex(0)
                        .setPartitionId(localResultPartitionId)
                        .setupFromNettyShuffleEnvironment(network)
                        .setConnectionManager(new TestingConnectionManager())
                        .buildLocalChannel(inputGate);
        final InputChannel[] inputChannels = {remoteInputChannel, localInputChannel};

        // The closer releases everything registered with it, also when an assertion fails.
        try (Closer closer = Closer.create()) {
            closer.register(network::close);
            closer.register(inputGate::close);
            closer.register(resultPartition::release);

            resultPartition.setup();
            InputGateFairnessTest.setupInputGate(inputGate, inputChannels);

            remoteInputChannel.onBuffer(TestBufferFactory.createBuffer(1), 0, 0);
            Assert.assertEquals(1L, inputGate.getNumberOfQueuedBuffers());

            resultPartition.emitRecord(ByteBuffer.allocate(1), 0);
            Assert.assertEquals(2L, inputGate.getNumberOfQueuedBuffers());
        }
    }

    /**
     * Sets a {@link PartitionNotFoundException} as the error of the gate's only channel and
     * expects {@code getNext()} on the gate to throw an exception of that type carrying the
     * channel's partition id.
     */
    @Test
    public void testPartitionNotFoundExceptionWhileGetNextBuffer() throws Exception {
        final SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        final InputChannel localChannel =
                InputChannelTestUtils.createLocalInputChannel(
                        inputGate, new ResultPartitionManager());
        final ResultPartitionID expectedPartitionId = localChannel.getPartitionId();

        inputGate.setInputChannels(new InputChannel[] {localChannel});
        localChannel.setError(new PartitionNotFoundException(expectedPartitionId));

        try {
            inputGate.getNext();
            Assert.fail("Should throw a PartitionNotFoundException.");
        } catch (PartitionNotFoundException notFound) {
            MatcherAssert.assertThat(
                    expectedPartitionId, Matchers.is(notFound.getPartitionId()));
        }
    }

    /**
     * Calls {@code announceBufferSize} on a gate with one local and one remote channel at four
     * points: after the partitions were requested, after the local channel released its
     * resources, after the remote channel released its resources, and after the gate was closed.
     *
     * <p>The test contains no assertions; it passes when none of the calls throws.
     */
    @Test
    public void testAnnounceBufferSize() throws Exception {
        final SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(2);
        final InputChannel localChannel =
                InputChannelTestUtils.createLocalInputChannel(
                        inputGate,
                        new TestingResultPartitionManager(
                                InputChannelTestUtils.createResultSubpartitionView(
                                        new BufferConsumer[0])));
        final InputChannel remoteChannel =
                InputChannelTestUtils.createRemoteInputChannel(inputGate, 1);

        inputGate.setInputChannels(new InputChannel[] {localChannel, remoteChannel});
        inputGate.requestPartitions();

        // All channels still hold their resources.
        inputGate.announceBufferSize(10);

        // Only the local channel has released its resources.
        localChannel.releaseAllResources();
        inputGate.announceBufferSize(11);

        // Both channels have released their resources.
        remoteChannel.releaseAllResources();
        inputGate.announceBufferSize(12);

        // The gate itself has been closed.
        inputGate.close();
        inputGate.announceBufferSize(13);
    }

    /**
     * Creates ten input gates through the shuffle environment and closes them one by one.
     *
     * <p>For every gate the test expects {@code getInputGate(id)} on the environment to be present
     * before the gate is closed and empty afterwards.
     */
    @Test
    public void testInputGateRemovalFromNettyShuffleEnvironment() throws Exception {
        final NettyShuffleEnvironment network = createNettyShuffleEnvironment();
        final int numberOfGates = 10;

        // The closer shuts the environment down, also when an assertion fails.
        try (Closer closer = Closer.create()) {
            closer.register(network::close);

            final Map<InputGateID, SingleInputGate> createdInputGatesById =
                    createInputGateWithLocalChannels(network, numberOfGates, 1);
            Assert.assertEquals(numberOfGates, createdInputGatesById.size());

            for (Map.Entry<InputGateID, SingleInputGate> entry : createdInputGatesById.entrySet()) {
                final InputGateID id = entry.getKey();

                MatcherAssert.assertThat(network.getInputGate(id).isPresent(), Matchers.is(true));
                entry.getValue().close();
                MatcherAssert.assertThat(network.getInputGate(id).isPresent(), Matchers.is(false));
            }
        }
    }

    /**
     * Builds two gates with three channels each and checks the {@link InputChannelInfo} of every
     * channel.
     *
     * <p>The gate index in the info is expected to equal the index the gate was built with, and
     * the channel index is expected to count up from zero in the iteration order of the gate's
     * channel map values.
     */
    @Test
    public void testSingleInputGateInfo() {
        final int numSingleInputGates = 2;
        final int numInputChannels = 3;

        for (int gateIndex = 0; gateIndex < numSingleInputGates; gateIndex++) {
            final SingleInputGate gate =
                    new SingleInputGateBuilder()
                            .setSingleInputGateIndex(gateIndex)
                            .setNumberOfChannels(numInputChannels)
                            .build();

            int channelCounter = 0;
            for (InputChannel inputChannel : gate.getInputChannels().values()) {
                final InputChannelInfo channelInfo = inputChannel.getChannelInfo();

                Assert.assertEquals(gateIndex, channelInfo.getGateIdx());
                Assert.assertEquals(channelCounter++, channelInfo.getInputChannelIdx());
            }
        }
    }

    /**
     * Finishes the three channels of a gate in the order 1, 0, 2 and checks
     * {@code getUnfinishedChannels()} after every step.
     *
     * <p>Each step lets a channel read an end-of-partition event, notifies the gate that the
     * channel is non-empty and pulls once from the gate. The reported list is expected to shrink
     * by the finished channel's info each time and to be empty at the end.
     */
    @Test
    public void testGetUnfinishedChannels() throws IOException, InterruptedException {
        final SingleInputGate inputGate =
                new SingleInputGateBuilder()
                        .setSingleInputGateIndex(1)
                        .setNumberOfChannels(3)
                        .build();
        final TestInputChannel[] channels = {
            new TestInputChannel(inputGate, 0),
            new TestInputChannel(inputGate, 1),
            new TestInputChannel(inputGate, 2)
        };
        inputGate.setInputChannels(channels);

        // Nothing has finished yet.
        Assert.assertEquals(
                Arrays.asList(
                        channels[0].getChannelInfo(),
                        channels[1].getChannelInfo(),
                        channels[2].getChannelInfo()),
                inputGate.getUnfinishedChannels());

        // Finish the middle channel.
        channels[1].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty(channels[1]);
        inputGate.getNext();
        Assert.assertEquals(
                Arrays.asList(channels[0].getChannelInfo(), channels[2].getChannelInfo()),
                inputGate.getUnfinishedChannels());

        // Finish the first channel.
        channels[0].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty(channels[0]);
        inputGate.getNext();
        Assert.assertEquals(
                Collections.singletonList(channels[2].getChannelInfo()),
                inputGate.getUnfinishedChannels());

        // Finish the last channel.
        channels[2].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty(channels[2]);
        inputGate.getNext();
        Assert.assertEquals(Collections.emptyList(), inputGate.getUnfinishedChannels());
    }

    /**
     * Checks that {@code getBuffersInUseCount()} of a gate with two test channels starts at zero
     * and goes up by one with every {@code readBuffer()} call on either channel.
     */
    @Test
    public void testBufferInUseCount() throws Exception {
        final SingleInputGate inputGate = createInputGate();
        final TestInputChannel firstChannel = new TestInputChannel(inputGate, 0);
        final TestInputChannel secondChannel = new TestInputChannel(inputGate, 1);
        final TestInputChannel[] channels = {firstChannel, secondChannel};
        inputGate.setInputChannels(channels);

        // Nothing has been read yet.
        MatcherAssert.assertThat(
                Integer.valueOf(inputGate.getBuffersInUseCount()), Matchers.is(0));

        firstChannel.readBuffer();
        MatcherAssert.assertThat(
                Integer.valueOf(inputGate.getBuffersInUseCount()), Matchers.is(1));

        firstChannel.readBuffer();
        MatcherAssert.assertThat(
                Integer.valueOf(inputGate.getBuffersInUseCount()), Matchers.is(2));

        secondChannel.readBuffer();
        MatcherAssert.assertThat(
                Integer.valueOf(inputGate.getBuffersInUseCount()), Matchers.is(3));
    }

    /**
     * Creates the channel-map key for subpartition index {@code 0} of the given partition.
     *
     * @param intermediateResultPartitionID partition the key refers to
     * @return a new {@code SubpartitionInfo} for the partition and index {@code 0}
     */
    private static SingleInputGate.SubpartitionInfo createSubpartitionInfo(IntermediateResultPartitionID intermediateResultPartitionID) {
        final int firstSubpartitionIndex = 0;
        return new SingleInputGate.SubpartitionInfo(
                intermediateResultPartitionID, firstSubpartitionIndex);
    }

    /**
     * Creates the channel-map key for one subpartition of the given partition.
     *
     * @param intermediateResultPartitionID partition the key refers to
     * @param i subpartition index the key refers to
     * @return a new {@code SubpartitionInfo} built from both arguments
     */
    private static SingleInputGate.SubpartitionInfo createSubpartitionInfo(
            IntermediateResultPartitionID intermediateResultPartitionID, int i) {
        final SingleInputGate.SubpartitionInfo subpartitionInfo =
                new SingleInputGate.SubpartitionInfo(intermediateResultPartitionID, i);
        return subpartitionInfo;
    }

    /**
     * Creates a gate over the given partitions with the subpartition index range {@code [0, 0]},
     * a freshly generated {@link ResourceID}, and no explicit connection manager or result
     * partition manager.
     *
     * @param intermediateResultPartitionIDArr partitions to consume; the seven-argument overload
     *     this delegates to reads the elements at index 0, 1 and 2
     * @param resultPartitionType partition type put into the deployment descriptor
     * @param nettyShuffleEnvironment environment the gate is created from
     * @return the created gate
     * @throws IOException declared by the overload this delegates to
     */
    static SingleInputGate createSingleInputGate(IntermediateResultPartitionID[] intermediateResultPartitionIDArr, ResultPartitionType resultPartitionType, NettyShuffleEnvironment nettyShuffleEnvironment) throws IOException {
        final SubpartitionIndexRange firstSubpartitionOnly = new SubpartitionIndexRange(0, 0);
        final ResourceID generatedLocation = ResourceID.generate();

        // Passing null for both managers makes the overload fall back to the environment's own.
        return createSingleInputGate(
                intermediateResultPartitionIDArr,
                resultPartitionType,
                firstSubpartitionOnly,
                nettyShuffleEnvironment,
                generatedLocation,
                null,
                null);
    }

    /**
     * Creates a gate through a {@code SingleInputGateFactory} from a deployment descriptor that
     * holds three shuffle descriptors.
     *
     * <p>The descriptors are, in order: a descriptor for partition 0 located at
     * {@code resourceID}, a descriptor for partition 1 located at a freshly generated
     * {@link ResourceID}, and an {@code UnknownShuffleDescriptor} for partition 2.
     *
     * @param intermediateResultPartitionIDArr partitions to consume; elements 0, 1 and 2 are read
     * @param resultPartitionType partition type put into the deployment descriptor
     * @param subpartitionIndexRange subpartition range put into the deployment descriptor
     * @param nettyShuffleEnvironment source of the configuration, the network buffer pool, the
     *     owner context and the fallback managers
     * @param resourceID resource id handed to the factory and used as location of partition 0
     * @param connectionManager connection manager for the factory, or {@code null} to use the
     *     environment's
     * @param resultPartitionManager result partition manager for the factory, or {@code null} to
     *     use the environment's
     * @return the gate created by the factory with gate index 0
     * @throws IOException declared for the descriptor serialization and gate creation calls
     */
    static SingleInputGate createSingleInputGate(IntermediateResultPartitionID[] intermediateResultPartitionIDArr, ResultPartitionType resultPartitionType, SubpartitionIndexRange subpartitionIndexRange, NettyShuffleEnvironment nettyShuffleEnvironment, ResourceID resourceID, ConnectionManager connectionManager, ResultPartitionManager resultPartitionManager) throws IOException {
        final IntermediateDataSetID consumedResultId = new IntermediateDataSetID();
        final ShuffleDescriptor[] shuffleDescriptors = {
            NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                    intermediateResultPartitionIDArr[0], resourceID),
            NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                    intermediateResultPartitionIDArr[1], ResourceID.generate()),
            new UnknownShuffleDescriptor(
                    new ResultPartitionID(
                            intermediateResultPartitionIDArr[2],
                            ExecutionGraphTestUtils.createExecutionAttemptId()))
        };
        final InputGateDeploymentDescriptor gateDescriptor =
                new InputGateDeploymentDescriptor(
                        consumedResultId,
                        resultPartitionType,
                        subpartitionIndexRange,
                        new TaskDeploymentDescriptor.NonOffloaded(
                                CompressedSerializedValue.fromObject(shuffleDescriptors)));

        // Explicitly supplied managers win over the ones owned by the environment.
        final ConnectionManager effectiveConnectionManager;
        if (connectionManager != null) {
            effectiveConnectionManager = connectionManager;
        } else {
            effectiveConnectionManager = nettyShuffleEnvironment.getConnectionManager();
        }
        final ResultPartitionManager effectivePartitionManager;
        if (resultPartitionManager != null) {
            effectivePartitionManager = resultPartitionManager;
        } else {
            effectivePartitionManager = nettyShuffleEnvironment.getResultPartitionManager();
        }

        final TaskMetricGroup taskMetricGroup =
                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
        final SingleInputGateFactory gateFactory =
                new SingleInputGateFactory(
                        resourceID,
                        nettyShuffleEnvironment.getConfiguration(),
                        effectiveConnectionManager,
                        effectivePartitionManager,
                        new TaskEventDispatcher(),
                        nettyShuffleEnvironment.getNetworkBufferPool());

        return gateFactory.create(
                nettyShuffleEnvironment.createShuffleIOOwnerContext(
                        "TestTask", taskMetricGroup.executionId(), taskMetricGroup),
                0,
                gateDescriptor,
                SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER);
    }

    /**
     * Creates {@code i} input gates through the given environment and returns them keyed by
     * their {@code InputGateID}.
     *
     * <p>Every gate gets its own {@code IntermediateDataSetID} but shares the same array of
     * {@code i2} shuffle descriptors and the same execution attempt id. The gates returned by the
     * environment are matched to the data set ids by position.
     *
     * @param nettyShuffleEnvironment environment that creates the gates
     * @param i number of gates to create
     * @param i2 number of shuffle descriptors (channels) per gate
     * @return map from gate id to created gate with {@code i} entries
     * @throws IOException declared for the descriptor construction
     */
    private static Map<InputGateID, SingleInputGate> createInputGateWithLocalChannels(NettyShuffleEnvironment nettyShuffleEnvironment, int i, int i2) throws IOException {
        final int numberOfGates = i;
        final int numberOfChannelsPerGate = i2;

        final NettyShuffleDescriptor[] channelDescs =
                new NettyShuffleDescriptor[numberOfChannelsPerGate];
        for (int channel = 0; channel < numberOfChannelsPerGate; channel++) {
            channelDescs[channel] =
                    NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                            new IntermediateResultPartitionID(), ResourceID.generate());
        }

        final InputGateDeploymentDescriptor[] gateDescs =
                new InputGateDeploymentDescriptor[numberOfGates];
        final IntermediateDataSetID[] consumedResultIds =
                new IntermediateDataSetID[numberOfGates];
        for (int gate = 0; gate < numberOfGates; gate++) {
            consumedResultIds[gate] = new IntermediateDataSetID();
            gateDescs[gate] =
                    new InputGateDeploymentDescriptor(
                            consumedResultIds[gate], ResultPartitionType.PIPELINED, 0, channelDescs);
        }

        final ExecutionAttemptID consumerId = ExecutionGraphTestUtils.createExecutionAttemptId();
        final SingleInputGate[] gates =
                (SingleInputGate[])
                        nettyShuffleEnvironment
                                .createInputGates(
                                        nettyShuffleEnvironment.createShuffleIOOwnerContext(
                                                "", consumerId, new UnregisteredMetricsGroup()),
                                        SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER,
                                        Arrays.asList(gateDescs))
                                .toArray(new SingleInputGate[0]);

        // Typed map instead of a raw HashMap so that the declared return type is checked.
        final Map<InputGateID, SingleInputGate> inputGatesById = new HashMap<>();
        for (int gate = 0; gate < numberOfGates; gate++) {
            inputGatesById.put(new InputGateID(consumedResultIds[gate], consumerId), gates[gate]);
        }
        return inputGatesById;
    }

    /**
     * Builds an unknown channel for the given gate.
     *
     * @param nettyShuffleEnvironment environment the channel builder is set up from
     * @param singleInputGate gate the channel is built for
     * @param resultPartitionID partition id set on the builder
     * @param i channel index set on the builder
     * @return the channel returned by {@code buildUnknownChannel}
     */
    private InputChannel buildUnknownInputChannel(NettyShuffleEnvironment nettyShuffleEnvironment, SingleInputGate singleInputGate, ResultPartitionID resultPartitionID, int i) {
        final InputChannelBuilder channelBuilder =
                InputChannelBuilder.newBuilder()
                        .setChannelIndex(i)
                        .setPartitionId(resultPartitionID)
                        .setupFromNettyShuffleEnvironment(nettyShuffleEnvironment)
                        .setConnectionManager(new TestingConnectionManager());
        return channelBuilder.buildUnknownChannel(singleInputGate);
    }

    /**
     * Builds a shuffle environment from a {@code NettyShuffleEnvironmentBuilder} on which no
     * option has been set.
     *
     * @return the built environment
     */
    private NettyShuffleEnvironment createNettyShuffleEnvironment() {
        final NettyShuffleEnvironmentBuilder untouchedBuilder =
                new NettyShuffleEnvironmentBuilder();
        return untouchedBuilder.build();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Pulls the next element from the gate and checks it against the expectations.
     *
     * <p>When {@code z2} is {@code false}, a following {@code pollNext()} on the gate is
     * additionally expected to return nothing.
     *
     * @param inputGate gate to pull from
     * @param z whether the pulled element is expected to be a buffer
     * @param i index of the channel whose channel info the pulled element is expected to carry
     * @param z2 expected value of {@code moreAvailable()} on the pulled element
     * @throws IOException declared by the gate calls
     * @throws InterruptedException declared by the gate calls
     */
    public static void verifyBufferOrEvent(InputGate inputGate, boolean z, int i, boolean z2) throws IOException, InterruptedException {
        final Optional<BufferOrEvent> next = inputGate.getNext();
        Assert.assertTrue(next.isPresent());

        final BufferOrEvent bufferOrEvent = next.get();
        Assert.assertEquals(z, bufferOrEvent.isBuffer());
        Assert.assertEquals(
                inputGate.getChannel(i).getChannelInfo(), bufferOrEvent.getChannelInfo());
        Assert.assertEquals(z2, bufferOrEvent.moreAvailable());

        if (!z2) {
            Assert.assertFalse(inputGate.pollNext().isPresent());
        }
    }

    /**
     * Creates a three-channel {@code PIPELINED} gate whose channels are, by channel index: a
     * remote recovered channel (0), a local recovered channel (1) and an unknown channel (2).
     *
     * @param nettyShuffleEnvironment environment the gate is created from
     * @return the gate with its three channels set
     */
    private SingleInputGate createInputGate(NettyShuffleEnvironment nettyShuffleEnvironment) {
        final SingleInputGate gate =
                createInputGate(nettyShuffleEnvironment, 3, ResultPartitionType.PIPELINED);

        final InputChannel remoteRecovered =
                new InputChannelBuilder().setChannelIndex(0).buildRemoteRecoveredChannel(gate);
        final InputChannel localRecovered =
                new InputChannelBuilder().setChannelIndex(1).buildLocalRecoveredChannel(gate);
        final InputChannel unknown =
                new InputChannelBuilder().setChannelIndex(2).buildUnknownChannel(gate);

        gate.setInputChannels(new InputChannel[] {remoteRecovered, localRecovered, unknown});
        return gate;
    }
}
