package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.Collections;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/ResultPartitionTest.class */
/**
 * Tests for {@link ResultPartition}, mostly exercised through the
 * {@link ConsumableNotifyingResultPartitionWriterDecorator} wrapper.
 *
 * <p>The tests cover when the {@link ResultPartitionConsumableNotifier} is (not) invoked,
 * what happens to a {@link BufferConsumer} that is added to a finished, released or live
 * partition, and how the partition's {@link BufferPool} is sized.
 */
public class ResultPartitionTest {
    /**
     * Argument handed to {@code createFilledFinishedBufferConsumer} by every add test
     * (presumably the buffer size in bytes - confirm against BufferBuilderTestUtils).
     */
    private static final int BUFFER_SIZE = 32768;

    /** Directory backing the shared {@link FileChannelManager}. */
    private static final String TEMP_DIR = EnvironmentInformation.getTemporaryFileDirectory();

    /** Shared by all tests of this class; created in {@link #setUp()}, closed in {@link #shutdown()}. */
    private static FileChannelManager fileChannelManager;

    @BeforeClass
    public static void setUp() {
        fileChannelManager = new FileChannelManagerImpl(new String[] {TEMP_DIR}, "testing");
    }

    @AfterClass
    public static void shutdown() throws Exception {
        fileChannelManager.close();
    }

    /**
     * Adding a buffer must notify the consumable notifier exactly once for a PIPELINED
     * partition and never for a BLOCKING partition.
     */
    @Test
    public void testSendScheduleOrUpdateConsumersMessage() throws Exception {
        JobID jobId = new JobID();
        TaskActions taskActions = new NoOpTaskActions();

        {
            // Pipelined: one notification expected after the first buffer.
            ResultPartitionConsumableNotifier notifier = Mockito.mock(ResultPartitionConsumableNotifier.class);
            ResultPartitionWriter writer = createConsumableNotifyingResultPartitionWriter(
                    ResultPartitionType.PIPELINED, taskActions, jobId, notifier);
            writer.addBufferConsumer(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(BUFFER_SIZE), 0);
            verifyNotifiedOnce(notifier, jobId, writer.getPartitionId(), taskActions);
        }

        {
            // Blocking: no notification expected when a buffer is added.
            ResultPartitionConsumableNotifier notifier = Mockito.mock(ResultPartitionConsumableNotifier.class);
            ResultPartitionWriter writer = createConsumableNotifyingResultPartitionWriter(
                    ResultPartitionType.BLOCKING, taskActions, jobId, notifier);
            writer.addBufferConsumer(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(BUFFER_SIZE), 0);
            verifyNeverNotified(notifier, jobId, writer.getPartitionId(), taskActions);
        }
    }

    @Test
    public void testAddOnFinishedPipelinedPartition() throws Exception {
        testAddOnFinishedPartition(ResultPartitionType.PIPELINED);
    }

    @Test
    public void testAddOnFinishedBlockingPartition() throws Exception {
        testAddOnFinishedPartition(ResultPartitionType.BLOCKING);
    }

    /**
     * A finished BLOCKING partition built with {@code isReleasedOnConsumption(false)} must stay
     * registered and unreleased while subpartition views are repeatedly created and released.
     */
    @Test
    public void testBlockingPartitionIsConsumableMultipleTimesIfNotReleasedOnConsumption() throws IOException {
        ResultPartitionManager manager = new ResultPartitionManager();
        ResultPartition partition = new ResultPartitionBuilder()
                .isReleasedOnConsumption(false)
                .setResultPartitionManager(manager)
                .setResultPartitionType(ResultPartitionType.BLOCKING)
                .setFileChannelManager(fileChannelManager)
                .build();

        manager.registerResultPartition(partition);
        partition.finish();

        MatcherAssert.assertThat(
                manager.getUnreleasedPartitions(),
                org.hamcrest.Matchers.contains(partition.getPartitionId()));

        // Consume the same subpartition twice; the partition must survive both rounds.
        for (int round = 0; round < 2; round++) {
            ResultSubpartitionView view = partition.createSubpartitionView(0, () -> { });
            view.releaseAllResources();

            MatcherAssert.assertThat(
                    manager.getUnreleasedPartitions(),
                    org.hamcrest.Matchers.contains(partition.getPartitionId()));
            Assert.assertFalse(partition.isReleased());
        }
    }

