package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReaderImpl;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.class */
class HsSubpartitionFileReaderImplTest {
    private static final int bufferSize = 4;
    private static final int targetChannel = 0;
    private static final int MAX_BUFFERS_READ_AHEAD = 5;
    private Random random;
    private HsFileDataIndex diskIndex;
    private TestingSubpartitionConsumerInternalOperation subpartitionOperation;
    private FileChannel dataFileChannel;
    private Path indexFilePath;
    private long currentFileOffset;

    HsSubpartitionFileReaderImplTest() {
    }

    @BeforeEach
    void before(@TempDir Path path) throws Exception {
        this.random = new Random();
        Path createFile = Files.createFile(path.resolve(UUID.randomUUID().toString()), new FileAttribute[0]);
        this.indexFilePath = path.resolve(UUID.randomUUID().toString());
        this.dataFileChannel = openFileChannel(createFile);
        this.diskIndex = createDataIndex(1, this.indexFilePath);
        this.subpartitionOperation = new TestingSubpartitionConsumerInternalOperation();
        this.currentFileOffset = 0L;
    }

    @AfterEach
    void after() {
        IOUtils.closeQuietly(this.dataFileChannel);
    }

    @Test
    void testReadBuffer(@TempDir Path path) throws Exception {
        this.diskIndex = createDataIndex(2, path.resolve(".index"));
        TestingSubpartitionConsumerInternalOperation testingSubpartitionConsumerInternalOperation = new TestingSubpartitionConsumerInternalOperation();
        TestingSubpartitionConsumerInternalOperation testingSubpartitionConsumerInternalOperation2 = new TestingSubpartitionConsumerInternalOperation();
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader(0, testingSubpartitionConsumerInternalOperation);
        HsSubpartitionFileReaderImpl createSubpartitionFileReader2 = createSubpartitionFileReader(1, testingSubpartitionConsumerInternalOperation2);
        writeDataToFile(0, 0, 10, 2);
        writeDataToFile(1, 0, 20, 2);
        writeDataToFile(0, 2, 15, 1);
        writeDataToFile(1, 2, 25, 1);
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(6);
        createSubpartitionFileReader.prepareForScheduling();
        createSubpartitionFileReader.readBuffers(createsMemorySegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(createsMemorySegments).hasSize(4);
        checkData(createSubpartitionFileReader, 10, 11);
        createSubpartitionFileReader2.prepareForScheduling();
        createSubpartitionFileReader2.readBuffers(createsMemorySegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(createsMemorySegments).hasSize(2);
        checkData(createSubpartitionFileReader2, 20, 21);
        createSubpartitionFileReader.prepareForScheduling();
        createSubpartitionFileReader.readBuffers(createsMemorySegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(createsMemorySegments).hasSize(1);
        checkData(createSubpartitionFileReader, 15);
        createSubpartitionFileReader2.readBuffers(createsMemorySegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(createsMemorySegments).isEmpty();
        checkData(createSubpartitionFileReader2, 25);
    }

    @ValueSource(strings = {"LZ4", "LZO", "ZSTD"})
    @ParameterizedTest
    void testReadBufferCompressed(String str, @TempDir Path path) throws Exception {
        BufferCompressor bufferCompressor = new BufferCompressor(4, str);
        BufferDecompressor bufferDecompressor = new BufferDecompressor(4, str);
        this.diskIndex = createDataIndex(1, path.resolve(".index"));
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader(0, new TestingSubpartitionConsumerInternalOperation());
        writeDataToFile(0, 0, 1, 3, bufferCompressor);
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(3);
        createSubpartitionFileReader.prepareForScheduling();
        createSubpartitionFileReader.readBuffers(createsMemorySegments, FreeingBufferRecycler.INSTANCE);
        checkData(createSubpartitionFileReader, bufferDecompressor, 1, 2, 3);
    }

    @Test
    void testReadEmptyRegion() throws Exception {
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader();
        Deque loadedBuffers = createSubpartitionFileReader.getLoadedBuffers();
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(2);
        createSubpartitionFileReader.readBuffers(createsMemorySegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(createsMemorySegments).hasSize(2);
        Assertions.assertThat(loadedBuffers).isEmpty();
    }

    @Test
    void testReadBufferSkip() throws Exception {
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader();
        Deque loadedBuffers = createSubpartitionFileReader.getLoadedBuffers();
        writeDataToFile(0, 0, 6);
        this.subpartitionOperation.advanceConsumptionProgress();
        this.subpartitionOperation.advanceConsumptionProgress();
        Assertions.assertThat(this.subpartitionOperation.getConsumingOffset(true)).isEqualTo(1);
        createSubpartitionFileReader.prepareForScheduling();
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(1);
        createSubpartitionFileReader.readBuffers(createsMemorySegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(createsMemorySegments).isEmpty();
        Assertions.assertThat(loadedBuffers).hasSize(1);
        HsSubpartitionFileReaderImpl.BufferIndexOrError bufferIndexOrError = (HsSubpartitionFileReaderImpl.BufferIndexOrError) loadedBuffers.poll();
        Assertions.assertThat(bufferIndexOrError).isNotNull();
        Assertions.assertThat(bufferIndexOrError.getBuffer()).isPresent();
        Assertions.assertThat(bufferIndexOrError.getThrowable()).isNotPresent();
        Assertions.assertThat(bufferIndexOrError.getIndex()).isEqualTo(2);
        this.subpartitionOperation.advanceConsumptionProgress();
        this.subpartitionOperation.advanceConsumptionProgress();
        createSubpartitionFileReader.prepareForScheduling();
        Queue<MemorySegment> createsMemorySegments2 = createsMemorySegments(1);
        createSubpartitionFileReader.readBuffers(createsMemorySegments2, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(createsMemorySegments2).isEmpty();
        Assertions.assertThat(loadedBuffers).hasSize(1);
        HsSubpartitionFileReaderImpl.BufferIndexOrError bufferIndexOrError2 = (HsSubpartitionFileReaderImpl.BufferIndexOrError) loadedBuffers.poll();
        Assertions.assertThat(bufferIndexOrError2).isNotNull();
        Assertions.assertThat(bufferIndexOrError2.getBuffer()).isPresent();
        Assertions.assertThat(bufferIndexOrError2.getThrowable()).isNotPresent();
        Assertions.assertThat(bufferIndexOrError2.getIndex()).isEqualTo(4);
    }

    @Test
    void testReadBufferNotBeyondRegionBoundary() throws Exception {
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader();
        Deque loadedBuffers = createSubpartitionFileReader.getLoadedBuffers();
        writeDataToFile(0, 0, 0, 2);
        writeDataToFile(0, 2, 2, 2);
        createSubpartitionFileReader.prepareForScheduling();
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(4);
        createSubpartitionFileReader.readBuffers(createsMemorySegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(loadedBuffers).hasSize(2);
        checkData(createSubpartitionFileReader, 0, 1);
        Assertions.assertThat(createsMemorySegments).hasSize(2);
    }

    @Test
    void testReadBufferNotExceedThreshold() throws Exception {
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader();
        Deque loadedBuffers = createSubpartitionFileReader.getLoadedBuffers();
        writeDataToFile(0, 0, 6);
        createSubpartitionFileReader.prepareForScheduling();
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(6);
        createSubpartitionFileReader.readBuffers(createsMemorySegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(loadedBuffers).hasSize(MAX_BUFFERS_READ_AHEAD);
        Assertions.assertThat(createsMemorySegments).hasSize(1);
    }

    @Test
    void testReadBufferNotifyDataAvailable() throws Exception {
        final OneShotLatch oneShotLatch = new OneShotLatch();
        TestingSubpartitionConsumerInternalOperation testingSubpartitionConsumerInternalOperation = this.subpartitionOperation;
        oneShotLatch.getClass();
        testingSubpartitionConsumerInternalOperation.setNotifyDataAvailableRunnable(oneShotLatch::trigger);
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader();
        final BlockingDeque blockingDeque = (BlockingDeque) createSubpartitionFileReader.getLoadedBuffers();
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(MAX_BUFFERS_READ_AHEAD);
        writeDataToFile(0, 0, MAX_BUFFERS_READ_AHEAD);
        createSubpartitionFileReader.prepareForScheduling();
        CheckedThread checkedThread = new CheckedThread() { // from class: org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReaderImplTest.1
            public void go() throws Exception {
                int i = 0;
                while (i < HsSubpartitionFileReaderImplTest.MAX_BUFFERS_READ_AHEAD) {
                    HsSubpartitionFileReaderImpl.BufferIndexOrError bufferIndexOrError = (HsSubpartitionFileReaderImpl.BufferIndexOrError) blockingDeque.poll();
                    if (bufferIndexOrError != null) {
                        Assertions.assertThat(bufferIndexOrError.getBuffer()).isPresent();
                        i++;
                    } else {
                        oneShotLatch.await();
                        oneShotLatch.reset();
                    }
                }
            }
        };
        checkedThread.start();
        createSubpartitionFileReader.readBuffers(createsMemorySegments, FreeingBufferRecycler.INSTANCE);
        checkedThread.sync();
        Assertions.assertThat(blockingDeque).isEmpty();
    }

    @Test
    void testReadWillReturnBufferAfterError() throws Exception {
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader();
        writeDataToFile(0, 0, 2);
        createSubpartitionFileReader.prepareForScheduling();
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(2);
        this.dataFileChannel.close();
        Assertions.assertThatThrownBy(() -> {
            createSubpartitionFileReader.readBuffers(createsMemorySegments, FreeingBufferRecycler.INSTANCE);
        }).isInstanceOf(IOException.class);
        Assertions.assertThat(createsMemorySegments).hasSize(2);
    }

    @Test
    void testReadBufferAfterFail() {
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader();
        createSubpartitionFileReader.fail(new RuntimeException("expected exception."));
        Assertions.assertThatThrownBy(() -> {
            createSubpartitionFileReader.readBuffers(createsMemorySegments(2), FreeingBufferRecycler.INSTANCE);
        }).isInstanceOf(IOException.class).hasMessageContaining("subpartition reader has already failed.");
    }

    @Test
    void testFail() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        TestingSubpartitionConsumerInternalOperation testingSubpartitionConsumerInternalOperation = this.subpartitionOperation;
        atomicInteger.getClass();
        testingSubpartitionConsumerInternalOperation.setNotifyDataAvailableRunnable(atomicInteger::incrementAndGet);
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader();
        Deque loadedBuffers = createSubpartitionFileReader.getLoadedBuffers();
        writeDataToFile(0, 0, 2);
        createSubpartitionFileReader.prepareForScheduling();
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(2);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        createSubpartitionFileReader.readBuffers(createsMemorySegments, memorySegment -> {
            atomicInteger2.incrementAndGet();
        });
        Assertions.assertThat(createsMemorySegments).isEmpty();
        Assertions.assertThat(loadedBuffers).hasSize(2);
        Assertions.assertThat(atomicInteger).hasValue(1);
        createSubpartitionFileReader.fail(new RuntimeException("expected exception."));
        Assertions.assertThat(atomicInteger2).hasValue(2);
        HsSubpartitionFileReaderImpl.BufferIndexOrError bufferIndexOrError = (HsSubpartitionFileReaderImpl.BufferIndexOrError) loadedBuffers.poll();
        Assertions.assertThat(loadedBuffers).isEmpty();
        Assertions.assertThat(bufferIndexOrError).isNotNull();
        Assertions.assertThat(bufferIndexOrError.getThrowable()).hasValueSatisfying(th -> {
            Assertions.assertThat(th).isInstanceOf(RuntimeException.class).hasMessage("expected exception.");
        });
        Assertions.assertThat(atomicInteger).hasValue(2);
    }

    @Test
    void testCompareTo(@TempDir Path path) throws Exception {
        this.diskIndex = createDataIndex(2, path.resolve(".index"));
        TestingSubpartitionConsumerInternalOperation testingSubpartitionConsumerInternalOperation = new TestingSubpartitionConsumerInternalOperation();
        HsSubpartitionConsumerInternalOperations testingSubpartitionConsumerInternalOperation2 = new TestingSubpartitionConsumerInternalOperation();
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader(0, testingSubpartitionConsumerInternalOperation);
        HsSubpartitionFileReaderImpl createSubpartitionFileReader2 = createSubpartitionFileReader(1, testingSubpartitionConsumerInternalOperation2);
        Assertions.assertThat(createSubpartitionFileReader).isEqualByComparingTo(createSubpartitionFileReader2);
        writeDataToFile(0, 0, 2);
        writeDataToFile(1, 0, 1);
        writeDataToFile(0, 2, 1);
        Assertions.assertThat(createSubpartitionFileReader).isLessThan(createSubpartitionFileReader2);
        testingSubpartitionConsumerInternalOperation.advanceConsumptionProgress();
        createSubpartitionFileReader.prepareForScheduling();
        Assertions.assertThat(createSubpartitionFileReader).isLessThan(createSubpartitionFileReader2);
        testingSubpartitionConsumerInternalOperation.advanceConsumptionProgress();
        createSubpartitionFileReader.prepareForScheduling();
        Assertions.assertThat(createSubpartitionFileReader).isGreaterThan(createSubpartitionFileReader2);
    }

    @Test
    void testRecycleBuffersForConsumeBuffer() throws Throwable {
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader(0, new TestingSubpartitionConsumerInternalOperation());
        writeDataToFile(0, 0, 0, 4);
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(4);
        createSubpartitionFileReader.prepareForScheduling();
        createSubpartitionFileReader.readBuffers(createsMemorySegments, memorySegment -> {
        });
        ArrayList arrayList = new ArrayList();
        createSubpartitionFileReader.consumeBuffer(2, arrayList);
        Assertions.assertThat(arrayList).hasSize(2);
        Assertions.assertThat(arrayList).element(0).satisfies(new ThrowingConsumer[]{buffer -> {
            assertBufferContentEqualTo(buffer, 0);
        }});
        Assertions.assertThat(arrayList).element(1).satisfies(new ThrowingConsumer[]{buffer2 -> {
            assertBufferContentEqualTo(buffer2, 1);
        }});
    }

    @Test
    void testRecycleBuffersForPeekNextToConsumeDataType() throws Throwable {
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader(0, new TestingSubpartitionConsumerInternalOperation());
        writeDataToFile(0, 0, 0, 4);
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(4);
        createSubpartitionFileReader.prepareForScheduling();
        createSubpartitionFileReader.readBuffers(createsMemorySegments, memorySegment -> {
        });
        ArrayList arrayList = new ArrayList();
        createSubpartitionFileReader.peekNextToConsumeDataType(2, arrayList);
        Assertions.assertThat(arrayList).hasSize(2);
        Assertions.assertThat(arrayList).element(0).satisfies(new ThrowingConsumer[]{buffer -> {
            assertBufferContentEqualTo(buffer, 0);
        }});
        Assertions.assertThat(arrayList).element(1).satisfies(new ThrowingConsumer[]{buffer2 -> {
            assertBufferContentEqualTo(buffer2, 1);
        }});
    }

    @Test
    void testConsumeBuffer() throws Throwable {
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader(0, new TestingSubpartitionConsumerInternalOperation());
        Assertions.assertThat(createSubpartitionFileReader.consumeBuffer(0, Collections.emptyList())).isNotPresent();
        writeDataToFile(0, 0, 0, 3);
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(3);
        createSubpartitionFileReader.prepareForScheduling();
        createSubpartitionFileReader.readBuffers(createsMemorySegments, memorySegment -> {
        });
        Assertions.assertThat(createSubpartitionFileReader.consumeBuffer(0, new ArrayList())).hasValueSatisfying(bufferAndBacklog -> {
            Assertions.assertThat(bufferAndBacklog.getNextDataType()).isEqualTo(Buffer.DataType.DATA_BUFFER);
            Assertions.assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(0);
            Assertions.assertThat(bufferAndBacklog.buffer().getNioBufferReadable().order(ByteOrder.nativeOrder()).getInt()).isEqualTo(0);
        });
        Assertions.assertThat(createSubpartitionFileReader.consumeBuffer(0, Collections.emptyList())).isNotPresent();
        Assertions.assertThat(createSubpartitionFileReader.consumeBuffer(2, new ArrayList())).hasValueSatisfying(bufferAndBacklog2 -> {
            Assertions.assertThat(bufferAndBacklog2.getNextDataType()).isEqualTo(Buffer.DataType.NONE);
            Assertions.assertThat(bufferAndBacklog2.getSequenceNumber()).isEqualTo(2);
            Assertions.assertThat(bufferAndBacklog2.buffer().getNioBufferReadable().order(ByteOrder.nativeOrder()).getInt()).isEqualTo(2);
        });
        Assertions.assertThat(createSubpartitionFileReader.getLoadedBuffers()).isEmpty();
    }

    @Test
    void testPeekNextToConsumeDataTypeOrConsumeBufferThrowException() {
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader(0, new TestingSubpartitionConsumerInternalOperation());
        createSubpartitionFileReader.fail(new RuntimeException("expected exception."));
        Assertions.assertThatThrownBy(() -> {
            createSubpartitionFileReader.peekNextToConsumeDataType(0, Collections.emptyList());
        }).isInstanceOf(RuntimeException.class).hasMessageContaining("expected exception.");
        Assertions.assertThatThrownBy(() -> {
            createSubpartitionFileReader.consumeBuffer(0, Collections.emptyList());
        }).isInstanceOf(RuntimeException.class).hasMessageContaining("expected exception.");
    }

    @Test
    void testPeekNextToConsumeDataType() throws Throwable {
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader(0, new TestingSubpartitionConsumerInternalOperation());
        Assertions.assertThat(createSubpartitionFileReader.peekNextToConsumeDataType(0, new ArrayList())).isEqualTo(Buffer.DataType.NONE);
        writeDataToFile(0, 0, 3);
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(3);
        createSubpartitionFileReader.prepareForScheduling();
        createSubpartitionFileReader.readBuffers(createsMemorySegments, memorySegment -> {
        });
        Assertions.assertThat(createSubpartitionFileReader.peekNextToConsumeDataType(0, Collections.emptyList())).isEqualTo(Buffer.DataType.DATA_BUFFER);
        Assertions.assertThat(createSubpartitionFileReader.peekNextToConsumeDataType(2, new ArrayList())).isEqualTo(Buffer.DataType.EVENT_BUFFER);
        Assertions.assertThat(createSubpartitionFileReader.peekNextToConsumeDataType(1, Collections.emptyList())).isEqualTo(Buffer.DataType.NONE);
    }

    @Test
    void testSubpartitionReaderRegisterMultipleTimes() throws Exception {
        TestingSubpartitionConsumerInternalOperation testingSubpartitionConsumerInternalOperation = new TestingSubpartitionConsumerInternalOperation();
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader(0, testingSubpartitionConsumerInternalOperation);
        testingSubpartitionConsumerInternalOperation.advanceConsumptionProgress();
        writeDataToFile(0, 0, 1, 3);
        createSubpartitionFileReader.prepareForScheduling();
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(3);
        createSubpartitionFileReader.readBuffers(createsMemorySegments, memorySegment -> {
        });
        Assertions.assertThat(createsMemorySegments).hasSize(1);
        checkData(createSubpartitionFileReader, 2, 3);
        HsSubpartitionFileReaderImpl createSubpartitionFileReader2 = createSubpartitionFileReader(0, new TestingSubpartitionConsumerInternalOperation());
        createSubpartitionFileReader2.prepareForScheduling();
        Queue<MemorySegment> createsMemorySegments2 = createsMemorySegments(3);
        createSubpartitionFileReader2.readBuffers(createsMemorySegments2, memorySegment2 -> {
        });
        Assertions.assertThat(createsMemorySegments2).isEmpty();
        checkData(createSubpartitionFileReader2, 1, 2, 3);
    }

    @Test
    void testMultipleFileReaderOfSingleSubpartition() throws Exception {
        TestingSubpartitionConsumerInternalOperation testingSubpartitionConsumerInternalOperation = new TestingSubpartitionConsumerInternalOperation();
        TestingSubpartitionConsumerInternalOperation testingSubpartitionConsumerInternalOperation2 = new TestingSubpartitionConsumerInternalOperation();
        HsConsumerId newId = HsConsumerId.newId((HsConsumerId) null);
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader(0, newId, testingSubpartitionConsumerInternalOperation);
        HsSubpartitionFileReaderImpl createSubpartitionFileReader2 = createSubpartitionFileReader(0, HsConsumerId.newId(newId), testingSubpartitionConsumerInternalOperation2);
        Assertions.assertThat(createSubpartitionFileReader).isNotEqualTo(createSubpartitionFileReader2);
        writeDataToFile(0, 0, 1, 3);
        createSubpartitionFileReader.prepareForScheduling();
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(3);
        createSubpartitionFileReader.readBuffers(createsMemorySegments, memorySegment -> {
        });
        Assertions.assertThat(createsMemorySegments).isEmpty();
        checkData(createSubpartitionFileReader, 1, 2, 3);
        createSubpartitionFileReader2.prepareForScheduling();
        Queue<MemorySegment> createsMemorySegments2 = createsMemorySegments(3);
        createSubpartitionFileReader2.readBuffers(createsMemorySegments2, memorySegment2 -> {
        });
        Assertions.assertThat(createsMemorySegments2).isEmpty();
        checkData(createSubpartitionFileReader2, 1, 2, 3);
    }

    @Test
    void testReadBuffersAfterReleased() throws Throwable {
        TestingSubpartitionConsumerInternalOperation testingSubpartitionConsumerInternalOperation = new TestingSubpartitionConsumerInternalOperation();
        CompletableFuture completableFuture = new CompletableFuture();
        HsSubpartitionFileReaderImpl createSubpartitionFileReader = createSubpartitionFileReader(0, HsConsumerId.DEFAULT, testingSubpartitionConsumerInternalOperation, hsSubpartitionFileReader -> {
            completableFuture.complete(null);
        });
        writeDataToFile(0, 0, 4);
        createSubpartitionFileReader.prepareForScheduling();
        Queue<MemorySegment> createsMemorySegments = createsMemorySegments(2);
        ArrayDeque arrayDeque = new ArrayDeque();
        arrayDeque.getClass();
        createSubpartitionFileReader.readBuffers(createsMemorySegments, (v1) -> {
            r2.add(v1);
        });
        Assertions.assertThat(createsMemorySegments).isEmpty();
        createSubpartitionFileReader.releaseDataView();
        Assertions.assertThat(createSubpartitionFileReader.getLoadedBuffers()).isEmpty();
        Assertions.assertThat(arrayDeque).hasSize(2);
        Queue<MemorySegment> createsMemorySegments2 = createsMemorySegments(2);
        arrayDeque.getClass();
        createSubpartitionFileReader.readBuffers(createsMemorySegments2, (v1) -> {
            r2.add(v1);
        });
        Assertions.assertThat(createsMemorySegments2).hasSize(2);
        Assertions.assertThat(arrayDeque).hasSize(2);
        Assertions.assertThat(completableFuture).isCompleted();
    }

    private static void checkData(HsSubpartitionFileReaderImpl hsSubpartitionFileReaderImpl, BufferDecompressor bufferDecompressor, int... iArr) {
        Assertions.assertThat(hsSubpartitionFileReaderImpl.getLoadedBuffers()).hasSameSizeAs(iArr);
        for (int i : iArr) {
            HsSubpartitionFileReaderImpl.BufferIndexOrError bufferIndexOrError = (HsSubpartitionFileReaderImpl.BufferIndexOrError) hsSubpartitionFileReaderImpl.getLoadedBuffers().poll();
            Assertions.assertThat(bufferIndexOrError).isNotNull();
            Assertions.assertThat(bufferIndexOrError.getBuffer()).isPresent();
            Buffer buffer = (Buffer) bufferIndexOrError.getBuffer().get();
            Assertions.assertThat(((!buffer.isCompressed() || bufferDecompressor == null) ? buffer : bufferDecompressor.decompressToIntermediateBuffer(buffer)).getNioBufferReadable().order(ByteOrder.nativeOrder()).getInt()).isEqualTo(i);
        }
    }

    private static void checkData(HsSubpartitionFileReaderImpl hsSubpartitionFileReaderImpl, int... iArr) {
        checkData(hsSubpartitionFileReaderImpl, null, iArr);
    }

    private HsSubpartitionFileReaderImpl createSubpartitionFileReader() {
        return createSubpartitionFileReader(0, this.subpartitionOperation);
    }

    private HsSubpartitionFileReaderImpl createSubpartitionFileReader(int i, HsSubpartitionConsumerInternalOperations hsSubpartitionConsumerInternalOperations) {
        return createSubpartitionFileReader(i, HsConsumerId.DEFAULT, hsSubpartitionConsumerInternalOperations);
    }

    private HsSubpartitionFileReaderImpl createSubpartitionFileReader(int i, HsConsumerId hsConsumerId, HsSubpartitionConsumerInternalOperations hsSubpartitionConsumerInternalOperations) {
        return createSubpartitionFileReader(i, hsConsumerId, hsSubpartitionConsumerInternalOperations, hsSubpartitionFileReader -> {
        });
    }

    private HsSubpartitionFileReaderImpl createSubpartitionFileReader(int i, HsConsumerId hsConsumerId, HsSubpartitionConsumerInternalOperations hsSubpartitionConsumerInternalOperations, Consumer<HsSubpartitionFileReader> consumer) {
        return new HsSubpartitionFileReaderImpl(i, hsConsumerId, this.dataFileChannel, hsSubpartitionConsumerInternalOperations, this.diskIndex, MAX_BUFFERS_READ_AHEAD, consumer, BufferReaderWriterUtil.allocatedHeaderBuffer());
    }

    private HsFileDataIndexImpl createDataIndex(int i, Path path) {
        return new HsFileDataIndexImpl(i, path, 256, Long.MAX_VALUE);
    }

    private static FileChannel openFileChannel(Path path) throws IOException {
        return FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE);
    }

    private static Queue<MemorySegment> createsMemorySegments(int i) {
        ArrayDeque arrayDeque = new ArrayDeque();
        for (int i2 = 0; i2 < i; i2++) {
            arrayDeque.add(MemorySegmentFactory.allocateUnpooledSegment(4));
        }
        return arrayDeque;
    }

    private void writeDataToFile(int i, int i2, int i3, int i4, BufferCompressor bufferCompressor) throws Exception {
        ArrayList arrayList = new ArrayList(i4);
        ByteBuffer[] byteBufferArr = new ByteBuffer[2 * i4];
        int i5 = 0;
        int i6 = 0;
        while (i6 < i4) {
            Buffer.DataType dataType = i6 == i4 - 1 ? Buffer.DataType.EVENT_BUFFER : Buffer.DataType.DATA_BUFFER;
            MemorySegment allocateUnpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(4);
            allocateUnpooledSegment.putInt(0, i3 + i6);
            Buffer networkBuffer = new NetworkBuffer(allocateUnpooledSegment, FreeingBufferRecycler.INSTANCE, dataType, 4);
            if (bufferCompressor != null && networkBuffer.isBuffer()) {
                networkBuffer = bufferCompressor.compressToOriginalBuffer(networkBuffer);
            }
            setBufferWithHeader(networkBuffer, byteBufferArr, 2 * i6);
            arrayList.add(new HsFileDataIndex.SpilledBuffer(i, i2 + i6, this.currentFileOffset + i5));
            i5 += networkBuffer.getSize() + 8;
            i6++;
        }
        BufferReaderWriterUtil.writeBuffers(this.dataFileChannel, i5, byteBufferArr);
        this.currentFileOffset += i5;
        this.diskIndex.addBuffers(arrayList);
        arrayList.forEach(spilledBuffer -> {
            this.diskIndex.markBufferReleased(i, spilledBuffer.bufferIndex);
        });
    }

    private void writeDataToFile(int i, int i2, int i3, int i4) throws Exception {
        writeDataToFile(i, i2, i3, i4, null);
    }

    private void writeDataToFile(int i, int i2, int i3) throws Exception {
        writeDataToFile(i, i2, this.random.nextInt(), i3);
    }

    private static void setBufferWithHeader(Buffer buffer, ByteBuffer[] byteBufferArr, int i) {
        ByteBuffer allocatedHeaderBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
        BufferReaderWriterUtil.setByteChannelBufferHeader(buffer, allocatedHeaderBuffer);
        byteBufferArr[i] = allocatedHeaderBuffer;
        byteBufferArr[i + 1] = buffer.getNioBufferReadable();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void assertBufferContentEqualTo(Buffer buffer, int i) {
        Assertions.assertThat(buffer.getNioBufferReadable().order(ByteOrder.nativeOrder()).getInt()).isEqualTo(i);
    }
}
