package org.apache.flink.runtime.executiongraph;

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.class */
public class IntermediateResultPartitionTest extends TestLogger {
    @Test
    public void testPipelinedPartitionConsumable() throws Exception {
        IntermediateResult createResult = createResult(ResultPartitionType.PIPELINED, 2);
        IntermediateResultPartition intermediateResultPartition = createResult.getPartitions()[0];
        IntermediateResultPartition intermediateResultPartition2 = createResult.getPartitions()[1];
        Assert.assertFalse(intermediateResultPartition.isConsumable());
        Assert.assertFalse(intermediateResultPartition2.isConsumable());
        intermediateResultPartition.markDataProduced();
        Assert.assertTrue(intermediateResultPartition.isConsumable());
        Assert.assertFalse(intermediateResultPartition2.isConsumable());
        createResult.resetForNewExecution();
        Assert.assertFalse(intermediateResultPartition.isConsumable());
        Assert.assertFalse(intermediateResultPartition2.isConsumable());
    }

    @Test
    public void testBlockingPartitionConsumable() throws Exception {
        IntermediateResult createResult = createResult(ResultPartitionType.BLOCKING, 2);
        IntermediateResultPartition intermediateResultPartition = createResult.getPartitions()[0];
        IntermediateResultPartition intermediateResultPartition2 = createResult.getPartitions()[1];
        Assert.assertFalse(intermediateResultPartition.isConsumable());
        Assert.assertFalse(intermediateResultPartition2.isConsumable());
        intermediateResultPartition.markFinished();
        Assert.assertFalse(intermediateResultPartition.isConsumable());
        Assert.assertFalse(intermediateResultPartition2.isConsumable());
        Assert.assertFalse(createResult.areAllPartitionsFinished());
        intermediateResultPartition2.markFinished();
        Assert.assertTrue(intermediateResultPartition.isConsumable());
        Assert.assertTrue(intermediateResultPartition2.isConsumable());
        Assert.assertTrue(createResult.areAllPartitionsFinished());
        createResult.resetForNewExecution();
        Assert.assertFalse(intermediateResultPartition.isConsumable());
        Assert.assertFalse(intermediateResultPartition2.isConsumable());
        Assert.assertFalse(createResult.areAllPartitionsFinished());
    }

    @Test
    public void testBlockingPartitionResetting() throws Exception {
        IntermediateResult createResult = createResult(ResultPartitionType.BLOCKING, 2);
        IntermediateResultPartition intermediateResultPartition = createResult.getPartitions()[0];
        IntermediateResultPartition intermediateResultPartition2 = createResult.getPartitions()[1];
        Assert.assertFalse(intermediateResultPartition.isConsumable());
        Assert.assertFalse(intermediateResultPartition2.isConsumable());
        intermediateResultPartition.markFinished();
        Assert.assertEquals(1L, createResult.getNumberOfRunningProducers());
        Assert.assertFalse(intermediateResultPartition.isConsumable());
        Assert.assertFalse(intermediateResultPartition2.isConsumable());
        Assert.assertFalse(createResult.areAllPartitionsFinished());
        createResult.resetForNewExecution();
        Assert.assertEquals(2L, createResult.getNumberOfRunningProducers());
        intermediateResultPartition2.markFinished();
        Assert.assertEquals(1L, createResult.getNumberOfRunningProducers());
        Assert.assertFalse(intermediateResultPartition.isConsumable());
        Assert.assertFalse(intermediateResultPartition2.isConsumable());
        Assert.assertFalse(createResult.areAllPartitionsFinished());
        intermediateResultPartition.markFinished();
        Assert.assertEquals(0L, createResult.getNumberOfRunningProducers());
        Assert.assertTrue(intermediateResultPartition.isConsumable());
        Assert.assertTrue(intermediateResultPartition2.isConsumable());
        Assert.assertTrue(createResult.areAllPartitionsFinished());
        createResult.resetForNewExecution();
        Assert.assertEquals(2L, createResult.getNumberOfRunningProducers());
        Assert.assertFalse(intermediateResultPartition.isConsumable());
        Assert.assertFalse(intermediateResultPartition2.isConsumable());
        Assert.assertFalse(createResult.areAllPartitionsFinished());
    }

    private static IntermediateResult createResult(ResultPartitionType resultPartitionType, int i) throws Exception {
        JobVertex jobVertex = new JobVertex("v1");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(i);
        jobVertex.createAndAddResultDataSet(resultPartitionType);
        return ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex, new DirectScheduledExecutorService()).getProducedDataSets()[0];
    }
}
