/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.FullyFilledBuffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionedFile;
import org.apache.flink.runtime.io.network.partition.PartitionedFileReader;
import org.apache.flink.runtime.io.network.partition.PartitionedFileWriteReadTest;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionReadScheduler;
import org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link SortMergeResultPartitionReadScheduler}.
 *
 * <p>Each test runs against a freshly written {@link PartitionedFile} with {@link
 * #numSubpartitions} subpartitions of {@link #numBuffersPerSubpartition} buffers each, every buffer
 * carrying the same {@link #bufferSize} random bytes, and a read buffer pool of {@link #totalBytes}
 * bytes.
 */
class SortMergeResultPartitionReadSchedulerTest {
    private static final int bufferSize = 1024;
    private static final byte[] dataBytes = new byte[bufferSize];
    private static final int totalBytes = 2 * bufferSize;
    private static final int numThreads = 4;
    private static final int numSubpartitions = 10;
    private static final int numBuffersPerSubpartition = 10;

    private PartitionedFile partitionedFile;
    private PartitionedFileReader fileReader;
    private FileChannel dataFileChannel;
    private FileChannel indexFileChannel;
    private BatchShuffleReadBufferPool bufferPool;
    private ExecutorService executor;
    private SortMergeResultPartitionReadScheduler readScheduler;

    /**
     * Writes the partitioned file under the given temp directory, opens read channels on its data
     * and index files, and builds the default buffer pool, executor and read scheduler.
     */
    @BeforeEach
    void before(@TempDir Path basePath) throws Exception {
        // The payload is re-randomized for every test; all buffers share the same bytes.
        new Random().nextBytes(dataBytes);
        partitionedFile =
                PartitionTestUtils.createPartitionedFile(
                        basePath.toString(),
                        numSubpartitions,
                        numBuffersPerSubpartition,
                        bufferSize,
                        dataBytes);
        dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
        indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
        fileReader =
                new PartitionedFileReader(
                        partitionedFile,
                        new ResultSubpartitionIndexSet(0),
                        dataFileChannel,
                        indexFileChannel,
                        BufferReaderWriterUtil.allocatedHeaderBuffer(),
                        PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer(),
                        0);
        bufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
        executor = Executors.newFixedThreadPool(numThreads);
        readScheduler =
                new SortMergeResultPartitionReadScheduler(bufferPool, executor, new Object());
    }

    /** Closes the file channels, deletes the file, destroys the pool and stops the executor. */
    @AfterEach
    void after() throws Exception {
        dataFileChannel.close();
        indexFileChannel.close();
        partitionedFile.deleteQuietly();
        bufferPool.destroy();
        executor.shutdown();
    }

    /**
     * Creating a reader must start the scheduler, open both file channels and queue exactly one
     * runnable; triggering the executor must then yield every buffer of subpartition 0 with the
     * written payload.
     */
    @Test
    @Timeout(60)
    void testCreateSubpartitionReader() throws Exception {
        ManuallyTriggeredScheduledExecutorService ioExecutor =
                new ManuallyTriggeredScheduledExecutorService();
        readScheduler =
                new SortMergeResultPartitionReadScheduler(bufferPool, ioExecutor, new Object());

        SortMergeSubpartitionReader subpartitionReader = createReaderForSubpartitionZero();

        Assertions.assertThat(readScheduler.isRunning()).isTrue();
        Assertions.assertThat(readScheduler.getDataFileChannel().isOpen()).isTrue();
        Assertions.assertThat(readScheduler.getIndexFileChannel().isOpen()).isTrue();
        Assertions.assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);

        int numBuffersRead = 0;
        while (numBuffersRead < numBuffersPerSubpartition) {
            ioExecutor.triggerAll();
            ResultSubpartition.BufferAndBacklog bufferAndBacklog =
                    subpartitionReader.getNextBuffer();
            if (bufferAndBacklog == null) {
                // Nothing available yet; trigger the executor again.
                continue;
            }

            int numBytes = bufferAndBacklog.buffer().readableBytes();
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(numBytes);
            FullyFilledBuffer fullyFilledBuffer = (FullyFilledBuffer) bufferAndBacklog.buffer();
            Assertions.assertThat(fullyFilledBuffer.getPartialBuffers().size()).isOne();
            Buffer fullBuffer =
                    ((CompositeBuffer) fullyFilledBuffer.getPartialBuffers().get(0))
                            .getFullBufferData(segment);
            Assertions.assertThat(ByteBuffer.wrap(dataBytes))
                    .isEqualTo(fullBuffer.getNioBufferReadable());
            fullBuffer.recycleBuffer();
            ++numBuffersRead;
        }
    }

    /** Releasing the only reader must lead the scheduler to stop and free all its resources. */
    @Test
    void testOnSubpartitionReaderError() throws Exception {
        SortMergeSubpartitionReader subpartitionReader = createReaderForSubpartitionZero();

        subpartitionReader.releaseAllResources();

        waitUntilReadFinish();
        assertAllResourcesReleased();
    }

    /**
     * Releasing the scheduler while a reader is registered must fail and release that reader, leave
     * it with no queued buffers but still reported as available, and free all scheduler resources
     * once the release future completes.
     */
    @Test
    void testReleaseWhileReading() throws Exception {
        SortMergeSubpartitionReader subpartitionReader = createReaderForSubpartitionZero();

        // Presumably gives the IO executor time to start reading before the release.
        Thread.sleep(1000L);
        readScheduler.release();

        Assertions.assertThat(subpartitionReader.getFailureCause()).isNotNull();
        Assertions.assertThat(subpartitionReader.isReleased()).isTrue();
        Assertions.assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers())
                .isEqualTo(0);
        Assertions.assertThat(subpartitionReader.getAvailabilityAndBacklog(false).isAvailable())
                .isTrue();

        readScheduler.getReleaseFuture().get();
        assertAllResourcesReleased();
    }

    /** Creating a reader on an already released scheduler must be rejected. */
    @Test
    void testCreateSubpartitionReaderAfterReleased() throws Exception {
        bufferPool.initialize();
        readScheduler.release();

        Assertions.assertThatThrownBy(this::createReaderForSubpartitionZero)
                .isInstanceOf(IllegalStateException.class);
        assertAllResourcesReleased();
    }

    /**
     * Closing the scheduler's data file channel underneath it must end with the reader released
     * with a failure cause and all scheduler resources freed.
     */
    @Test
    void testOnDataReadError() throws Exception {
        SortMergeSubpartitionReader subpartitionReader = createReaderForSubpartitionZero();

        readScheduler.getDataFileChannel().close();

        // Drain and recycle whatever was read until the reader gets released.
        while (!subpartitionReader.isReleased()) {
            ResultSubpartition.BufferAndBacklog bufferAndBacklog =
                    subpartitionReader.getNextBuffer();
            if (bufferAndBacklog != null) {
                bufferAndBacklog.buffer().recycleBuffer();
            }
        }

        waitUntilReadFinish();
        Assertions.assertThat(subpartitionReader.getFailureCause()).isNotNull();
        Assertions.assertThat(subpartitionReader.getAvailabilityAndBacklog(false).isAvailable())
                .isTrue();
        assertAllResourcesReleased();
    }

    /**
     * Destroying the buffer pool before the queued read task runs must end with the reader released
     * with a failure cause and all scheduler resources freed.
     */
    @Test
    void testOnReadBufferRequestError() throws Exception {
        ManuallyTriggeredScheduledExecutorService schedulerExecutor =
                new ManuallyTriggeredScheduledExecutorService();
        readScheduler =
                new SortMergeResultPartitionReadScheduler(
                        bufferPool, schedulerExecutor, new Object());

        SortMergeSubpartitionReader subpartitionReader = createReaderForSubpartitionZero();

        bufferPool.destroy();
        Assertions.assertThat(schedulerExecutor.numQueuedRunnables()).isEqualTo(1);
        schedulerExecutor.trigger();

        waitUntilReadFinish();
        Assertions.assertThat(subpartitionReader.isReleased()).isTrue();
        Assertions.assertThat(subpartitionReader.getFailureCause()).isNotNull();
        Assertions.assertThat(subpartitionReader.getAvailabilityAndBacklog(false).isAvailable())
                .isTrue();
        assertAllResourcesReleased();
    }

    /**
     * A separate thread reads into a reader, releases it and reads again while the test thread
     * holds this instance's monitor; the test passes if the reader gets released and the thread
     * terminates within the timeout.
     */
    @Test
    @Timeout(60)
    void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
        bufferPool.initialize();
        SortMergeSubpartitionReader subpartitionReader =
                new SortMergeSubpartitionReader(
                        bufferSize, new NoOpBufferAvailablityListener(), fileReader);

        Thread readAndReleaseThread =
                new Thread(
                        () -> {
                            Queue<MemorySegment> segments = new ArrayDeque<>();
                            segments.add(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
                            try {
                                Assertions.assertThat(fileReader.hasRemaining()).isTrue();
                                subpartitionReader.readBuffers(segments, readScheduler);
                                subpartitionReader.releaseAllResources();
                                subpartitionReader.readBuffers(segments, readScheduler);
                            } catch (Exception ignored) {
                                // Deliberately ignored: this test only checks that the thread
                                // terminates. Presumably the read after release may fail.
                            }
                        });

        // NOTE(review): the scheduler built in before() uses its own `new Object()` lock, not
        // this instance - confirm that this monitor is the one the test means to hold.
        synchronized (this) {
            readAndReleaseThread.start();
            do {
                Thread.sleep(100L);
            } while (!subpartitionReader.isReleased());
        }
        readAndReleaseThread.join();
    }

    /**
     * With no buffers left in the pool after one scheduler run, a further allocation must fail with
     * a {@link TimeoutException}, and not before the configured request timeout has elapsed.
     */
    @Test
    void testRequestBufferTimeout() throws Exception {
        Duration bufferRequestTimeout = Duration.ofSeconds(3L);
        ManuallyTriggeredScheduledExecutorService executorService =
                new ManuallyTriggeredScheduledExecutorService();
        SortMergeResultPartitionReadScheduler scheduler =
                new SortMergeResultPartitionReadScheduler(
                        bufferPool, executorService, this, bufferRequestTimeout);

        long startTimestamp = System.currentTimeMillis();
        scheduler.createSubpartitionReader(
                new NoOpBufferAvailablityListener(),
                new ResultSubpartitionIndexSet(0),
                partitionedFile,
                0);
        // Run one scheduling round directly on the test thread.
        scheduler.run();

        Assertions.assertThat(bufferPool.getAvailableBuffers()).isZero();
        Assertions.assertThatThrownBy(scheduler::allocateBuffers)
                .isInstanceOf(TimeoutException.class);
        long requestDuration = System.currentTimeMillis() - startTimestamp;
        Assertions.assertThat(requestDuration)
                .isGreaterThanOrEqualTo(bufferRequestTimeout.toMillis());

        scheduler.release();
    }

    /**
     * Against a pool that hands one buffer back on every timestamp query, allocation must succeed
     * with all three buffers while taking longer than twice the configured request timeout.
     */
    @Test
    void testRequestTimeoutIsRefreshedAndSuccess() throws Exception {
        Duration bufferRequestTimeout = Duration.ofSeconds(3L);
        FakeBatchShuffleReadBufferPool fakePool =
                new FakeBatchShuffleReadBufferPool(3 * bufferSize, bufferSize);
        SortMergeResultPartitionReadScheduler scheduler =
                new SortMergeResultPartitionReadScheduler(
                        fakePool, executor, this, bufferRequestTimeout);

        long startTimestamp = System.currentTimeMillis();
        Queue<MemorySegment> allocatedBuffers = new ArrayDeque<>();
        Assertions.assertThatCode(() -> allocatedBuffers.addAll(scheduler.allocateBuffers()))
                .doesNotThrowAnyException();
        long requestDuration = System.currentTimeMillis() - startTimestamp;

        Assertions.assertThat(allocatedBuffers).hasSize(3);
        Assertions.assertThat(requestDuration).isGreaterThan(bufferRequestTimeout.toMillis() * 2L);

        fakePool.recycle(allocatedBuffers);
        fakePool.destroy();
        scheduler.release();
    }

    /** Opens a read-only channel on the given path. */
    private static FileChannel openFileChannel(Path path) throws IOException {
        return FileChannel.open(path, StandardOpenOption.READ);
    }

    /** Registers a reader for subpartition 0 of the test file on the current scheduler. */
    private SortMergeSubpartitionReader createReaderForSubpartitionZero() throws IOException {
        return readScheduler.createSubpartitionReader(
                new NoOpBufferAvailablityListener(),
                new ResultSubpartitionIndexSet(0),
                partitionedFile,
                0);
    }

    /**
     * Asserts that the current scheduler holds no file channels, is not running and has no pending
     * readers, and - unless the pool was destroyed - that every buffer is back in the pool.
     */
    private void assertAllResourcesReleased() {
        Assertions.assertThat((Object) readScheduler.getDataFileChannel()).isNull();
        Assertions.assertThat((Object) readScheduler.getIndexFileChannel()).isNull();
        Assertions.assertThat(readScheduler.isRunning()).isFalse();
        Assertions.assertThat(readScheduler.getNumPendingReaders()).isZero();
        if (!bufferPool.isDestroyed()) {
            Assertions.assertThat(bufferPool.getNumTotalBuffers())
                    .isEqualTo(bufferPool.getAvailableBuffers());
        }
    }

    /** Polls every 100 ms until the current scheduler reports that it is no longer running. */
    private void waitUntilReadFinish() throws Exception {
        while (readScheduler.isRunning()) {
            Thread.sleep(100L);
        }
    }

    /**
     * Buffer pool that requests buffers from itself on construction and gives one of them back
     * each time {@link #getLastBufferOperationTimestamp()} is queried.
     */
    private static class FakeBatchShuffleReadBufferPool extends BatchShuffleReadBufferPool {
        private final Queue<MemorySegment> requestedBuffers;

        FakeBatchShuffleReadBufferPool(long totalBytes, int bufferSize) throws Exception {
            super(totalBytes, bufferSize);
            requestedBuffers = new LinkedList<>(requestBuffers());
        }

        /** Recycles one held buffer (if any remain) before reporting the timestamp. */
        @Override
        public long getLastBufferOperationTimestamp() {
            recycle(requestedBuffers.poll());
            return super.getLastBufferOperationTimestamp();
        }

        /** Returns all buffers still held by this fake before destroying the pool. */
        @Override
        public void destroy() {
            recycle(requestedBuffers);
            requestedBuffers.clear();
            super.destroy();
        }
    }
}

