/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.CountingAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionedFile;
import org.apache.flink.runtime.io.network.partition.PartitionedFileReader;
import org.apache.flink.runtime.io.network.partition.PartitionedFileWriteReadTest;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link SortMergeSubpartitionReader}.
 *
 * <p>Every test runs against a {@link PartitionedFile} that is freshly written into a JUnit
 * temporary directory by {@link #before(Path)} and removed again by {@link #after()}.
 */
class SortMergeSubpartitionReaderTest {
    /** Length of the random payload and size of every memory segment handed to the reader. */
    private static final int BUFFER_SIZE = 1024;

    private static final int NUM_SUBPARTITIONS = 10;

    private static final int NUM_BUFFERS_PER_SUBPARTITION = 10;

    /**
     * Random payload written into the partitioned file. Kept per test instance (instead of in a
     * shared static array) so that tests never observe each other's data.
     */
    private final byte[] dataBytes = new byte[BUFFER_SIZE];

    private PartitionedFile partitionedFile;
    private FileChannel dataFileChannel;
    private FileChannel indexFileChannel;

    /**
     * Writes a partitioned file filled with random bytes and opens read channels on its data and
     * index files.
     *
     * @param basePath temporary directory injected by JUnit
     * @throws Exception if the file cannot be created or opened
     */
    @BeforeEach
    void before(@TempDir Path basePath) throws Exception {
        new Random().nextBytes(dataBytes);
        // NOTE(review): argument order assumed to be (numSubpartitions,
        // numBuffersPerSubpartition, bufferSize). Both counts are 10, so the values passed are
        // identical to the previously hard-coded literals either way.
        partitionedFile =
                PartitionTestUtils.createPartitionedFile(
                        basePath.toString(),
                        NUM_SUBPARTITIONS,
                        NUM_BUFFERS_PER_SUBPARTITION,
                        BUFFER_SIZE,
                        dataBytes);
        dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
        indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
    }

    /** Closes both file channels and deletes the partitioned file created for the test. */
    @AfterEach
    void after() {
        IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
        // Guard against a fixture that was never created (e.g. before() failed part-way), so
        // that the original failure is not accompanied by a NullPointerException from here.
        if (partitionedFile != null) {
            partitionedFile.deleteQuietly();
        }
    }

    /**
     * Checks the listener notification count, the number of queued buffers and the number of
     * leftover segments after successive {@code readBuffers} calls.
     */
    @Test
    void testReadBuffers() throws Exception {
        CountingAvailabilityListener listener = new CountingAvailabilityListener();
        SortMergeSubpartitionReader subpartitionReader =
                createSortMergeSubpartitionReader(listener);

        // Nothing has been read yet.
        Assertions.assertThat(listener.numNotifications).isZero();
        Assertions.assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers())
                .isZero();

        // First read: both segments are consumed and the listener is notified once.
        Queue<MemorySegment> segments = createsMemorySegments(2);
        subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(listener.numNotifications).isEqualTo(1);
        Assertions.assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers())
                .isEqualTo(1);
        Assertions.assertThat(segments).isEmpty();

        // Second read while buffers are still queued: no additional notification is expected.
        segments = createsMemorySegments(2);
        subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(listener.numNotifications).isEqualTo(1);
        Assertions.assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers())
                .isEqualTo(2);
        Assertions.assertThat(segments).isEmpty();

        // Drain everything that is queued, recycling each buffer as it is polled.
        while (subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers() > 0) {
            Preconditions.checkNotNull(subpartitionReader.getNextBuffer())
                    .buffer()
                    .recycleBuffer();
        }

        // Reading into the drained reader notifies the listener a second time; one of the ten
        // offered segments is expected to remain unused.
        segments = createsMemorySegments(10);
        subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(listener.numNotifications).isEqualTo(2);
        Assertions.assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers())
                .isEqualTo(8);
        Assertions.assertThat(segments.size()).isEqualTo(1);
    }

    /**
     * Polls the buffers that were read and checks payload, reported backlog and next data type
     * of each polled buffer.
     */
    @Test
    void testPollBuffers() throws Exception {
        SortMergeSubpartitionReader subpartitionReader =
                createSortMergeSubpartitionReader(new CountingAvailabilityListener());

        // Before anything is read there is nothing to poll and nothing available.
        Assertions.assertThat(subpartitionReader.getNextBuffer()).isNull();
        Assertions.assertThat(
                        subpartitionReader
                                .getAvailabilityAndBacklog(Integer.MAX_VALUE)
                                .isAvailable())
                .isFalse();

        final int numSegments = 10;
        Queue<MemorySegment> segments = createsMemorySegments(numSegments);
        subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);

        for (int i = numSegments - 1; i >= 0; --i) {
            // Iterations for which the reader reports no availability are skipped, not failed.
            if (subpartitionReader.getAvailabilityAndBacklog(i).isAvailable()) {
                ResultSubpartition.BufferAndBacklog bufferAndBacklog =
                        Preconditions.checkNotNull(subpartitionReader.getNextBuffer());

                // Copy the composite buffer into one contiguous segment to compare its bytes.
                int numBytes = bufferAndBacklog.buffer().readableBytes();
                MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(numBytes);
                Buffer fullBuffer =
                        ((CompositeBuffer) bufferAndBacklog.buffer()).getFullBufferData(segment);
                Assertions.assertThat(fullBuffer.getNioBufferReadable())
                        .isEqualTo(ByteBuffer.wrap(dataBytes));

                int expectedBacklog = i == 0 ? 0 : i - 1;
                Assertions.assertThat(bufferAndBacklog.buffersInBacklog())
                        .isEqualTo(expectedBacklog);

                Buffer.DataType expectedNextDataType =
                        i <= 1 ? Buffer.DataType.NONE : Buffer.DataType.DATA_BUFFER;
                Assertions.assertThat(bufferAndBacklog.getNextDataType())
                        .isEqualTo(expectedNextDataType);

                fullBuffer.recycleBuffer();
            }
        }
    }

    /**
     * Fails the reader after a read and checks that it ends up released, empty, carrying a
     * failure cause, and that every segment has been handed back to the recycler.
     */
    @Test
    void testFail() throws Exception {
        final int numSegments = 5;
        Queue<MemorySegment> segments = createsMemorySegments(numSegments);
        try {
            CountingAvailabilityListener listener = new CountingAvailabilityListener();
            SortMergeSubpartitionReader subpartitionReader =
                    createSortMergeSubpartitionReader(listener);

            // Recycled segments are put straight back into the queue they were taken from.
            subpartitionReader.readBuffers(segments, segments::add);
            Assertions.assertThat(listener.numNotifications).isEqualTo(1);
            Assertions.assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers())
                    .isEqualTo(4);

            subpartitionReader.fail(new RuntimeException("Test exception."));
            Assertions.assertThat(subpartitionReader.getReleaseFuture()).isDone();
            Assertions.assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers())
                    .isZero();
            Assertions.assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable())
                    .isTrue();
            Assertions.assertThat(subpartitionReader.isReleased()).isTrue();
            // Failing is expected to trigger one more notification.
            Assertions.assertThat(listener.numNotifications).isEqualTo(2);
            Assertions.assertThat(subpartitionReader.getFailureCause()).isNotNull();
        } finally {
            // Checked even if an assertion above failed: no segment may be lost.
            Assertions.assertThat(segments).hasSize(numSegments);
        }
    }

    /**
     * Releases the reader after a read and checks that it ends up released, empty, without a
     * failure cause, and that every segment has been handed back to the recycler.
     */
    @Test
    void testReleaseAllResources() throws Exception {
        final int numSegments = 5;
        Queue<MemorySegment> segments = createsMemorySegments(numSegments);
        try {
            CountingAvailabilityListener listener = new CountingAvailabilityListener();
            SortMergeSubpartitionReader subpartitionReader =
                    createSortMergeSubpartitionReader(listener);

            // Recycled segments are put straight back into the queue they were taken from.
            subpartitionReader.readBuffers(segments, segments::add);
            Assertions.assertThat(listener.numNotifications).isEqualTo(1);
            Assertions.assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers())
                    .isEqualTo(4);

            subpartitionReader.releaseAllResources();
            Assertions.assertThat(subpartitionReader.getReleaseFuture()).isDone();
            Assertions.assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers())
                    .isZero();
            Assertions.assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable())
                    .isTrue();
            Assertions.assertThat(subpartitionReader.isReleased()).isTrue();
            // Unlike fail(), a plain release is expected not to notify the listener again.
            Assertions.assertThat(listener.numNotifications).isEqualTo(1);
            Assertions.assertThat(subpartitionReader.getFailureCause()).isNull();
        } finally {
            // Checked even if an assertion above failed: no segment may be lost.
            Assertions.assertThat(segments).hasSize(numSegments);
        }
    }

    /**
     * Checks that {@code readBuffers} on a released reader throws an
     * {@link IllegalStateException} and that no segment is lost in the process.
     */
    @Test
    void testReadBuffersAfterReleased() throws Exception {
        final int numSegments = 5;
        Queue<MemorySegment> segments = createsMemorySegments(numSegments);
        try {
            SortMergeSubpartitionReader subpartitionReader =
                    createSortMergeSubpartitionReader(new CountingAvailabilityListener());
            subpartitionReader.readBuffers(segments, segments::add);
            subpartitionReader.releaseAllResources();

            Assertions.assertThatThrownBy(
                            () -> subpartitionReader.readBuffers(segments, segments::add))
                    .isInstanceOf(IllegalStateException.class);
        } finally {
            // Checked even if an assertion above failed: no segment may be lost.
            Assertions.assertThat(segments).hasSize(numSegments);
        }
    }

    /** Checks that a released reader returns {@code null} from {@code getNextBuffer}. */
    @Test
    void testPollBuffersAfterReleased() throws Exception {
        SortMergeSubpartitionReader subpartitionReader =
                createSortMergeSubpartitionReader(new CountingAvailabilityListener());

        Queue<MemorySegment> segments = createsMemorySegments(10);
        subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(
                        subpartitionReader
                                .getAvailabilityAndBacklog(Integer.MAX_VALUE)
                                .isAvailable())
                .isTrue();

        subpartitionReader.releaseAllResources();
        Assertions.assertThat(subpartitionReader.getNextBuffer()).isNull();
    }

    /**
     * Builds a reader over the test's partitioned file, asserting first that the underlying
     * file reader still has data remaining.
     *
     * @param listener availability listener handed to the new reader
     * @return a new reader backed by the test's data and index channels
     * @throws Exception if the file reader cannot be created or queried
     */
    private SortMergeSubpartitionReader createSortMergeSubpartitionReader(
            BufferAvailabilityListener listener) throws Exception {
        // NOTE(review): the literal 0 is presumably the index of the subpartition to read.
        PartitionedFileReader fileReader =
                new PartitionedFileReader(
                        partitionedFile,
                        0,
                        dataFileChannel,
                        indexFileChannel,
                        BufferReaderWriterUtil.allocatedHeaderBuffer(),
                        PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer());
        Assertions.assertThat(fileReader.hasRemaining()).isTrue();
        return new SortMergeSubpartitionReader(listener, fileReader);
    }

    /**
     * Opens the given file for reading.
     *
     * @param path file to open
     * @return a read-only channel on {@code path}
     * @throws IOException if the file cannot be opened
     */
    private static FileChannel openFileChannel(Path path) throws IOException {
        return FileChannel.open(path, StandardOpenOption.READ);
    }

    /**
     * Allocates the requested number of unpooled memory segments of {@link #BUFFER_SIZE} bytes.
     *
     * @param numSegments number of segments to allocate
     * @return a queue holding the newly allocated segments
     */
    private static Queue<MemorySegment> createsMemorySegments(int numSegments) {
        Queue<MemorySegment> segments = new ArrayDeque<>(numSegments);
        for (int i = 0; i < numSegments; ++i) {
            segments.add(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE));
        }
        return segments;
    }
}

