package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReaderImpl;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.class */
class HsSubpartitionFileReaderImplTest {
    // NOTE(review): not referenced by name in this file; the helpers below use the literal 4
    // for segment and buffer sizes, so this is presumably the constant they were inlined from.
    private static final int bufferSize = 4;
    // NOTE(review): not referenced by name in this file; the tests pass the literal 0 as the
    // subpartition id, so this is presumably the constant that was inlined.
    private static final int targetChannel = 0;
    // Passed as the last constructor argument of every reader created by
    // createSubpartitionFileReader(int, ...); testReadBufferNotExceedThreshold asserts that a
    // single read loads exactly this many buffers even when more segments are offered.
    private static final int MAX_BUFFERS_READ_AHEAD = 5;
    // Source of the arbitrary payload start value used by the 3-argument writeDataToFile.
    private Random random;
    // Index that writeDataToFile registers spilled buffers with; recreated before every test.
    private HsFileDataIndex diskIndex;
    // Default view operations handed to readers created via createSubpartitionFileReader().
    private TestingSubpartitionViewInternalOperation subpartitionOperation;
    // Channel of the per-test data file; opened in before() and closed in after().
    private FileChannel dataFileChannel;
    // Running total of bytes written by writeDataToFile; used as the file offset of the next
    // spilled buffer.
    private long currentFileOffset;

    // Explicit no-argument constructor with an empty body.
    HsSubpartitionFileReaderImplTest() {
    }

    /**
     * Resets the per-test fixtures: a new random source, a freshly created data file (uniquely
     * named inside the supplied temporary directory) opened as a channel, a single-subpartition
     * index, new view operations and a zeroed file offset.
     *
     * @param path temporary directory supplied through the {@code @TempDir} parameter
     */
    @BeforeEach
    void before(@TempDir Path path) throws Exception {
        random = new Random();
        final Path dataFile = path.resolve(UUID.randomUUID().toString());
        dataFileChannel = openFileChannel(Files.createFile(dataFile));
        diskIndex = new HsFileDataIndexImpl(1);
        subpartitionOperation = new TestingSubpartitionViewInternalOperation();
        currentFileOffset = 0L;
    }

    /** Closes the data file channel after each test, ignoring any failure while closing. */
    @AfterEach
    void after() {
        IOUtils.closeQuietly(dataFileChannel);
    }

    /**
     * Alternates reads between two readers (subpartition 0 and 1) that share one data file and
     * checks, after every read, how many memory segments are left and which payload values were
     * loaded.
     */
    @Test
    void testReadBuffer() throws Exception {
        // Two subpartitions are written below, so replace the single-subpartition index.
        diskIndex = new HsFileDataIndexImpl(2);
        TestingSubpartitionViewInternalOperation operationsOfReader0 =
                new TestingSubpartitionViewInternalOperation();
        TestingSubpartitionViewInternalOperation operationsOfReader1 =
                new TestingSubpartitionViewInternalOperation();
        HsSubpartitionFileReaderImpl reader0 = createSubpartitionFileReader(0, operationsOfReader0);
        HsSubpartitionFileReaderImpl reader1 = createSubpartitionFileReader(1, operationsOfReader1);

        // File layout, in write order:
        //   subpartition 0, buffer index 0-1, payload 10, 11
        //   subpartition 1, buffer index 0-1, payload 20, 21
        //   subpartition 0, buffer index 2,   payload 15
        //   subpartition 1, buffer index 2,   payload 25
        writeDataToFile(0, 0, 10, 2);
        writeDataToFile(1, 0, 20, 2);
        writeDataToFile(0, 2, 15, 1);
        writeDataToFile(1, 2, 25, 1);

        Queue<MemorySegment> segments = createsMemorySegments(6);

        // First read of reader 0 takes two of the six segments.
        reader0.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(segments).hasSize(4);
        checkData(reader0, 10, 11);

        // First read of reader 1 takes two more.
        reader1.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(segments).hasSize(2);
        checkData(reader1, 20, 21);

        // Second read of reader 0 takes a single segment.
        reader0.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(segments).hasSize(1);
        checkData(reader0, 15);

        // Second read of reader 1 takes the last segment.
        reader1.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(segments).isEmpty();
        checkData(reader1, 25);
    }

