/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Optional;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.hybrid.HsDataView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataManager;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReaderImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionViewInternalOperations;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration;
import org.apache.flink.runtime.io.network.partition.hybrid.TestingHsDataView;
import org.apache.flink.runtime.io.network.partition.hybrid.TestingSubpartitionViewInternalOperation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={TestLoggerExtension.class})
class HsFileDataManagerTest {
    // Buffer size constant (1024); presumably the size in bytes of one read buffer - TODO confirm.
    private static final int BUFFER_SIZE = 1024;
    // Number of subpartitions constant (10).
    private static final int NUM_SUBPARTITIONS = 10;
    // Buffer pool size constant (2); the tests below expect exactly two buffers per read run.
    private static final int BUFFER_POOL_SIZE = 2;
    // Scratch payload; filled with random bytes in before().
    private final byte[] dataBytes = new byte[1024];
    // Executor handed to the manager under test; tests run its queued work by calling trigger().
    private ManuallyTriggeredScheduledExecutorService ioExecutor;
    // Read buffer pool shared with the manager; destroyed in after().
    private BatchShuffleReadBufferPool bufferPool;
    // Read-only channel on the temporary data file; closed in after().
    private FileChannel dataFileChannel;
    // Path of the temporary ".data" file created for each test.
    private Path dataFilePath;
    // Object under test; some tests replace it with a differently configured instance.
    private HsFileDataManager fileDataManager;
    // View-operations stub passed when registering subpartitions.
    private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
    // Reader factory that hands out the readers each test enqueues in allReaders.
    private TestingHsSubpartitionFileReader.Factory factory;

    // Explicit package-private no-arg constructor with an empty body.
    HsFileDataManagerTest() {
    }

    /**
     * Builds a fresh fixture for every test: random payload bytes, a read buffer pool sized from
     * the class constants, a manually triggered executor, an empty {@code .data} file with a
     * read-only channel on it, and the {@link HsFileDataManager} under test wired to the testing
     * reader factory.
     *
     * @param tempDir per-test temporary directory injected by JUnit
     * @throws IOException if the data file cannot be created or opened
     */
    @BeforeEach
    void before(@TempDir Path tempDir) throws IOException {
        new Random().nextBytes(dataBytes);

        // Derive the pool dimensions from the declared constants instead of repeating literals,
        // so the fixture cannot drift from BUFFER_POOL_SIZE / BUFFER_SIZE / NUM_SUBPARTITIONS.
        bufferPool =
                new BatchShuffleReadBufferPool((long) BUFFER_POOL_SIZE * BUFFER_SIZE, BUFFER_SIZE);
        ioExecutor = new ManuallyTriggeredScheduledExecutorService();

        dataFilePath = Files.createFile(tempDir.resolve(".data"));
        dataFileChannel = openFileChannel(dataFilePath);

        factory = new TestingHsSubpartitionFileReader.Factory();
        fileDataManager =
                new HsFileDataManager(
                        bufferPool,
                        ioExecutor,
                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
                        dataFilePath,
                        factory,
                        HybridShuffleConfiguration.builder(
                                        NUM_SUBPARTITIONS, bufferPool.getNumBuffersPerRequest())
                                .build());
        subpartitionViewOperation = new TestingSubpartitionViewInternalOperation();
    }

    /**
     * Tears down the fixture: destroys the buffer pool and closes the data file channel.
     *
     * <p>The channel is closed in a {@code finally} block so that a failure while destroying the
     * pool cannot leak the open file handle.
     *
     * @throws Exception if destroying the pool or closing the channel fails
     */
    @AfterEach
    void after() throws Exception {
        try {
            // Guarded like the channel below: setup may have failed before the pool was created.
            if (bufferPool != null) {
                bufferPool.destroy();
            }
        } finally {
            if (dataFileChannel != null) {
                dataFileChannel.close();
            }
        }
    }