    /**
     * Adds a buffer consumer to an already finished partition and expects an
     * {@link IllegalStateException}. Regardless of the outcome, the consumer must have been
     * recycled and the notifier must not have been called after it was reset.
     *
     * @param resultPartitionType type of the partition under test
     */
    private void testAddOnFinishedPartition(ResultPartitionType resultPartitionType) throws Exception {
        BufferConsumer bufferConsumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(BUFFER_SIZE);
        ResultPartitionConsumableNotifier notifier = Mockito.mock(ResultPartitionConsumableNotifier.class);
        JobID jobId = new JobID();
        TaskActions taskActions = new NoOpTaskActions();
        ResultPartitionWriter writer =
                createConsumableNotifyingResultPartitionWriter(resultPartitionType, taskActions, jobId, notifier);
        try {
            writer.finish();
            // Forget any interaction caused by finish(); only the add below is of interest.
            Mockito.reset(notifier);
            writer.addBufferConsumer(bufferConsumer, 0);
            Assert.fail("exception expected");
        } catch (IllegalStateException expected) {
            // Expected: adding to a finished partition is rejected.
        } finally {
            closeAndFailIfNotRecycled(bufferConsumer);
            verifyNeverNotified(notifier, jobId, writer.getPartitionId(), taskActions);
        }
    }

    @Test
    public void testAddOnReleasedPipelinedPartition() throws Exception {
        testAddOnReleasedPartition(ResultPartitionType.PIPELINED);
    }

    @Test
    public void testAddOnReleasedBlockingPartition() throws Exception {
        testAddOnReleasedPartition(ResultPartitionType.BLOCKING);
    }

    /**
     * Adds a buffer consumer to an already released partition. The add is expected to complete
     * without throwing, the partition must report itself as released, the consumer must have
     * been recycled and the notifier must never be called.
     *
     * @param resultPartitionType type of the partition under test
     */
    private void testAddOnReleasedPartition(ResultPartitionType resultPartitionType) throws Exception {
        BufferConsumer bufferConsumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(BUFFER_SIZE);
        ResultPartitionConsumableNotifier notifier = Mockito.mock(ResultPartitionConsumableNotifier.class);
        JobID jobId = new JobID();
        TaskActions taskActions = new NoOpTaskActions();
        ResultPartition partition = createPartitionOfType(resultPartitionType);
        ResultPartitionWriter writer = decorate(partition, resultPartitionType, taskActions, jobId, notifier);
        try {
            partition.release();
            writer.addBufferConsumer(bufferConsumer, 0);
            Assert.assertTrue(partition.isReleased());
        } finally {
            closeAndFailIfNotRecycled(bufferConsumer);
            verifyNeverNotified(notifier, jobId, partition.getPartitionId(), taskActions);
        }
    }

    @Test
    public void testAddOnPipelinedPartition() throws Exception {
        testAddOnPartition(ResultPartitionType.PIPELINED);
    }

    @Test
    public void testAddOnBlockingPartition() throws Exception {
        testAddOnPartition(ResultPartitionType.BLOCKING);
    }

    /** Creating a subpartition view on a failed partition must throw. */
    @Test
    public void testCreateSubpartitionOnFailingPartition() throws Exception {
        ResultPartitionManager manager = new ResultPartitionManager();
        ResultPartition partition = new ResultPartitionBuilder().setResultPartitionManager(manager).build();

        manager.registerResultPartition(partition);
        partition.fail(null);

        PartitionTestUtils.verifyCreateSubpartitionViewThrowsException(manager, partition.getPartitionId());
    }

