package org.apache.flink.runtime.io.disk;

import java.io.EOFException;
import java.util.ArrayList;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.ListMemorySegmentSource;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/disk/SpillingBufferTest.class */
class SpillingBufferTest {
    private static final long SEED = 649180756312423613L;
    private static final int KEY_MAX = Integer.MAX_VALUE;
    private static final int VALUE_LENGTH = 114;
    private static final int NUM_PAIRS_INMEM = 6000;
    private static final int NUM_PAIRS_EXTERNAL = 30000;
    private static final int MEMORY_SIZE = 1048576;
    private static final int NUM_MEMORY_SEGMENTS = 23;
    private final AbstractInvokable parentTask = new DummyInvokable();
    private IOManager ioManager;
    private MemoryManager memoryManager;

    SpillingBufferTest() {
    }

    @BeforeEach
    void beforeTest() {
        this.memoryManager = MemoryManagerBuilder.newBuilder().setMemorySize(1048576L).build();
        this.ioManager = new IOManagerAsync();
    }

    @AfterEach
    void afterTest() throws Exception {
        this.ioManager.close();
        if (this.memoryManager != null) {
            Assertions.assertThat(this.memoryManager.verifyEmpty()).withFailMessage("Memory leak: not all segments have been returned to the memory manager.", new Object[0]).isTrue();
            this.memoryManager.shutdown();
            this.memoryManager = null;
        }
    }