    /**
     * Reading when nothing has been written to the data file must neither consume any of the
     * offered memory segments nor add anything to the reader's loaded-buffer queue.
     */
    @Test
    void testReadEmptyRegion() throws Exception {
        HsSubpartitionFileReaderImpl fileReader = createSubpartitionFileReader();
        // Parameterized instead of a raw Deque so the element type is checked by the compiler.
        Deque<HsSubpartitionFileReaderImpl.BufferIndexOrError> loadedBuffers =
                fileReader.getLoadedBuffers();
        Queue<MemorySegment> memorySegments = createsMemorySegments(2);

        fileReader.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);

        Assertions.assertThat(memorySegments).hasSize(2);
        Assertions.assertThat(loadedBuffers).isEmpty();
    }

    /**
     * After the consumer has advanced its consumption progress, the next read must load the
     * buffer following the consumed ones rather than starting again from the beginning.
     *
     * <p>Six buffers (index 0..5) are written for subpartition 0. The consumption progress is
     * advanced twice before each read; the test expects the first read to load index 2 and the
     * second read to load index 4.
     */
    @Test
    void testReadBufferSkip() throws Exception {
        HsSubpartitionFileReaderImpl fileReader = createSubpartitionFileReader();
        // Parameterized instead of a raw Deque, which removes the casts on poll().
        Deque<HsSubpartitionFileReaderImpl.BufferIndexOrError> loadedBuffers =
                fileReader.getLoadedBuffers();
        writeDataToFile(0, 0, 6);

        this.subpartitionOperation.advanceConsumptionProgress();
        this.subpartitionOperation.advanceConsumptionProgress();
        Assertions.assertThat(this.subpartitionOperation.getConsumingOffset()).isEqualTo(1);
        // Let the reader pick up the advanced progress before reading.
        fileReader.prepareForScheduling();
        Queue<MemorySegment> firstSegments = createsMemorySegments(1);
        fileReader.readBuffers(firstSegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(firstSegments).isEmpty();
        assertSingleLoadedBufferWithIndex(loadedBuffers, 2);

        this.subpartitionOperation.advanceConsumptionProgress();
        this.subpartitionOperation.advanceConsumptionProgress();
        fileReader.prepareForScheduling();
        Queue<MemorySegment> secondSegments = createsMemorySegments(1);
        fileReader.readBuffers(secondSegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(secondSegments).isEmpty();
        assertSingleLoadedBufferWithIndex(loadedBuffers, 4);
    }

    /**
     * Asserts that the queue holds exactly one entry, removes it, and checks that it carries a
     * buffer (and no throwable) with the expected buffer index.
     */
    private static void assertSingleLoadedBufferWithIndex(
            Deque<HsSubpartitionFileReaderImpl.BufferIndexOrError> loadedBuffers,
            int expectedIndex) {
        Assertions.assertThat(loadedBuffers).hasSize(1);
        HsSubpartitionFileReaderImpl.BufferIndexOrError loaded = loadedBuffers.poll();
        Assertions.assertThat(loaded).isNotNull();
        Assertions.assertThat(loaded.getBuffer()).isPresent();
        Assertions.assertThat(loaded.getThrowable()).isNotPresent();
        Assertions.assertThat(loaded.getIndex()).isEqualTo(expectedIndex);
    }

    /**
     * Two separate writes of two buffers each are made for subpartition 0. Even though four
     * memory segments are offered, a single read must load only the two buffers of the first
     * write and leave the other two segments untouched.
     */
    @Test
    void testReadBufferNotBeyondRegionBoundary() throws Exception {
        HsSubpartitionFileReaderImpl fileReader = createSubpartitionFileReader();
        // Parameterized instead of a raw Deque so the element type is checked by the compiler.
        Deque<HsSubpartitionFileReaderImpl.BufferIndexOrError> loadedBuffers =
                fileReader.getLoadedBuffers();
        // First write: buffer index 0-1 with payload 0, 1.
        writeDataToFile(0, 0, 0, 2);
        // Second write: buffer index 2-3 with payload 2, 3.
        writeDataToFile(0, 2, 2, 2);
        fileReader.prepareForScheduling();
        Queue<MemorySegment> memorySegments = createsMemorySegments(4);

        fileReader.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);

        Assertions.assertThat(loadedBuffers).hasSize(2);
        checkData(fileReader, 0, 1);
        Assertions.assertThat(memorySegments).hasSize(2);
    }

    /**
     * Six buffers are written in one go and six memory segments are offered, but a single read
     * must stop after {@code MAX_BUFFERS_READ_AHEAD} buffers, leaving one segment unused.
     */
    @Test
    void testReadBufferNotExceedThreshold() throws Exception {
        HsSubpartitionFileReaderImpl fileReader = createSubpartitionFileReader();
        // Parameterized instead of a raw Deque so the element type is checked by the compiler.
        Deque<HsSubpartitionFileReaderImpl.BufferIndexOrError> loadedBuffers =
                fileReader.getLoadedBuffers();
        writeDataToFile(0, 0, 6);
        fileReader.prepareForScheduling();
        Queue<MemorySegment> memorySegments = createsMemorySegments(6);

        fileReader.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);

        Assertions.assertThat(loadedBuffers).hasSize(MAX_BUFFERS_READ_AHEAD);
        Assertions.assertThat(memorySegments).hasSize(1);
    }

    /**
     * Runs a consumer thread concurrently with a read and checks that the consumer, which waits
     * on the data-available notification whenever the loaded-buffer queue is empty, eventually
     * receives all {@code MAX_BUFFERS_READ_AHEAD} buffers and leaves the queue empty.
     */
    @Test
    void testReadBufferNotifyDataAvailable() throws Exception {
        OneShotLatch dataAvailable = new OneShotLatch();
        // The latch is triggered whenever the view operations are told data is available.
        this.subpartitionOperation.setNotifyDataAvailableRunnable(dataAvailable::trigger);
        HsSubpartitionFileReaderImpl fileReader = createSubpartitionFileReader();
        // Parameterized instead of a raw BlockingDeque, which removes the cast on poll().
        BlockingDeque<HsSubpartitionFileReaderImpl.BufferIndexOrError> loadedBuffers =
                (BlockingDeque<HsSubpartitionFileReaderImpl.BufferIndexOrError>)
                        fileReader.getLoadedBuffers();
        Queue<MemorySegment> memorySegments = createsMemorySegments(MAX_BUFFERS_READ_AHEAD);
        writeDataToFile(0, 0, MAX_BUFFERS_READ_AHEAD);
        fileReader.prepareForScheduling();

        CheckedThread consumer =
                new CheckedThread() {
                    @Override
                    public void go() throws Exception {
                        int numConsumed = 0;
                        while (numConsumed < MAX_BUFFERS_READ_AHEAD) {
                            HsSubpartitionFileReaderImpl.BufferIndexOrError loaded =
                                    loadedBuffers.poll();
                            if (loaded != null) {
                                Assertions.assertThat(loaded.getBuffer()).isPresent();
                                numConsumed++;
                            } else {
                                // Nothing loaded yet: wait for a notification, then re-arm the
                                // latch and poll again.
                                dataAvailable.await();
                                dataAvailable.reset();
                            }
                        }
                    }
                };
        consumer.start();

        fileReader.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);

        // sync() waits for the consumer and rethrows any failure raised inside go().
        consumer.sync();
        Assertions.assertThat(loadedBuffers).isEmpty();
    }

    /**
     * Closes the data file channel before reading so that the read fails with an
     * {@link IOException}, then checks that both offered memory segments are still in the queue.
     */
    @Test
    void testReadWillReturnBufferAfterError() throws Exception {
        HsSubpartitionFileReaderImpl fileReader = createSubpartitionFileReader();
        writeDataToFile(0, 0, 2);
        fileReader.prepareForScheduling();
        Queue<MemorySegment> memorySegments = createsMemorySegments(2);

        // Force the upcoming read to fail.
        dataFileChannel.close();

        Assertions.assertThatThrownBy(
                        () -> fileReader.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE))
                .isInstanceOf(IOException.class);
        Assertions.assertThat(memorySegments).hasSize(2);
    }

    /**
     * Once a reader has been failed, any further read attempt must throw an {@link IOException}
     * whose message states that the reader has already failed.
     */
    @Test
    void testReadBufferAfterFail() {
        HsSubpartitionFileReaderImpl fileReader = createSubpartitionFileReader();
        fileReader.fail(new RuntimeException("expected exception."));

        Assertions.assertThatThrownBy(
                        () ->
                                fileReader.readBuffers(
                                        createsMemorySegments(2), FreeingBufferRecycler.INSTANCE))
                .isInstanceOf(IOException.class)
                .hasMessageContaining("subpartition reader has already failed.");
    }

    /**
     * Checks what failing a reader does to buffers it has already loaded.
     *
     * <p>After a successful read of two buffers the test expects one data-available
     * notification. After {@code fail(...)} it expects the recycler to have been invoked twice,
     * the loaded-buffer queue to contain a single entry carrying the failure cause, and a second
     * data-available notification.
     */
    @Test
    void testFail() throws Exception {
        AtomicInteger numNotifications = new AtomicInteger(0);
        this.subpartitionOperation.setNotifyDataAvailableRunnable(
                numNotifications::incrementAndGet);
        HsSubpartitionFileReaderImpl fileReader = createSubpartitionFileReader();
        // Parameterized instead of a raw Deque, which removes the cast on poll().
        Deque<HsSubpartitionFileReaderImpl.BufferIndexOrError> loadedBuffers =
                fileReader.getLoadedBuffers();
        writeDataToFile(0, 0, 2);
        fileReader.prepareForScheduling();
        Queue<MemorySegment> memorySegments = createsMemorySegments(2);

        // Count recycler invocations instead of actually freeing the segments.
        AtomicInteger numRecycled = new AtomicInteger(0);
        fileReader.readBuffers(memorySegments, segment -> numRecycled.incrementAndGet());
        Assertions.assertThat(memorySegments).isEmpty();
        Assertions.assertThat(loadedBuffers).hasSize(2);
        Assertions.assertThat(numNotifications).hasValue(1);

        fileReader.fail(new RuntimeException("expected exception."));

        Assertions.assertThat(numRecycled).hasValue(2);
        HsSubpartitionFileReaderImpl.BufferIndexOrError failure = loadedBuffers.poll();
        Assertions.assertThat(loadedBuffers).isEmpty();
        Assertions.assertThat(failure).isNotNull();
        Assertions.assertThat(failure.getThrowable())
                .hasValueSatisfying(
                        cause ->
                                Assertions.assertThat(cause)
                                        .isInstanceOf(RuntimeException.class)
                                        .hasMessage("expected exception."));
        Assertions.assertThat(numNotifications).hasValue(2);
    }

    /**
     * Exercises the ordering between two readers of the same file: equal before any data is
     * written, reader 0 first once data exists and while only one of its buffers is consumed,
     * and reader 0 last after a second buffer has been consumed.
     */
    @Test
    void testCompareTo() throws Exception {
        diskIndex = new HsFileDataIndexImpl(2);
        TestingSubpartitionViewInternalOperation operationsOfReader0 =
                new TestingSubpartitionViewInternalOperation();
        HsSubpartitionViewInternalOperations operationsOfReader1 =
                new TestingSubpartitionViewInternalOperation();
        HsSubpartitionFileReaderImpl reader0 = createSubpartitionFileReader(0, operationsOfReader0);
        HsSubpartitionFileReaderImpl reader1 = createSubpartitionFileReader(1, operationsOfReader1);

        // No data written yet: the readers compare as equal.
        Assertions.assertThat(reader0).isEqualByComparingTo(reader1);

        // Write order in the file: subpartition 0 (index 0-1), subpartition 1 (index 0),
        // subpartition 0 (index 2).
        writeDataToFile(0, 0, 2);
        writeDataToFile(1, 0, 1);
        writeDataToFile(0, 2, 1);
        Assertions.assertThat(reader0).isLessThan(reader1);

        // One buffer of subpartition 0 consumed: ordering is unchanged.
        operationsOfReader0.advanceConsumptionProgress();
        reader0.prepareForScheduling();
        Assertions.assertThat(reader0).isLessThan(reader1);

        // A second buffer of subpartition 0 consumed: reader 0 now sorts after reader 1.
        operationsOfReader0.advanceConsumptionProgress();
        reader0.prepareForScheduling();
        Assertions.assertThat(reader0).isGreaterThan(reader1);
    }

    /**
     * Asserts that the reader's loaded-buffer queue holds exactly {@code iArr.length} entries
     * and drains them in order, checking that each one carries a buffer whose first
     * native-order int equals the corresponding expected value.
     *
     * @param hsSubpartitionFileReaderImpl reader whose loaded buffers are polled and checked
     * @param iArr expected payload values, in queue order
     */
    private static void checkData(HsSubpartitionFileReaderImpl hsSubpartitionFileReaderImpl, int... iArr) {
        Assertions.assertThat(hsSubpartitionFileReaderImpl.getLoadedBuffers()).hasSameSizeAs(iArr);
        for (int position = 0; position < iArr.length; position++) {
            final int expectedValue = iArr[position];
            HsSubpartitionFileReaderImpl.BufferIndexOrError loaded =
                    (HsSubpartitionFileReaderImpl.BufferIndexOrError)
                            hsSubpartitionFileReaderImpl.getLoadedBuffers().poll();
            Assertions.assertThat(loaded).isNotNull();
            Assertions.assertThat(loaded.getBuffer())
                    .hasValueSatisfying(
                            buffer -> {
                                ByteBuffer readable =
                                        buffer.getNioBufferReadable().order(ByteOrder.nativeOrder());
                                Assertions.assertThat(readable.getInt()).isEqualTo(expectedValue);
                            });
        }
    }

    /** Creates a reader for subpartition 0 that uses the shared {@code subpartitionOperation}. */
    private HsSubpartitionFileReaderImpl createSubpartitionFileReader() {
        final int defaultSubpartition = 0;
        return createSubpartitionFileReader(defaultSubpartition, subpartitionOperation);
    }

    /**
     * Creates a reader over the test's data file channel and disk index, configured with
     * {@code MAX_BUFFERS_READ_AHEAD}.
     *
     * @param i first constructor argument of the reader; callers in this class pass the
     *     subpartition id
     * @param hsSubpartitionViewInternalOperations view operations handed to the reader
     */
    private HsSubpartitionFileReaderImpl createSubpartitionFileReader(int i, HsSubpartitionViewInternalOperations hsSubpartitionViewInternalOperations) {
        HsSubpartitionFileReaderImpl reader =
                new HsSubpartitionFileReaderImpl(
                        i,
                        dataFileChannel,
                        hsSubpartitionViewInternalOperations,
                        diskIndex,
                        MAX_BUFFERS_READ_AHEAD);
        return reader;
    }

    /**
     * Opens the given file for both reading and writing.
     *
     * @param path file to open
     * @return a channel opened with the READ and WRITE options
     * @throws IOException if the channel cannot be opened
     */
    private static FileChannel openFileChannel(Path path) throws IOException {
        FileChannel channel =
                FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE);
        return channel;
    }

    /**
     * Allocates the requested number of unpooled 4-byte memory segments.
     *
     * @param i number of segments to allocate
     * @return a queue containing the newly allocated segments
     */
    private static Queue<MemorySegment> createsMemorySegments(int i) {
        // Parameterized instead of a raw ArrayDeque, so returning it as Queue<MemorySegment>
        // no longer relies on an unchecked conversion.
        Queue<MemorySegment> segments = new ArrayDeque<>();
        for (int allocated = 0; allocated < i; allocated++) {
            segments.add(MemorySegmentFactory.allocateUnpooledSegment(4));
        }
        return segments;
    }

    /**
     * Appends {@code numBuffers} consecutive buffers for one subpartition to the data file in a
     * single write, registers them with the disk index and marks each of them readable.
     *
     * <p>Every buffer carries one int: {@code firstValue}, {@code firstValue + 1}, and so on.
     * The last buffer of the batch is written as an {@code EVENT_BUFFER}, all others as
     * {@code DATA_BUFFER}.
     *
     * @param subpartitionId subpartition the buffers are recorded under
     * @param firstBufferIndex buffer index of the first buffer; following buffers count up by one
     * @param firstValue int payload of the first buffer; following buffers count up by one
     * @param numBuffers number of buffers to write
     */
    private void writeDataToFile(int subpartitionId, int firstBufferIndex, int firstValue, int numBuffers) throws Exception {
        // Bytes each buffer is accounted for in the file: the 4-byte payload plus its header.
        // NOTE(review): the header is presumably 8 bytes (12 - 4); confirm against
        // BufferReaderWriterUtil.
        final int bytesPerBuffer = 12;

        // Parameterized list: with a raw ArrayList the loop below would see plain Objects and
        // could not read SpilledBuffer.bufferIndex.
        ArrayList<HsFileDataIndex.SpilledBuffer> spilledBuffers = new ArrayList<>(numBuffers);
        // Two slots per buffer, filled in by setBufferWithHeader: header first, then payload.
        ByteBuffer[] headersAndPayloads = new ByteBuffer[2 * numBuffers];
        int totalBytes = 0;
        for (int n = 0; n < numBuffers; n++) {
            Buffer.DataType dataType =
                    n == numBuffers - 1
                            ? Buffer.DataType.EVENT_BUFFER
                            : Buffer.DataType.DATA_BUFFER;
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(4);
            segment.putInt(0, firstValue + n);
            NetworkBuffer buffer =
                    new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, dataType, 4);
            setBufferWithHeader(buffer, headersAndPayloads, 2 * n);
            spilledBuffers.add(
                    new HsFileDataIndex.SpilledBuffer(
                            subpartitionId,
                            firstBufferIndex + n,
                            this.currentFileOffset + totalBytes));
            totalBytes += bytesPerBuffer;
        }

        BufferReaderWriterUtil.writeBuffers(this.dataFileChannel, totalBytes, headersAndPayloads);
        this.currentFileOffset += totalBytes;

        this.diskIndex.addBuffers(spilledBuffers);
        for (HsFileDataIndex.SpilledBuffer spilled : spilledBuffers) {
            this.diskIndex.markBufferReadable(subpartitionId, spilled.bufferIndex);
        }
    }

    /**
     * Writes {@code i3} buffers for subpartition {@code i}, starting at buffer index
     * {@code i2}, using a random int as the payload of the first buffer.
     */
    private void writeDataToFile(int i, int i2, int i3) throws Exception {
        final int arbitraryFirstValue = random.nextInt();
        writeDataToFile(i, i2, arbitraryFirstValue, i3);
    }

    /**
     * Stores a header for {@code buffer} at {@code byteBufferArr[i]} and the buffer's readable
     * bytes at {@code byteBufferArr[i + 1]}.
     *
     * @param buffer buffer to be written
     * @param byteBufferArr target array receiving the header/payload pair
     * @param i position of the header; the payload goes to the following position
     */
    private static void setBufferWithHeader(Buffer buffer, ByteBuffer[] byteBufferArr, int i) {
        ByteBuffer header = BufferReaderWriterUtil.allocatedHeaderBuffer();
        BufferReaderWriterUtil.setByteChannelBufferHeader(buffer, header);
        final int payloadPosition = i + 1;
        byteBufferArr[i] = header;
        byteBufferArr[payloadPosition] = buffer.getNioBufferReadable();
    }
}
