package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.class */
public class BoundedBlockingSubpartitionAvailabilityTest {

    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
    private static final int BUFFER_SIZE = 32768;

    @Test
    public void testInitiallyNotAvailable() throws Exception {
        ResultSubpartition createPartitionWithData = createPartitionWithData(10);
        ResultSubpartitionView createView = PartitionTestUtils.createView(createPartitionWithData, new CountingAvailabilityListener());
        Assert.assertEquals(0L, r0.numNotifications);
        createView.releaseAllResources();
        createPartitionWithData.release();
    }

    @Test
    public void testUnavailableWhenBuffersExhausted() throws Exception {
        ResultSubpartition createPartitionWithData = createPartitionWithData(100000);
        ResultSubpartitionView createView = PartitionTestUtils.createView(createPartitionWithData, new CountingAvailabilityListener());
        List<ResultSubpartition.BufferAndBacklog> drainAvailableData = drainAvailableData(createView);
        Assert.assertFalse(createView.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable());
        Assert.assertFalse(drainAvailableData.get(drainAvailableData.size() - 1).isDataAvailable());
        createView.releaseAllResources();
        createPartitionWithData.release();
    }

    @Test
    public void testAvailabilityNotificationWhenBuffersReturn() throws Exception {
        ResultSubpartition createPartitionWithData = createPartitionWithData(100000);
        ResultSubpartitionView createView = PartitionTestUtils.createView(createPartitionWithData, new CountingAvailabilityListener());
        List<ResultSubpartition.BufferAndBacklog> drainAvailableData = drainAvailableData(createView);
        drainAvailableData.get(0).buffer().recycleBuffer();
        drainAvailableData.get(1).buffer().recycleBuffer();
        Assert.assertTrue(createView.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable());
        Assert.assertEquals(1L, r0.numNotifications);
        createView.releaseAllResources();
        createPartitionWithData.release();
    }

    @Test
    public void testNotAvailableWhenEmpty() throws Exception {
        ResultSubpartition createPartitionWithData = createPartitionWithData(100000);
        ResultSubpartitionView createReadView = createPartitionWithData.createReadView(new NoOpBufferAvailablityListener());
        drainAllData(createReadView);
        Assert.assertFalse(createReadView.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable());
        createReadView.releaseAllResources();
        createPartitionWithData.release();
    }

    private static ResultSubpartition createPartitionWithData(int i) throws IOException {
        ResultSubpartition resultSubpartition = new ResultPartitionBuilder().setResultPartitionType(ResultPartitionType.BLOCKING_PERSISTENT).setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE).setSSLEnabled(true).setFileChannelManager(new FileChannelManagerImpl(new String[]{TMP_FOLDER.newFolder().toString()}, "data")).setNetworkBufferSize(32768).build().getAllPartitions()[0];
        writeBuffers(resultSubpartition, i);
        resultSubpartition.finish();
        return resultSubpartition;
    }

    private static void writeBuffers(ResultSubpartition resultSubpartition, int i) throws IOException {
        for (int i2 = 0; i2 < i; i2++) {
            resultSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768));
        }
    }

    private static List<ResultSubpartition.BufferAndBacklog> drainAvailableData(ResultSubpartitionView resultSubpartitionView) throws Exception {
        ArrayList arrayList = new ArrayList();
        while (true) {
            ResultSubpartition.BufferAndBacklog nextBuffer = resultSubpartitionView.getNextBuffer();
            if (nextBuffer == null) {
                return arrayList;
            }
            arrayList.add(nextBuffer);
        }
    }

    private static void drainAllData(ResultSubpartitionView resultSubpartitionView) throws Exception {
        while (true) {
            ResultSubpartition.BufferAndBacklog nextBuffer = resultSubpartitionView.getNextBuffer();
            if (nextBuffer == null) {
                return;
            } else {
                nextBuffer.buffer().recycleBuffer();
            }
        }
    }
}