    /**
     * Verifies that registering a subpartition and triggering the I/O executor makes the manager
     * hand buffers to the registered reader.
     */
    @Test
    void testRegisterReaderTriggerRun() throws Exception {
        TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
        // The lambda is typed by the setter's parameter; casting it through the raw functional
        // interface would turn both queues into Object and break the addAll call.
        reader.setReadBuffersConsumer(
                (requestedBuffers, readBuffers) -> readBuffers.addAll(requestedBuffers));
        factory.allReaders.add(reader);

        // Nothing has been read before the reader is registered.
        Assertions.assertThat(reader.readBuffers).isEmpty();

        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
        ioExecutor.trigger();

        // The reader was offered two buffers and kept both.
        Assertions.assertThat(reader.readBuffers).hasSize(2);
    }

    /**
     * Verifies that the reader is served again after its buffers are recycled: the first run
     * drains the pool, both buffers are recycled through the manager, and a second trigger is
     * expected to deliver two buffers once more.
     */
    @Test
    void testBufferReleasedTriggerRun() throws Exception {
        TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
        // Move (not copy) every offered segment into the reader's own queue. The lambda is typed
        // by the setter; a raw cast would leave the queues as Object and fail to compile.
        reader.setReadBuffersConsumer(
                (requestedBuffers, readBuffers) -> {
                    while (!requestedBuffers.isEmpty()) {
                        readBuffers.add(requestedBuffers.poll());
                    }
                });
        factory.allReaders.add(reader);

        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
        ioExecutor.trigger();

        // First run: the reader holds both buffers and the pool is exhausted.
        Assertions.assertThat(reader.readBuffers).hasSize(2);
        Assertions.assertThat(bufferPool.getAvailableBuffers()).isZero();

        // Give both buffers back through the manager.
        fileDataManager.recycle(reader.readBuffers.poll());
        fileDataManager.recycle(reader.readBuffers.poll());

        // Second run: the reader is handed two buffers again.
        ioExecutor.trigger();
        Assertions.assertThat(reader.readBuffers).hasSize(2);
    }

    /**
     * Verifies that buffers a reader leaves in the requested queue are returned to the pool: the
     * reader is offered two buffers, keeps one, and afterwards exactly one buffer is available
     * in the pool again.
     */
    @Test
    void testRunReleaseUnusedBuffers() throws Exception {
        TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
        CompletableFuture<Void> prepareForSchedulingFinished = new CompletableFuture<>();
        reader.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.complete(null));
        // Typed by the setter's parameter; a raw cast would make the queues Object and break poll().
        reader.setReadBuffersConsumer(
                (requestedBuffers, readBuffers) -> {
                    // prepareForScheduling must have run before any read is attempted.
                    Assertions.assertThat(prepareForSchedulingFinished).isCompleted();
                    // Both pool buffers were requested for this run.
                    Assertions.assertThat(requestedBuffers).hasSize(2);
                    Assertions.assertThat(bufferPool.getAvailableBuffers()).isEqualTo(0);
                    // Consume only one; the other stays in the requested queue.
                    readBuffers.add(requestedBuffers.poll());
                });
        factory.allReaders.add(reader);

        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
        ioExecutor.trigger();