    @Test
    void testWriteReadInMemory() throws Exception {
        TestData.TupleGenerator tupleGenerator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TypeSerializer<Tuple2<Integer, String>> intStringTupleSerializer = TestData.getIntStringTupleSerializer();
        ArrayList arrayList = new ArrayList(NUM_MEMORY_SEGMENTS);
        this.memoryManager.allocatePages(this.parentTask, arrayList, NUM_MEMORY_SEGMENTS);
        SpillingBuffer spillingBuffer = new SpillingBuffer(this.ioManager, new ListMemorySegmentSource(arrayList), this.memoryManager.getPageSize());
        Tuple2<Integer, String> tuple2 = new Tuple2<>();
        for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
            tupleGenerator.next(tuple2);
            intStringTupleSerializer.serialize(tuple2, spillingBuffer);
        }
        DataInputView flip = spillingBuffer.flip();
        tupleGenerator.reset();
        Tuple2 tuple22 = new Tuple2();
        for (int i2 = 0; i2 < NUM_PAIRS_INMEM; i2++) {
            tupleGenerator.next(tuple2);
            intStringTupleSerializer.deserialize(tuple22, flip);
            Assertions.assertThat(((Integer) tuple2.f0).intValue() == ((Integer) tuple22.f0).intValue() && ((String) tuple2.f1).equals((String) tuple22.f1)).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0]).isTrue();
        }
        DataInputView flip2 = spillingBuffer.flip();
        tupleGenerator.reset();
        for (int i3 = 0; i3 < NUM_PAIRS_INMEM; i3++) {
            tupleGenerator.next(tuple2);
            intStringTupleSerializer.deserialize(tuple22, flip2);
            Assertions.assertThat(((Integer) tuple2.f0).intValue() == ((Integer) tuple22.f0).intValue() && ((String) tuple2.f1).equals((String) tuple22.f1)).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0]).isTrue();
        }
        this.memoryManager.release(spillingBuffer.close());
        this.memoryManager.release(arrayList);
    }

    @Test
    void testWriteReadTooMuchInMemory() throws Exception {
        TestData.TupleGenerator tupleGenerator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TypeSerializer<Tuple2<Integer, String>> intStringTupleSerializer = TestData.getIntStringTupleSerializer();
        ArrayList arrayList = new ArrayList(NUM_MEMORY_SEGMENTS);
        this.memoryManager.allocatePages(this.parentTask, arrayList, NUM_MEMORY_SEGMENTS);
        SpillingBuffer spillingBuffer = new SpillingBuffer(this.ioManager, new ListMemorySegmentSource(arrayList), this.memoryManager.getPageSize());
        Tuple2<Integer, String> tuple2 = new Tuple2<>();
        for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
            tupleGenerator.next(tuple2);
            intStringTupleSerializer.serialize(tuple2, spillingBuffer);
        }
        DataInputView flip = spillingBuffer.flip();
        tupleGenerator.reset();
        Tuple2 tuple22 = new Tuple2();
        for (int i2 = 0; i2 < NUM_PAIRS_INMEM; i2++) {
            tupleGenerator.next(tuple2);
            intStringTupleSerializer.deserialize(tuple22, flip);
            Assertions.assertThat(((Integer) tuple2.f0).intValue() == ((Integer) tuple22.f0).intValue() && ((String) tuple2.f1).equals((String) tuple22.f1)).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0]).isTrue();
        }
        tupleGenerator.next(tuple2);
        Assertions.assertThatThrownBy(() -> {
        }).withFailMessage("Read too much, expected EOFException.", new Object[0]).isInstanceOf(EOFException.class);
        DataInputView flip2 = spillingBuffer.flip();
        tupleGenerator.reset();
        for (int i3 = 0; i3 < NUM_PAIRS_INMEM; i3++) {
            tupleGenerator.next(tuple2);
            intStringTupleSerializer.deserialize(tuple22, flip2);
            Assertions.assertThat(((Integer) tuple2.f0).intValue() == ((Integer) tuple22.f0).intValue() && ((String) tuple2.f1).equals((String) tuple22.f1)).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0]).isTrue();
        }
        this.memoryManager.release(spillingBuffer.close());
        this.memoryManager.release(arrayList);
    }

    @Test
    void testWriteReadExternal() throws Exception {
        TestData.TupleGenerator tupleGenerator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TypeSerializer<Tuple2<Integer, String>> intStringTupleSerializer = TestData.getIntStringTupleSerializer();
        ArrayList arrayList = new ArrayList(NUM_MEMORY_SEGMENTS);
        this.memoryManager.allocatePages(this.parentTask, arrayList, NUM_MEMORY_SEGMENTS);
        SpillingBuffer spillingBuffer = new SpillingBuffer(this.ioManager, new ListMemorySegmentSource(arrayList), this.memoryManager.getPageSize());
        Tuple2<Integer, String> tuple2 = new Tuple2<>();
        for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
            tupleGenerator.next(tuple2);
            intStringTupleSerializer.serialize(tuple2, spillingBuffer);
        }
        DataInputView flip = spillingBuffer.flip();
        tupleGenerator.reset();
        Tuple2 tuple22 = new Tuple2();
        for (int i2 = 0; i2 < NUM_PAIRS_EXTERNAL; i2++) {
            tupleGenerator.next(tuple2);
            intStringTupleSerializer.deserialize(tuple22, flip);
            Assertions.assertThat(((Integer) tuple2.f0).intValue() == ((Integer) tuple22.f0).intValue() && ((String) tuple2.f1).equals((String) tuple22.f1)).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0]).isTrue();
        }
        DataInputView flip2 = spillingBuffer.flip();
        tupleGenerator.reset();
        for (int i3 = 0; i3 < NUM_PAIRS_EXTERNAL; i3++) {
            tupleGenerator.next(tuple2);
            intStringTupleSerializer.deserialize(tuple22, flip2);
            Assertions.assertThat(((Integer) tuple2.f0).intValue() == ((Integer) tuple22.f0).intValue() && ((String) tuple2.f1).equals((String) tuple22.f1)).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0]).isTrue();
        }
        this.memoryManager.release(spillingBuffer.close());
        this.memoryManager.release(arrayList);
    }

    @Test
    void testWriteReadTooMuchExternal() throws Exception {
        TestData.TupleGenerator tupleGenerator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TypeSerializer<Tuple2<Integer, String>> intStringTupleSerializer = TestData.getIntStringTupleSerializer();
        ArrayList arrayList = new ArrayList(NUM_MEMORY_SEGMENTS);
        this.memoryManager.allocatePages(this.parentTask, arrayList, NUM_MEMORY_SEGMENTS);
        SpillingBuffer spillingBuffer = new SpillingBuffer(this.ioManager, new ListMemorySegmentSource(arrayList), this.memoryManager.getPageSize());
        Tuple2<Integer, String> tuple2 = new Tuple2<>();
        for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
            tupleGenerator.next(tuple2);
            intStringTupleSerializer.serialize(tuple2, spillingBuffer);
        }
        DataInputView flip = spillingBuffer.flip();
        tupleGenerator.reset();
        Tuple2 tuple22 = new Tuple2();
        for (int i2 = 0; i2 < NUM_PAIRS_EXTERNAL; i2++) {
            tupleGenerator.next(tuple2);
            intStringTupleSerializer.deserialize(tuple22, flip);
            Assertions.assertThat(((Integer) tuple2.f0).intValue() == ((Integer) tuple22.f0).intValue() && ((String) tuple2.f1).equals((String) tuple22.f1)).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0]).isTrue();
        }
        tupleGenerator.next(tuple2);
        Assertions.assertThatThrownBy(() -> {
        }).withFailMessage("Read too much, expected EOFException.", new Object[0]).isInstanceOf(EOFException.class);
        DataInputView flip2 = spillingBuffer.flip();
        tupleGenerator.reset();
        for (int i3 = 0; i3 < NUM_PAIRS_EXTERNAL; i3++) {
            tupleGenerator.next(tuple2);
            intStringTupleSerializer.deserialize(tuple22, flip2);
            Assertions.assertThat(((Integer) tuple2.f0).intValue() == ((Integer) tuple22.f0).intValue() && ((String) tuple2.f1).equals((String) tuple22.f1)).withFailMessage("The re-generated and the notifyNonEmpty record do not match.", new Object[0]).isTrue();
        }
        this.memoryManager.release(spillingBuffer.close());
        this.memoryManager.release(arrayList);
    }
}
