/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.hybrid.BufferWithIdentity;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex;
import org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataSpiller;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class HsMemoryDataSpillerTest {
    private static final int BUFFER_SIZE = 4;
    private static final long BUFFER_WITH_HEADER_SIZE = 12L;
    private HsMemoryDataSpiller memoryDataSpiller;
    @TempDir
    private Path tempDir;
    private Path dataFilePath;

    HsMemoryDataSpillerTest() {
    }

    @BeforeEach
    void before() {
        this.dataFilePath = this.tempDir.resolve(".data");
    }

    @ParameterizedTest
    @ValueSource(booleans={false, true})
    void testSpillSuccessfully(boolean isCompressed) throws Exception {
        this.memoryDataSpiller = HsMemoryDataSpillerTest.createMemoryDataSpiller(this.dataFilePath);
        ArrayList<BufferWithIdentity> bufferWithIdentityList = new ArrayList<BufferWithIdentity>();
        bufferWithIdentityList.addAll(HsMemoryDataSpillerTest.createBufferWithIdentityList(isCompressed, 0, Arrays.asList(Tuple2.of((Object)0, (Object)0), Tuple2.of((Object)1, (Object)1), Tuple2.of((Object)2, (Object)2))));
        bufferWithIdentityList.addAll(HsMemoryDataSpillerTest.createBufferWithIdentityList(isCompressed, 0, Arrays.asList(Tuple2.of((Object)4, (Object)0), Tuple2.of((Object)5, (Object)1), Tuple2.of((Object)6, (Object)2))));
        CompletableFuture future = this.memoryDataSpiller.spillAsync(bufferWithIdentityList);
        List<HsFileDataIndex.SpilledBuffer> expectedSpilledBuffers = HsMemoryDataSpillerTest.getExpectedSpilledBuffers(bufferWithIdentityList);
        FlinkAssertions.assertThatFuture((CompletableFuture)future).eventuallySucceeds().satisfies(new ThrowingConsumer[]{spilledBuffers -> {
            ListAssert cfr_ignored_0 = (ListAssert)Assertions.assertThat((List)spilledBuffers).zipSatisfy((Iterable)expectedSpilledBuffers, (spilledBuffer, expectedSpilledBuffer) -> {
                Assertions.assertThat((int)spilledBuffer.bufferIndex).isEqualTo(expectedSpilledBuffer.bufferIndex);
                Assertions.assertThat((int)spilledBuffer.subpartitionId).isEqualTo(expectedSpilledBuffer.subpartitionId);
                Assertions.assertThat((long)spilledBuffer.fileOffset).isEqualTo(expectedSpilledBuffer.fileOffset);
            });
        }});
        this.checkData(isCompressed, Arrays.asList(Tuple2.of((Object)0, (Object)0), Tuple2.of((Object)1, (Object)1), Tuple2.of((Object)2, (Object)2), Tuple2.of((Object)4, (Object)0), Tuple2.of((Object)5, (Object)1), Tuple2.of((Object)6, (Object)2)));
    }

    @Test
    void testClose() throws Exception {
        this.memoryDataSpiller = HsMemoryDataSpillerTest.createMemoryDataSpiller(this.dataFilePath);
        ArrayList<BufferWithIdentity> bufferWithIdentityList = new ArrayList<BufferWithIdentity>(HsMemoryDataSpillerTest.createBufferWithIdentityList(false, 0, Arrays.asList(Tuple2.of((Object)0, (Object)0), Tuple2.of((Object)1, (Object)1), Tuple2.of((Object)2, (Object)2))));
        this.memoryDataSpiller.spillAsync(bufferWithIdentityList);
        this.memoryDataSpiller.close();
        this.checkData(false, Arrays.asList(Tuple2.of((Object)0, (Object)0), Tuple2.of((Object)1, (Object)1), Tuple2.of((Object)2, (Object)2)));
        Assertions.assertThatThrownBy(() -> this.memoryDataSpiller.spillAsync(bufferWithIdentityList)).isInstanceOf(RejectedExecutionException.class);
    }

    private static List<BufferWithIdentity> createBufferWithIdentityList(boolean isCompressed, int subpartitionId, List<Tuple2<Integer, Integer>> dataAndIndexes) {
        ArrayList<BufferWithIdentity> bufferWithIdentityList = new ArrayList<BufferWithIdentity>();
        for (Tuple2<Integer, Integer> dataAndIndex : dataAndIndexes) {
            Buffer.DataType dataType = (Integer)dataAndIndex.f1 % 2 == 0 ? Buffer.DataType.EVENT_BUFFER : Buffer.DataType.DATA_BUFFER;
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)4);
            segment.putInt(0, ((Integer)dataAndIndex.f0).intValue());
            NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, dataType, 4);
            if (isCompressed) {
                buffer.setCompressed(true);
            }
            bufferWithIdentityList.add(new BufferWithIdentity((Buffer)buffer, ((Integer)dataAndIndex.f1).intValue(), subpartitionId));
        }
        return Collections.unmodifiableList(bufferWithIdentityList);
    }

    private static List<HsFileDataIndex.SpilledBuffer> getExpectedSpilledBuffers(List<BufferWithIdentity> bufferWithIdentityList) {
        long totalBytes = 0L;
        ArrayList<HsFileDataIndex.SpilledBuffer> spilledBuffers = new ArrayList<HsFileDataIndex.SpilledBuffer>();
        for (BufferWithIdentity bufferWithIdentity : bufferWithIdentityList) {
            spilledBuffers.add(new HsFileDataIndex.SpilledBuffer(bufferWithIdentity.getChannelIndex(), bufferWithIdentity.getBufferIndex(), totalBytes));
            totalBytes += 12L;
        }
        return Collections.unmodifiableList(spilledBuffers);
    }

    private void checkData(boolean isCompressed, List<Tuple2<Integer, Integer>> dataAndIndexes) throws Exception {
        FileChannel readChannel = FileChannel.open(this.dataFilePath, StandardOpenOption.READ);
        ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)4);
        for (Tuple2<Integer, Integer> dataAndIndex : dataAndIndexes) {
            Buffer buffer = BufferReaderWriterUtil.readFromByteChannel((FileChannel)readChannel, (ByteBuffer)headerBuf, (MemorySegment)segment, ignore -> {});
            Assertions.assertThat((boolean)buffer.isCompressed()).isEqualTo(isCompressed);
            Assertions.assertThat((int)buffer.readableBytes()).isEqualTo(4);
            Assertions.assertThat((int)buffer.getNioBufferReadable().order(ByteOrder.nativeOrder()).getInt()).isEqualTo(dataAndIndex.f0);
            Assertions.assertThat((boolean)buffer.getDataType().isEvent()).isEqualTo((Integer)dataAndIndex.f1 % 2 == 0);
        }
    }

    private static HsMemoryDataSpiller createMemoryDataSpiller(Path dataFilePath) throws Exception {
        return new HsMemoryDataSpiller(dataFilePath);
    }
}

