package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.class */
class SortMergeResultPartitionReadSchedulerTest {
    /** Size in bytes of one buffer; also the length of the shared random payload. */
    private static final int bufferSize = 1024;

    /** Payload of exactly one buffer; refilled with random bytes before every test. */
    private static final byte[] dataBytes = new byte[bufferSize];

    /** Capacity in bytes of the read buffer pool built for each test (two buffers of bufferSize). */
    private static final int totalBytes = 2048;

    /** Thread count of the executor that backs the scheduler under test. */
    private static final int numThreads = 4;

    /** Number of subpartitions in the generated partitioned file. */
    private static final int numSubpartitions = 10;

    /** Number of buffers per subpartition in the generated partitioned file. */
    private static final int numBuffersPerSubpartition = 10;

    /** Partitioned file fixture; created in before() and deleted in after(). */
    private PartitionedFile partitionedFile;

    /** Reader over subpartition 0 of the fixture file, backed by the two channels below. */
    private PartitionedFileReader fileReader;

    /** Read-only channel on the fixture's data file; closed in after(). */
    private FileChannel dataFileChannel;

    /** Read-only channel on the fixture's index file; closed in after(). */
    private FileChannel indexFileChannel;

    /** Buffer pool handed to the scheduler; destroyed in after(). */
    private BatchShuffleReadBufferPool bufferPool;

    /** Executor handed to the scheduler; shut down in after(). */
    private ExecutorService executor;

    /** Scheduler under test; individual tests may replace the instance built in before(). */
    private SortMergeResultPartitionReadScheduler readScheduler;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest$FakeBatchShuffleReadBufferPool.class */
    private static class FakeBatchShuffleReadBufferPool extends BatchShuffleReadBufferPool {
        private final Queue<MemorySegment> requestedBuffers;

        FakeBatchShuffleReadBufferPool(long j, int i) throws Exception {
            super(j, i);
            this.requestedBuffers = new LinkedList(requestBuffers());
        }

        public long getLastBufferOperationTimestamp() {
            recycle(this.requestedBuffers.poll());
            return super.getLastBufferOperationTimestamp();
        }

        public void destroy() {
            recycle(this.requestedBuffers);
            this.requestedBuffers.clear();
            super.destroy();
        }
    }

    /** Package-private no-argument constructor; all per-test state is built in before(). */
    SortMergeResultPartitionReadSchedulerTest() {}

    /**
     * Builds the fixture for one test: random payload, partitioned file, read channels, file
     * reader for subpartition 0, buffer pool, executor and the scheduler under test.
     *
     * @param path temporary directory injected by JUnit in which the partitioned file is created
     * @throws Exception if the file cannot be created or opened
     */
    @BeforeEach
    void before(@TempDir Path path) throws Exception {
        new Random().nextBytes(dataBytes);

        this.partitionedFile =
                PartitionTestUtils.createPartitionedFile(
                        path.toString(),
                        numSubpartitions,
                        numBuffersPerSubpartition,
                        bufferSize,
                        dataBytes);

        this.dataFileChannel = openFileChannel(this.partitionedFile.getDataFilePath());
        this.indexFileChannel = openFileChannel(this.partitionedFile.getIndexFilePath());
        this.fileReader =
                new PartitionedFileReader(
                        this.partitionedFile,
                        0,
                        this.dataFileChannel,
                        this.indexFileChannel,
                        BufferReaderWriterUtil.allocatedHeaderBuffer(),
                        PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer());

        this.bufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
        this.executor = Executors.newFixedThreadPool(numThreads);
        this.readScheduler =
                new SortMergeResultPartitionReadScheduler(
                        this.bufferPool, this.executor, new Object());
    }

