package org.apache.flink.runtime.io.network.partition.hybrid;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.class */
class HsMemoryDataSpillerTest {
    private static final int BUFFER_SIZE = 4;
    private static final long BUFFER_WITH_HEADER_SIZE = 12;
    private FileChannel dataFileChannel;
    private HsMemoryDataSpiller memoryDataSpiller;

    HsMemoryDataSpillerTest() {
    }

    @BeforeEach
    void before(@TempDir Path path) throws Exception {
        this.dataFileChannel = FileChannel.open(Files.createFile(path.resolve(".data"), new FileAttribute[0]), StandardOpenOption.WRITE, StandardOpenOption.READ);
        this.memoryDataSpiller = new HsMemoryDataSpiller(this.dataFileChannel);
    }

    @Test
    void testSpillSuccessfully() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(createBufferWithIdentityList(0, Arrays.asList(Tuple2.of(0, 0), Tuple2.of(1, 1), Tuple2.of(2, 2))));
        arrayList.addAll(createBufferWithIdentityList(0, Arrays.asList(Tuple2.of(4, 0), Tuple2.of(5, 1), Tuple2.of(6, 2))));
        CompletableFuture spillAsync = this.memoryDataSpiller.spillAsync(arrayList);
        List<HsFileDataIndex.SpilledBuffer> expectedSpilledBuffers = getExpectedSpilledBuffers(arrayList);
        Assertions.assertThat(spillAsync).succeedsWithin(60L, TimeUnit.SECONDS).satisfies(new ThrowingConsumer[]{list -> {
            Assertions.assertThat(list).zipSatisfy(expectedSpilledBuffers, (spilledBuffer, spilledBuffer2) -> {
                Assertions.assertThat(spilledBuffer.bufferIndex).isEqualTo(spilledBuffer2.bufferIndex);
                Assertions.assertThat(spilledBuffer.subpartitionId).isEqualTo(spilledBuffer2.subpartitionId);
                Assertions.assertThat(spilledBuffer.fileOffset).isEqualTo(spilledBuffer2.fileOffset);
            });
        }});
        checkData(Arrays.asList(Tuple2.of(0, 0), Tuple2.of(1, 1), Tuple2.of(2, 2), Tuple2.of(4, 0), Tuple2.of(5, 1), Tuple2.of(6, 2)));
    }

    private static List<BufferWithIdentity> createBufferWithIdentityList(int i, List<Tuple2<Integer, Integer>> list) {
        ArrayList arrayList = new ArrayList();
        for (Tuple2<Integer, Integer> tuple2 : list) {
            Buffer.DataType dataType = ((Integer) tuple2.f1).intValue() % 2 == 0 ? Buffer.DataType.EVENT_BUFFER : Buffer.DataType.DATA_BUFFER;
            MemorySegment allocateUnpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(4);
            allocateUnpooledSegment.putInt(0, ((Integer) tuple2.f0).intValue());
            arrayList.add(new BufferWithIdentity(new NetworkBuffer(allocateUnpooledSegment, FreeingBufferRecycler.INSTANCE, dataType, 4), ((Integer) tuple2.f1).intValue(), i));
        }
        return Collections.unmodifiableList(arrayList);
    }

    private static List<HsFileDataIndex.SpilledBuffer> getExpectedSpilledBuffers(List<BufferWithIdentity> list) {
        long j = 0;
        ArrayList arrayList = new ArrayList();
        for (BufferWithIdentity bufferWithIdentity : list) {
            arrayList.add(new HsFileDataIndex.SpilledBuffer(bufferWithIdentity.getChannelIndex(), bufferWithIdentity.getBufferIndex(), j));
            j += BUFFER_WITH_HEADER_SIZE;
        }
        return Collections.unmodifiableList(arrayList);
    }

    private void checkData(List<Tuple2<Integer, Integer>> list) throws Exception {
        this.dataFileChannel.position(0L);
        ByteBuffer allocatedHeaderBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
        MemorySegment allocateUnpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(4);
        for (Tuple2<Integer, Integer> tuple2 : list) {
            Buffer readFromByteChannel = BufferReaderWriterUtil.readFromByteChannel(this.dataFileChannel, allocatedHeaderBuffer, allocateUnpooledSegment, memorySegment -> {
            });
            Assertions.assertThat(readFromByteChannel.readableBytes()).isEqualTo(4);
            Assertions.assertThat(readFromByteChannel.getNioBufferReadable().order(ByteOrder.nativeOrder()).getInt()).isEqualTo(tuple2.f0);
            Assertions.assertThat(readFromByteChannel.getDataType().isEvent()).isEqualTo(((Integer) tuple2.f1).intValue() % 2 == 0);
        }
    }
}