        // The buffer left unused is back in the pool.
        Assertions.assertThat(bufferPool.getAvailableBuffers()).isEqualTo(1);
    }

    /**
     * Verifies that readers are served in their {@code compareTo} order: the reader with
     * priority 1 must read before the reader with priority 2.
     */
    @Test
    void testScheduleReadersOrdered() throws Exception {
        TestingHsSubpartitionFileReader reader1 = new TestingHsSubpartitionFileReader();
        TestingHsSubpartitionFileReader reader2 = new TestingHsSubpartitionFileReader();
        CompletableFuture<Void> readBuffersFinished1 = new CompletableFuture<>();
        CompletableFuture<Void> readBuffersFinished2 = new CompletableFuture<>();

        reader1.setReadBuffersConsumer(
                (requestedBuffers, readBuffers) -> {
                    // reader2 must not have been served yet.
                    Assertions.assertThat(readBuffersFinished2).isNotDone();
                    readBuffersFinished1.complete(null);
                });
        reader2.setReadBuffersConsumer(
                (requestedBuffers, readBuffers) -> {
                    // reader1 must already have been served.
                    Assertions.assertThat(readBuffersFinished1).isDone();
                    readBuffersFinished2.complete(null);
                });
        reader1.setPriority(1);
        reader2.setPriority(2);

        // The factory hands readers out in insertion order, one per registered subpartition.
        factory.allReaders.add(reader1);
        factory.allReaders.add(reader2);
        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
        fileDataManager.registerNewSubpartition(1, subpartitionViewOperation);

        ioExecutor.trigger();

        // reader2 only completes after passing its "reader1 went first" assertion.
        Assertions.assertThat(readBuffersFinished2).isCompleted();
    }

    /**
     * Verifies that a run which cannot obtain buffers fails the reader with a
     * {@link TimeoutException}.
     *
     * <p>The first triggered run hands every pool buffer to the reader, which drops them without
     * recycling, leaving the pool empty. A second, direct {@code run()} is then expected to time
     * out while requesting buffers and report the failure through the reader's fail callback.
     */
    @Test
    void testRunRequestBufferTimeout() throws Exception {
        Duration bufferRequestTimeout = Duration.ofSeconds(3);
        // Replace the default manager with one that uses the 3-second buffer-request timeout.
        fileDataManager =
                new HsFileDataManager(
                        bufferPool,
                        ioExecutor,
                        new HsFileDataIndexImpl(10),
                        dataFilePath,
                        factory,
                        HybridShuffleConfiguration.builder(10, bufferPool.getNumBuffersPerRequest())
                                .setBufferRequestTimeout(bufferRequestTimeout)
                                .build());

        TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
        CompletableFuture<Void> prepareForSchedulingFinished = new CompletableFuture<>();
        CompletableFuture<Throwable> cause = new CompletableFuture<>();
        reader.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.complete(null));
        reader.setFailConsumer(cause::complete);
        // Typed by the setter's parameter; a raw cast would make the queue Object and break clear().
        reader.setReadBuffersConsumer(
                (requestedBuffers, ignore) -> {
                    Assertions.assertThat(requestedBuffers)
                            .hasSize(bufferPool.getNumTotalBuffers());
                    // Drop the segments without recycling them so the pool stays exhausted.
                    requestedBuffers.clear();
                });
        factory.allReaders.add(reader);

        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
        ioExecutor.trigger();
        Assertions.assertThat(bufferPool.getAvailableBuffers()).isZero();

        // Run once more with an empty pool; the buffer request is expected to time out.
        fileDataManager.run();

        Assertions.assertThat(prepareForSchedulingFinished).isCompleted();
        Assertions.assertThat(cause).isCompleted();
        Assertions.assertThat(cause.get())
                .isInstanceOf(TimeoutException.class)
                .hasMessageContaining("Buffer request timeout");
    }

    /**
     * Verifies that an {@link IOException} thrown while a reader reads buffers is reported to
     * that reader through its fail callback.
     */
    @Test
    void testRunReadBuffersThrowException() throws Exception {
        TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
        CompletableFuture<Throwable> cause = new CompletableFuture<>();
        reader.setFailConsumer(cause::complete);
        reader.setReadBuffersConsumer(
                (requestedBuffers, readBuffers) -> {
                    throw new IOException("expected exception.");
                });
        factory.allReaders.add(reader);

        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
        ioExecutor.trigger();

        // The failure surfaces via fail(...) with the original exception.
        Assertions.assertThat(cause).isCompleted();
        Assertions.assertThat(cause.get())
                .isInstanceOf(IOException.class)
                .hasMessageContaining("expected exception.");
    }

    /**
     * Verifies that releasing the manager while a reader is in the middle of reading fails that
     * reader with an {@link IllegalStateException}.
     *
     * <p>Two futures sequence the threads: the read callback signals that reading has started and
     * then blocks until the release thread has finished calling {@code release()}.
     */
    @Test
    @Timeout(10)
    void testReleasedWhenReading() throws Exception {
        TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
        CompletableFuture<Throwable> cause = new CompletableFuture<>();
        reader.setFailConsumer(cause::complete);

        final CompletableFuture<Void> readBufferStart = new CompletableFuture<>();
        final CompletableFuture<Void> releasedFinish = new CompletableFuture<>();
        reader.setReadBuffersConsumer(
                (requestedBuffers, readBuffers) -> {
                    try {
                        // Let the release thread proceed, then wait until release() has returned.
                        readBufferStart.complete(null);
                        releasedFinish.get();
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                });
        factory.allReaders.add(reader);

        CheckedThread releaseThread =
                new CheckedThread() {
                    @Override
                    public void go() throws Exception {
                        // Release only once the reader is inside readBuffers.
                        readBufferStart.get();
                        fileDataManager.release();
                        releasedFinish.complete(null);
                    }
                };
        releaseThread.start();

        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
        ioExecutor.trigger();

        // Rethrows anything the release thread failed with.
        releaseThread.sync();

        Assertions.assertThat(cause).isCompleted();
        Assertions.assertThat(cause.get())
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("Result partition has been already released.");
    }

    /**
     * Verifies that registering a subpartition (and triggering the executor) on an already
     * released manager is rejected with an {@link IllegalStateException}.
     */
    @Test
    void testRegisterSubpartitionReaderAfterReleased() {
        factory.allReaders.add(new TestingHsSubpartitionFileReader());
        fileDataManager.release();

        Assertions.assertThatThrownBy(
                        () -> {
                            fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
                            ioExecutor.trigger();
                        })
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("HsFileDataManager is already released.");
    }

    /**
     * Verifies that releasing the manager while a consumer thread pulls from the subpartition
     * view does not deadlock.
     *
     * <p>The reader's {@code fail} is overridden to signal that it was entered and then wait for
     * the consumer thread to start, so that {@code getNextBuffer()} runs while {@code fail} is
     * still in progress on the releasing thread. The test is bounded by a timeout so that a
     * regression fails instead of hanging the build.
     */
    @Test
    @Timeout(10)
    void testConsumeWhileReleaseNoDeadlock() throws Exception {
        final CompletableFuture<Void> consumerStart = new CompletableFuture<>();
        final CompletableFuture<Void> readerFail = new CompletableFuture<>();
        final HsSubpartitionView subpartitionView =
                new HsSubpartitionView(new NoOpBufferAvailablityListener());

        HsSubpartitionFileReaderImpl subpartitionFileReader =
                new HsSubpartitionFileReaderImpl(
                        0,
                        dataFileChannel,
                        subpartitionView,
                        new HsFileDataIndexImpl(10),
                        5,
                        reader -> fileDataManager.releaseSubpartitionReader(reader),
                        BufferReaderWriterUtil.allocatedHeaderBuffer()) {

                    @Override
                    public synchronized void fail(Throwable failureCause) {
                        try {
                            // Announce that fail() was entered, then hold it open until the
                            // consumer thread is running.
                            readerFail.complete(null);
                            consumerStart.get();
                            super.fail(failureCause);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                };
        factory.allReaders.add(subpartitionFileReader);

        HsDataView diskDataView = fileDataManager.registerNewSubpartition(0, subpartitionView);
        subpartitionView.setDiskDataView(diskDataView);
        // The memory view always throws when asked to consume a buffer.
        TestingHsDataView memoryDataView =
                TestingHsDataView.builder()
                        .setConsumeBufferFunction(
                                ignore -> {
                                    throw new RuntimeException("expected exception.");
                                })
                        .build();
        subpartitionView.setMemoryDataView(memoryDataView);

        CheckedThread consumerThread =
                new CheckedThread() {
                    @Override
                    public void go() throws Exception {
                        // Start consuming only after release() has reached the reader's fail().
                        readerFail.get();
                        consumerStart.complete(null);
                        subpartitionView.getNextBuffer();
                    }
                };
        consumerThread.start();

        fileDataManager.release();
        consumerThread.sync();
    }

    /**
     * Opens the given file for reading.
     *
     * @param path file to open
     * @return a channel opened with {@link StandardOpenOption#READ}
     * @throws IOException if the file cannot be opened
     */
    private static FileChannel openFileChannel(Path path) throws IOException {
        final FileChannel readOnlyChannel = FileChannel.open(path, StandardOpenOption.READ);
        return readOnlyChannel;
    }

    private static class TestingHsSubpartitionFileReader
    implements HsSubpartitionFileReader {
        private Runnable prepareForSchedulingRunnable = () -> {};
        private BiConsumerWithException<Queue<MemorySegment>, Queue<MemorySegment>, IOException> readBuffersConsumer = (ignore1, ignore2) -> {};
        private Consumer<Throwable> failConsumer = ignore -> {};
        private Runnable releaseDataViewRunnable = () -> {};
        private final Queue<MemorySegment> readBuffers = new ArrayDeque<MemorySegment>();
        private int priority;

        private TestingHsSubpartitionFileReader() {
        }

        public void prepareForScheduling() {
            this.prepareForSchedulingRunnable.run();
        }

        public void readBuffers(Queue<MemorySegment> buffers, BufferRecycler recycler) throws IOException {
            this.readBuffersConsumer.accept(buffers, this.readBuffers);
        }

        public void fail(Throwable failureCause) {
            this.failConsumer.accept(failureCause);
        }

        public int compareTo(HsSubpartitionFileReader that) {
            Preconditions.checkArgument((boolean)(that instanceof TestingHsSubpartitionFileReader));
            return Integer.compare(this.priority, ((TestingHsSubpartitionFileReader)that).priority);
        }

        public void setPriority(int priority) {
            this.priority = priority;
        }

        public void setPrepareForSchedulingRunnable(Runnable prepareForSchedulingRunnable) {
            this.prepareForSchedulingRunnable = prepareForSchedulingRunnable;
        }

        public void setReadBuffersConsumer(BiConsumerWithException<Queue<MemorySegment>, Queue<MemorySegment>, IOException> readBuffersConsumer) {
            this.readBuffersConsumer = readBuffersConsumer;
        }

        public void setFailConsumer(Consumer<Throwable> failConsumer) {
            this.failConsumer = failConsumer;
        }

        public void setReleaseDataViewRunnable(Runnable releaseDataViewRunnable) {
            this.releaseDataViewRunnable = releaseDataViewRunnable;
        }

        public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int nextBufferToConsume) {
            return Optional.empty();
        }

        public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) {
            return Buffer.DataType.NONE;
        }

        public int getBacklog() {
            return 0;
        }

        public void releaseDataView() {
            this.releaseDataViewRunnable.run();
        }

        private static class Factory
        implements HsSubpartitionFileReader.Factory {
            private final Queue<HsSubpartitionFileReader> allReaders = new ArrayDeque<HsSubpartitionFileReader>();

            private Factory() {
            }

            public HsSubpartitionFileReader createFileReader(int subpartitionId, FileChannel dataFileChannel, HsSubpartitionViewInternalOperations operation, HsFileDataIndex dataIndex, int maxBuffersReadAhead, Consumer<HsSubpartitionFileReader> fileReaderReleaser, ByteBuffer headerBuffer) {
                return (HsSubpartitionFileReader)Preconditions.checkNotNull((Object)this.allReaders.poll());
            }
        }
    }
}