    /**
     * Releases everything created in before().
     *
     * <p>Each resource is null-checked because before() may have failed partway through, and the
     * steps are chained with {@code finally} so that a failure while closing one resource does not
     * prevent the remaining ones from being released.
     *
     * @throws Exception if closing a channel or destroying the pool fails
     */
    @AfterEach
    void after() throws Exception {
        try {
            if (this.dataFileChannel != null) {
                this.dataFileChannel.close();
            }
        } finally {
            try {
                if (this.indexFileChannel != null) {
                    this.indexFileChannel.close();
                }
            } finally {
                try {
                    if (this.partitionedFile != null) {
                        this.partitionedFile.deleteQuietly();
                    }
                    if (this.bufferPool != null) {
                        this.bufferPool.destroy();
                    }
                } finally {
                    if (this.executor != null) {
                        this.executor.shutdown();
                    }
                }
            }
        }
    }

    /**
     * Creates a reader on a manually triggered executor, checks that the scheduler is running
     * with open channels and exactly one queued task, then drives the executor until every buffer
     * of the subpartition has been read and compared against the written payload.
     */
    @Timeout(60)
    @Test
    void testCreateSubpartitionReader() throws Exception {
        final ManuallyTriggeredScheduledExecutorService manualExecutor =
                new ManuallyTriggeredScheduledExecutorService();
        this.readScheduler =
                new SortMergeResultPartitionReadScheduler(
                        this.bufferPool, manualExecutor, new Object());
        final SortMergeSubpartitionReader reader =
                this.readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, this.partitionedFile);

        Assertions.assertThat(this.readScheduler.isRunning()).isTrue();
        Assertions.assertThat(this.readScheduler.getDataFileChannel().isOpen()).isTrue();
        Assertions.assertThat(this.readScheduler.getIndexFileChannel().isOpen()).isTrue();
        Assertions.assertThat(manualExecutor.numQueuedRunnables()).isEqualTo(1);