    /**
     * Adds a buffer consumer to a live partition. The consumer must not be recycled by the add,
     * and pipelined partition types must have triggered exactly one notification.
     *
     * @param resultPartitionType type of the partition under test
     */
    private void testAddOnPartition(ResultPartitionType resultPartitionType) throws Exception {
        ResultPartitionConsumableNotifier notifier = Mockito.mock(ResultPartitionConsumableNotifier.class);
        JobID jobId = new JobID();
        TaskActions taskActions = new NoOpTaskActions();
        ResultPartitionWriter writer =
                createConsumableNotifyingResultPartitionWriter(resultPartitionType, taskActions, jobId, notifier);
        BufferConsumer bufferConsumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(BUFFER_SIZE);
        try {
            writer.addBufferConsumer(bufferConsumer, 0);
            Assert.assertFalse(
                    "bufferConsumer should not be recycled (still in the queue)", bufferConsumer.isRecycled());
        } finally {
            // Not recycled is the expected state here, so just clean up without failing.
            if (!bufferConsumer.isRecycled()) {
                bufferConsumer.close();
            }
            if (resultPartitionType.isPipelined()) {
                verifyNotifiedOnce(notifier, jobId, writer.getPartitionId(), taskActions);
            }
        }
    }

    @Test
    public void testReleaseMemoryOnPipelinedPartition() throws Exception {
        testReleaseMemory(ResultPartitionType.PIPELINED);
    }

    /**
     * Requests every buffer of the environment through the partition's pool, then shrinks the
     * pool and checks how many segments become available: none for types with back pressure,
     * the new pool size otherwise.
     *
     * @param resultPartitionType type of the partition under test
     */
    private void testReleaseMemory(ResultPartitionType resultPartitionType) throws Exception {
        final int numAllBuffers = 10;
        final int numLocalBuffers = 4;

        NettyShuffleEnvironment environment =
                new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(numAllBuffers).build();
        ResultPartition partition = PartitionTestUtils.createPartition(environment, resultPartitionType, 1);
        try {
            partition.setup();

            // Take every buffer the environment was configured with.
            for (int i = 0; i < numAllBuffers; i++) {
                BufferConsumer consumer =
                        partition.getBufferPool().requestBufferBuilderBlocking().createBufferConsumer();
                partition.addBufferConsumer(consumer, 0);
            }
            partition.finish();
            Assert.assertEquals(0, partition.getBufferPool().getNumberOfAvailableMemorySegments());

            // Shrink the pool below the number of buffers currently handed out.
            partition.getBufferPool().setNumBuffers(numLocalBuffers);

            int expectedAvailable = resultPartitionType.hasBackPressure() ? 0 : numLocalBuffers;
            Assert.assertEquals(
                    expectedAvailable, partition.getBufferPool().getNumberOfAvailableMemorySegments());
        } finally {
            // Close the environment even if releasing the partition throws.
            try {
                partition.release();
            } finally {
                environment.close();
            }
        }
    }

    @Test
    public void testPipelinedPartitionBufferPool() throws Exception {
        testPartitionBufferPool(ResultPartitionType.PIPELINED_BOUNDED);
    }

    @Test
    public void testBlockingPartitionBufferPool() throws Exception {
        testPartitionBufferPool(ResultPartitionType.BLOCKING);
    }

