package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.class */
public class CancelPartitionRequestTest {

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest$InfiniteSubpartitionView.class */
    static class InfiniteSubpartitionView implements ResultSubpartitionView {
        private final BufferProvider bufferProvider;
        private final CountDownLatch sync;

        public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch countDownLatch) {
            this.bufferProvider = (BufferProvider) Preconditions.checkNotNull(bufferProvider);
            this.sync = (CountDownLatch) Preconditions.checkNotNull(countDownLatch);
        }

        @Nullable
        public ResultSubpartition.BufferAndBacklog getNextBuffer() throws IOException {
            Buffer requestBuffer = this.bufferProvider.requestBuffer();
            if (requestBuffer == null) {
                return null;
            }
            requestBuffer.setSize(requestBuffer.getMaxCapacity());
            return new ResultSubpartition.BufferAndBacklog(requestBuffer, true, 0, false);
        }

        public void notifyDataAvailable() {
        }

        public void releaseAllResources() throws IOException {
            this.sync.countDown();
        }

        public boolean isReleased() {
            return false;
        }

        public void resumeConsumption() {
        }

        public boolean isAvailable(int i) {
            return true;
        }

        public int unsynchronizedGetNumberOfQueuedBuffers() {
            return 0;
        }

        public Throwable getFailureCause() {
            return null;
        }
    }

    @Test
    public void testCancelPartitionRequest() throws Exception {
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = null;
        try {
            TestPooledBufferProvider testPooledBufferProvider = new TestPooledBufferProvider(16);
            ResultPartitionManager resultPartitionManager = (ResultPartitionManager) Mockito.mock(ResultPartitionManager.class);
            ResultPartitionID resultPartitionID = new ResultPartitionID();
            CountDownLatch countDownLatch = new CountDownLatch(1);
            final ResultSubpartitionView resultSubpartitionView = (ResultSubpartitionView) Mockito.spy(new InfiniteSubpartitionView(testPooledBufferProvider, countDownLatch));
            Mockito.when(resultPartitionManager.createSubpartitionView((ResultPartitionID) Matchers.eq(resultPartitionID), Matchers.eq(0), (BufferAvailabilityListener) Matchers.any(BufferAvailabilityListener.class))).thenAnswer(new Answer<ResultSubpartitionView>() { // from class: org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest.1
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public ResultSubpartitionView m118answer(InvocationOnMock invocationOnMock) throws Throwable {
                    ((BufferAvailabilityListener) invocationOnMock.getArguments()[2]).notifyDataAvailable();
                    return resultSubpartitionView;
                }
            });
            nettyServerAndClient = NettyTestUtil.initServerAndClient(new NettyProtocol(resultPartitionManager, (TaskEventPublisher) Mockito.mock(TaskEventDispatcher.class)));
            NettyTestUtil.connect(nettyServerAndClient).writeAndFlush(new NettyMessage.PartitionRequest(resultPartitionID, 0, new InputChannelID(), Integer.MAX_VALUE)).await();
            if (!countDownLatch.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
                Assert.fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() + " ms to be notified about cancelled partition.");
            }
            ((ResultSubpartitionView) Mockito.verify(resultSubpartitionView, Mockito.times(1))).releaseAllResources();
            NettyTestUtil.shutdown(nettyServerAndClient);
        } catch (Throwable th) {
            NettyTestUtil.shutdown(nettyServerAndClient);
            throw th;
        }
    }

    @Test
    public void testDuplicateCancel() throws Exception {
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = null;
        try {
            TestPooledBufferProvider testPooledBufferProvider = new TestPooledBufferProvider(16);
            ResultPartitionManager resultPartitionManager = (ResultPartitionManager) Mockito.mock(ResultPartitionManager.class);
            ResultPartitionID resultPartitionID = new ResultPartitionID();
            CountDownLatch countDownLatch = new CountDownLatch(1);
            final ResultSubpartitionView resultSubpartitionView = (ResultSubpartitionView) Mockito.spy(new InfiniteSubpartitionView(testPooledBufferProvider, countDownLatch));
            Mockito.when(resultPartitionManager.createSubpartitionView((ResultPartitionID) Matchers.eq(resultPartitionID), Matchers.eq(0), (BufferAvailabilityListener) Matchers.any(BufferAvailabilityListener.class))).thenAnswer(new Answer<ResultSubpartitionView>() { // from class: org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest.2
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public ResultSubpartitionView m119answer(InvocationOnMock invocationOnMock) throws Throwable {
                    ((BufferAvailabilityListener) invocationOnMock.getArguments()[2]).notifyDataAvailable();
                    return resultSubpartitionView;
                }
            });
            nettyServerAndClient = NettyTestUtil.initServerAndClient(new NettyProtocol(resultPartitionManager, (TaskEventPublisher) Mockito.mock(TaskEventDispatcher.class)));
            Channel connect = NettyTestUtil.connect(nettyServerAndClient);
            InputChannelID inputChannelID = new InputChannelID();
            connect.writeAndFlush(new NettyMessage.PartitionRequest(resultPartitionID, 0, inputChannelID, Integer.MAX_VALUE)).await();
            if (!countDownLatch.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
                Assert.fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() + " ms to be notified about cancelled partition.");
            }
            connect.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelID)).await();
            connect.close();
            NettyTestUtil.awaitClose(connect);
            ((ResultSubpartitionView) Mockito.verify(resultSubpartitionView, Mockito.times(1))).releaseAllResources();
            NettyTestUtil.shutdown(nettyServerAndClient);
        } catch (Throwable th) {
            NettyTestUtil.shutdown(nettyServerAndClient);
            throw th;
        }
    }
}