        // Count down the buffers still to be verified; only a successful read decrements.
        int remaining = numBuffersPerSubpartition;
        while (remaining > 0) {
            manualExecutor.triggerAll();
            final ResultSubpartition.BufferAndBacklog next = reader.getNextBuffer();
            if (next == null) {
                continue;
            }
            final Buffer copy =
                    next.buffer()
                            .getFullBufferData(
                                    MemorySegmentFactory.allocateUnpooledSegment(
                                            next.buffer().readableBytes()));
            Assertions.assertThat(ByteBuffer.wrap(dataBytes))
                    .isEqualTo(copy.getNioBufferReadable());
            copy.recycleBuffer();
            remaining--;
        }
    }

    /**
     * Releases a freshly created reader, waits for the scheduler to stop running and checks that
     * it holds no resources afterwards.
     */
    @Test
    void testOnSubpartitionReaderError() throws Exception {
        final SortMergeSubpartitionReader reader =
                this.readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, this.partitionedFile);
        reader.releaseAllResources();

        waitUntilReadFinish();
        assertAllResourcesReleased();
    }

    /**
     * Releases the scheduler one second after a reader was created and checks that the reader is
     * failed, released, empty and reported as available, and that the scheduler frees everything
     * once its release future completes.
     */
    @Test
    void testReleaseWhileReading() throws Exception {
        final SortMergeSubpartitionReader reader =
                this.readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, this.partitionedFile);

        // Presumably gives the background read a chance to start before the release.
        Thread.sleep(1000L);
        this.readScheduler.release();

        Assertions.assertThat(reader.getFailureCause()).isNotNull();
        Assertions.assertThat(reader.isReleased()).isTrue();
        Assertions.assertThat(reader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
        Assertions.assertThat(reader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();

        this.readScheduler.getReleaseFuture().get();
        assertAllResourcesReleased();
    }

    /**
     * Checks that creating a reader on an already released scheduler fails with an
     * IllegalStateException and leaves no resources behind.
     */
    @Test
    void testCreateSubpartitionReaderAfterReleased() throws Exception {
        this.bufferPool.initialize();
        this.readScheduler.release();

        Assertions.assertThatThrownBy(
                        () ->
                                this.readScheduler.createSubpartitionReader(
                                        new NoOpBufferAvailablityListener(),
                                        0,
                                        this.partitionedFile))
                .isInstanceOf(IllegalStateException.class);

        assertAllResourcesReleased();
    }

    /**
     * Closes the scheduler's data file channel right after creating a reader, drains the reader
     * until it is released, and checks that it carries a failure cause, is reported as available
     * and that the scheduler frees everything.
     */
    @Test
    void testOnDataReadError() throws Exception {
        final SortMergeSubpartitionReader reader =
                this.readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, this.partitionedFile);
        this.readScheduler.getDataFileChannel().close();

        // Keep recycling whatever was already read until the reader reports itself released.
        while (!reader.isReleased()) {
            final ResultSubpartition.BufferAndBacklog pending = reader.getNextBuffer();
            if (pending == null) {
                continue;
            }
            pending.buffer().recycleBuffer();
        }

        waitUntilReadFinish();
        Assertions.assertThat(reader.getFailureCause()).isNotNull();
        Assertions.assertThat(reader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
        assertAllResourcesReleased();
    }

    /**
     * Destroys the buffer pool right after creating a reader, waits for the scheduler to stop
     * and checks that the reader is released with a failure cause, is reported as available and
     * that the scheduler frees everything.
     */
    @Test
    void testOnReadBufferRequestError() throws Exception {
        final SortMergeSubpartitionReader reader =
                this.readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, this.partitionedFile);

        this.bufferPool.destroy();
        waitUntilReadFinish();

        Assertions.assertThat(reader.isReleased()).isTrue();
        Assertions.assertThat(reader.getFailureCause()).isNotNull();
        Assertions.assertThat(reader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
        assertAllResourcesReleased();
    }

    /**
     * Reads into and releases a subpartition reader on a separate thread while the test thread
     * holds this test instance's monitor, and requires the reader to become released within the
     * timeout.
     *
     * <p>NOTE(review): the test thread synchronizes on {@code this}, whereas the scheduler built
     * in before() is given a separate {@code new Object()} as its lock argument. Confirm that
     * holding this monitor exercises the contention the test is meant to cover.
     */
    @Timeout(60)
    @Test
    void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
        this.bufferPool.initialize();
        SortMergeSubpartitionReader subpartitionReader =
                new SortMergeSubpartitionReader(
                        new NoOpBufferAvailablityListener(), this.fileReader);

        Thread readAndReleaseThread =
                new Thread(
                        () -> {
                            Queue<MemorySegment> segments = new ArrayDeque<>();
                            segments.add(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
                            try {
                                Assertions.assertThat(this.fileReader.hasRemaining()).isTrue();
                                subpartitionReader.readBuffers(segments, this.readScheduler);
                                subpartitionReader.releaseAllResources();
                                subpartitionReader.readBuffers(segments, this.readScheduler);
                            } catch (Exception ignored) {
                                // Deliberately best-effort: the test only checks that this thread
                                // gets as far as releasing the reader, not that every call
                                // succeeds.
                            }
                        });

        synchronized (this) {
            readAndReleaseThread.start();
            // Poll while holding the monitor until the other thread has released the reader.
            do {
                Thread.sleep(100L);
            } while (!subpartitionReader.isReleased());
        }
        readAndReleaseThread.join();
    }

    /**
     * Uses up every buffer of the pool with one scheduler run, then checks that a further buffer
     * allocation fails with a TimeoutException and that at least the configured timeout elapsed.
     *
     * <p>The scheduler is released in a {@code finally} block so a failed assertion does not leave
     * it holding resources.
     */
    @Test
    void testRequestBufferTimeout() throws Exception {
        Duration bufferRequestTimeout = Duration.ofSeconds(3L);
        // A manually triggered executor is used and never triggered, so the only read is the
        // explicit run() call below.
        SortMergeResultPartitionReadScheduler scheduler =
                new SortMergeResultPartitionReadScheduler(
                        this.bufferPool,
                        new ManuallyTriggeredScheduledExecutorService(),
                        this,
                        bufferRequestTimeout);
        try {
            long startNanos = System.nanoTime();
            scheduler.createSubpartitionReader(
                    new NoOpBufferAvailablityListener(), 0, this.partitionedFile);
            scheduler.run();
            Assertions.assertThat(this.bufferPool.getAvailableBuffers()).isZero();

            Assertions.assertThatThrownBy(scheduler::allocateBuffers)
                    .isInstanceOf(TimeoutException.class);

            // Compare the raw numbers so a failure reports the measured and expected durations.
            long elapsedNanos = System.nanoTime() - startNanos;
            Assertions.assertThat(elapsedNanos)
                    .isGreaterThanOrEqualTo(bufferRequestTimeout.toNanos());
        } finally {
            scheduler.release();
        }
    }

    /**
     * Allocates buffers from a pool that gives one held buffer back on every timestamp query, and
     * checks that the allocation succeeds with three buffers after more than twice the configured
     * timeout has elapsed.
     *
     * <p>Cleanup runs in a {@code finally} block so a failed assertion does not leak the pool or
     * the scheduler.
     */
    @Test
    void testRequestTimeoutIsRefreshedAndSuccess() throws Exception {
        Duration bufferRequestTimeout = Duration.ofSeconds(3L);
        int expectedBuffers = 3;
        FakeBatchShuffleReadBufferPool fakePool =
                new FakeBatchShuffleReadBufferPool((long) expectedBuffers * bufferSize, bufferSize);
        SortMergeResultPartitionReadScheduler scheduler =
                new SortMergeResultPartitionReadScheduler(
                        fakePool, this.executor, this, bufferRequestTimeout);
        Queue<MemorySegment> allocatedBuffers = new ArrayDeque<>();
        try {
            long startNanos = System.nanoTime();
            Assertions.assertThatCode(
                            () -> allocatedBuffers.addAll(scheduler.allocateBuffers()))
                    .doesNotThrowAnyException();
            long elapsedNanos = System.nanoTime() - startNanos;

            Assertions.assertThat(allocatedBuffers).hasSize(expectedBuffers);
            Assertions.assertThat(elapsedNanos).isGreaterThan(bufferRequestTimeout.toNanos() * 2);
        } finally {
            // Nothing to hand back if the allocation itself failed.
            if (!allocatedBuffers.isEmpty()) {
                fakePool.recycle(allocatedBuffers);
            }
            fakePool.destroy();
            scheduler.release();
        }
    }

    /**
     * Opens the given file for reading.
     *
     * @param path file to open
     * @return a channel opened with {@link StandardOpenOption#READ}
     * @throws IOException if the file cannot be opened
     */
    private static FileChannel openFileChannel(Path path) throws IOException {
        final FileChannel readChannel = FileChannel.open(path, StandardOpenOption.READ);
        return readChannel;
    }

    /**
     * Asserts that the scheduler holds no file channels, is not running and has no pending
     * readers, and, unless the pool was destroyed, that every pool buffer is available again.
     */
    private void assertAllResourcesReleased() {
        Assertions.assertThat(this.readScheduler.getDataFileChannel()).isNull();
        Assertions.assertThat(this.readScheduler.getIndexFileChannel()).isNull();
        Assertions.assertThat(this.readScheduler.isRunning()).isFalse();
        Assertions.assertThat(this.readScheduler.getNumPendingReaders()).isZero();

        // The buffer counts are only compared while the pool is still alive.
        if (!this.bufferPool.isDestroyed()) {
            Assertions.assertThat(this.bufferPool.getNumTotalBuffers())
                    .isEqualTo(this.bufferPool.getAvailableBuffers());
        }
    }

    /**
     * Blocks until the scheduler reports that it is no longer running, checking every 100 ms.
     *
     * @throws Exception if the waiting thread is interrupted while sleeping
     */
    private void waitUntilReadFinish() throws Exception {
        for (;;) {
            if (!this.readScheduler.isRunning()) {
                return;
            }
            Thread.sleep(100L);
        }
    }
}