    /**
     * Checks the limits of the buffer pool created by {@link ResultPartition#setup()}: the
     * required segments equal the number of subpartitions plus one, and the maximum is
     * {@code buffersPerChannel * subpartitions + floatingBuffers} for bounded types and
     * {@link Integer#MAX_VALUE} otherwise.
     *
     * @param resultPartitionType type of the partition under test
     */
    private void testPartitionBufferPool(ResultPartitionType resultPartitionType) throws Exception {
        final int networkBuffersPerChannel = 2;
        final int floatingNetworkBuffersPerGate = 8;

        NetworkBufferPool globalPool = new NetworkBufferPool(20, 1, 1);
        ResultPartition partition = new ResultPartitionBuilder()
                .setResultPartitionType(resultPartitionType)
                .setFileChannelManager(fileChannelManager)
                .setNetworkBuffersPerChannel(networkBuffersPerChannel)
                .setFloatingNetworkBuffersPerGate(floatingNetworkBuffersPerGate)
                .setNetworkBufferPool(globalPool)
                .build();
        try {
            partition.setup();
            BufferPool localPool = partition.getBufferPool();

            Assert.assertEquals(
                    partition.getNumberOfSubpartitions() + 1, localPool.getNumberOfRequiredMemorySegments());

            int expectedMax = resultPartitionType.isBounded()
                    ? networkBuffersPerChannel * partition.getNumberOfSubpartitions() + floatingNetworkBuffersPerGate
                    : Integer.MAX_VALUE;
            Assert.assertEquals(expectedMax, localPool.getMaxNumberOfMemorySegments());
        } finally {
            // Destroy the global pool even if destroying the local pools throws.
            try {
                globalPool.destroyAllBufferPools();
            } finally {
                globalPool.destroy();
            }
        }
    }

    /**
     * Creates a partition of the given type and wraps it in a
     * {@link ConsumableNotifyingResultPartitionWriterDecorator}.
     *
     * @param resultPartitionType type of the partition to create
     * @param taskActions task actions passed on to the decorator
     * @param jobID job id passed on to the decorator
     * @param resultPartitionConsumableNotifier notifier passed on to the decorator
     * @return the decorated writer
     */
    private ResultPartitionWriter createConsumableNotifyingResultPartitionWriter(ResultPartitionType resultPartitionType, TaskActions taskActions, JobID jobID, ResultPartitionConsumableNotifier resultPartitionConsumableNotifier) {
        return decorate(
                createPartitionOfType(resultPartitionType),
                resultPartitionType,
                taskActions,
                jobID,
                resultPartitionConsumableNotifier);
    }

    /** Creates a partition of the given type; BLOCKING partitions get the shared file channel manager. */
    private static ResultPartition createPartitionOfType(ResultPartitionType resultPartitionType) {
        if (resultPartitionType == ResultPartitionType.BLOCKING) {
            return PartitionTestUtils.createPartition(resultPartitionType, fileChannelManager);
        }
        return PartitionTestUtils.createPartition(resultPartitionType);
    }

    /** Wraps a single partition with the consumable-notifying decorator and returns the resulting writer. */
    private static ResultPartitionWriter decorate(
            ResultPartition partition,
            ResultPartitionType resultPartitionType,
            TaskActions taskActions,
            JobID jobId,
            ResultPartitionConsumableNotifier notifier) {
        ResultPartitionWriter[] decorated = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
                Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(resultPartitionType)),
                new ResultPartitionWriter[] {partition},
                taskActions,
                jobId,
                notifier);
        return decorated[0];
    }

    /** Fails the test if the consumer has not been recycled, closing it first. */
    private static void closeAndFailIfNotRecycled(BufferConsumer bufferConsumer) {
        if (!bufferConsumer.isRecycled()) {
            bufferConsumer.close();
            Assert.fail("bufferConsumer not recycled");
        }
    }

    /** Verifies that the notifier was never called for the given job, partition and task actions. */
    private static void verifyNeverNotified(
            ResultPartitionConsumableNotifier notifier,
            JobID jobId,
            ResultPartitionID partitionId,
            TaskActions taskActions) {
        Mockito.verify(notifier, Mockito.never())
                .notifyPartitionConsumable(Matchers.eq(jobId), Matchers.eq(partitionId), Matchers.eq(taskActions));
    }

    /** Verifies that the notifier was called exactly once for the given job, partition and task actions. */
    private static void verifyNotifiedOnce(
            ResultPartitionConsumableNotifier notifier,
            JobID jobId,
            ResultPartitionID partitionId,
            TaskActions taskActions) {
        Mockito.verify(notifier, Mockito.times(1))
                .notifyPartitionConsumable(Matchers.eq(jobId), Matchers.eq(partitionId), Matchers.eq(taskActions));
    }
}
