/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sort;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.operators.sort.IntNormalizedKeyComputer;
import org.apache.flink.table.runtime.operators.sort.IntRecordComparator;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.MutableObjectIterator;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(value={ParameterizedTestExtension.class})
class BinaryExternalSorterTest {
    private static final int MEMORY_SIZE = 0x2000000;
    private static final Logger LOG = LoggerFactory.getLogger(BinaryExternalSorterTest.class);
    private IOManager ioManager = new IOManagerAsync();
    private MemoryManager memoryManager;
    private BinaryRowDataSerializer serializer;
    private Configuration conf = new Configuration();

    public BinaryExternalSorterTest(boolean spillCompress, boolean asyncMerge) {
        if (!spillCompress) {
            this.conf.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, (Object)false);
        }
        if (asyncMerge) {
            this.conf.set(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED, (Object)true);
        }
    }

    @Parameters(name="spillCompress-{0} asyncMerge-{1}")
    private static Collection<Boolean[]> parameters() {
        return Arrays.asList({false, false}, {false, true}, {true, false}, {true, true});
    }

    private static String getString(int count) {
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < 8; ++i) {
            builder.append(count);
        }
        return builder.toString();
    }

    @BeforeEach
    void beforeTest() {
        this.memoryManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x2000000L).build();
        this.serializer = new BinaryRowDataSerializer(2);
        this.conf.set(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES, (Object)128);
    }

    @AfterEach
    void afterTest() throws Exception {
        this.ioManager.close();
        if (this.memoryManager != null) {
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.memoryManager.verifyEmpty()).as("Memory leak: not all segments have been returned to the memory manager.", new Object[0])).isTrue();
            this.memoryManager.shutdown();
            this.memoryManager = null;
        }
    }

    @TestTemplate
    void testSortTwoBufferInMemory() throws Exception {
        int size = 1000000;
        MockBinaryRowReader reader = new MockBinaryRowReader(size);
        LOG.debug("initializing sortmerger");
        MemoryManager memoryManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x6500000L).build();
        long minMemorySize = memoryManager.computeNumberOfPages(1.0) * 32768;
        BinaryExternalSorter sorter = new BinaryExternalSorter(new Object(), memoryManager, minMemorySize, this.ioManager, (AbstractRowDataSerializer)this.serializer, this.serializer, (NormalizedKeyComputer)IntNormalizedKeyComputer.INSTANCE, (RecordComparator)IntRecordComparator.INSTANCE, ((Integer)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES)).intValue(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)).booleanValue(), (int)((MemorySize)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)).getBytes(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED)).booleanValue(), 1.0f);
        sorter.startThreads();
        sorter.write((MutableObjectIterator)reader);
        MutableObjectIterator iterator = sorter.getIterator();
        BinaryRowData next = this.serializer.createInstance();
        for (int i = 0; i < size; ++i) {
            next = (BinaryRowData)iterator.next((Object)next);
            Assertions.assertThat((int)next.getInt(0)).isEqualTo(i);
            Assertions.assertThat((String)next.getString(1).toString()).isEqualTo(BinaryExternalSorterTest.getString(i));
        }
        sorter.close();
        Assertions.assertThat((boolean)memoryManager.verifyEmpty()).isTrue();
        memoryManager.shutdown();
    }

    @TestTemplate
    void testSort() throws Exception {
        int size = 10000;
        MockBinaryRowReader reader = new MockBinaryRowReader(size);
        LOG.debug("initializing sortmerger");
        long minMemorySize = this.memoryManager.computeNumberOfPages(0.9) * 32768;
        BinaryExternalSorter sorter = new BinaryExternalSorter(new Object(), this.memoryManager, minMemorySize, this.ioManager, (AbstractRowDataSerializer)this.serializer, this.serializer, (NormalizedKeyComputer)IntNormalizedKeyComputer.INSTANCE, (RecordComparator)IntRecordComparator.INSTANCE, ((Integer)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES)).intValue(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)).booleanValue(), (int)((MemorySize)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)).getBytes(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED)).booleanValue(), 0.7f);
        sorter.startThreads();
        sorter.write((MutableObjectIterator)reader);
        MutableObjectIterator iterator = sorter.getIterator();
        BinaryRowData next = this.serializer.createInstance();
        for (int i = 0; i < size; ++i) {
            next = (BinaryRowData)iterator.next((Object)next);
            Assertions.assertThat((int)next.getInt(0)).isEqualTo(i);
            Assertions.assertThat((String)next.getString(1).toString()).isEqualTo(BinaryExternalSorterTest.getString(i));
        }
        sorter.close();
    }

    @TestTemplate
    void testSortIntStringWithRepeat() throws Exception {
        int size = 10000;
        LOG.debug("initializing sortmerger");
        long minMemorySize = this.memoryManager.computeNumberOfPages(0.9) * 32768;
        BinaryExternalSorter sorter = new BinaryExternalSorter(new Object(), this.memoryManager, minMemorySize, this.ioManager, (AbstractRowDataSerializer)this.serializer, this.serializer, (NormalizedKeyComputer)new IntNormalizedKeyComputer(){

            @Override
            public boolean isKeyFullyDetermines() {
                return false;
            }
        }, (RecordComparator)IntRecordComparator.INSTANCE, ((Integer)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES)).intValue(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)).booleanValue(), (int)((MemorySize)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)).getBytes(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED)).booleanValue(), 0.7f);
        sorter.startThreads();
        sorter.write((MutableObjectIterator)new MockBinaryRowReader(size));
        sorter.write((MutableObjectIterator)new MockBinaryRowReader(size));
        sorter.write((MutableObjectIterator)new MockBinaryRowReader(size));
        MutableObjectIterator iterator = sorter.getIterator();
        BinaryRowData next = this.serializer.createInstance();
        for (int i = 0; i < size; ++i) {
            for (int j = 0; j < 3; ++j) {
                next = (BinaryRowData)iterator.next((Object)next);
                Assertions.assertThat((int)next.getInt(0)).isEqualTo(i);
                Assertions.assertThat((String)next.getString(1).toString()).isEqualTo(BinaryExternalSorterTest.getString(i));
            }
        }
        sorter.close();
    }

    @TestTemplate
    void testSpilling() throws Exception {
        int size = 1000000;
        MockBinaryRowReader reader = new MockBinaryRowReader(size);
        LOG.debug("initializing sortmerger");
        long minMemorySize = this.memoryManager.computeNumberOfPages(0.1) * 32768;
        BinaryExternalSorter sorter = new BinaryExternalSorter(new Object(), this.memoryManager, minMemorySize, this.ioManager, (AbstractRowDataSerializer)this.serializer, this.serializer, (NormalizedKeyComputer)IntNormalizedKeyComputer.INSTANCE, (RecordComparator)IntRecordComparator.INSTANCE, ((Integer)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES)).intValue(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)).booleanValue(), (int)((MemorySize)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)).getBytes(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED)).booleanValue(), 0.7f);
        sorter.startThreads();
        sorter.write((MutableObjectIterator)reader);
        MutableObjectIterator iterator = sorter.getIterator();
        BinaryRowData next = this.serializer.createInstance();
        for (int i = 0; i < size; ++i) {
            next = (BinaryRowData)iterator.next((Object)next);
            Assertions.assertThat((int)next.getInt(0)).isEqualTo(i);
            Assertions.assertThat((String)next.getString(1).toString()).isEqualTo(BinaryExternalSorterTest.getString(i));
        }
        sorter.close();
    }

    @TestTemplate
    void testSpillingDesc() throws Exception {
        int size = 1000000;
        MockBinaryRowReader reader = new MockBinaryRowReader(size);
        LOG.debug("initializing sortmerger");
        long minMemorySize = this.memoryManager.computeNumberOfPages(0.1) * 32768;
        BinaryExternalSorter sorter = new BinaryExternalSorter(new Object(), this.memoryManager, minMemorySize, this.ioManager, (AbstractRowDataSerializer)this.serializer, this.serializer, (NormalizedKeyComputer)new IntNormalizedKeyComputer(){

            @Override
            public boolean invertKey() {
                return true;
            }
        }, (RecordComparator)new IntRecordComparator(){

            @Override
            public int compare(RowData o1, RowData o2) {
                return -super.compare(o1, o2);
            }
        }, ((Integer)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES)).intValue(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)).booleanValue(), (int)((MemorySize)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)).getBytes(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED)).booleanValue(), 0.7f);
        sorter.startThreads();
        sorter.write((MutableObjectIterator)reader);
        MutableObjectIterator iterator = sorter.getIterator();
        ArrayList<Tuple2> data = new ArrayList<Tuple2>();
        for (int i = 0; i < size; ++i) {
            data.add(new Tuple2((Object)i, (Object)BinaryExternalSorterTest.getString(i)));
        }
        data.sort((o1, o2) -> -((Integer)o1.f0).compareTo((Integer)o2.f0));
        BinaryRowData next = this.serializer.createInstance();
        for (int i = 0; i < size; ++i) {
            next = (BinaryRowData)iterator.next((Object)next);
            Assertions.assertThat((int)next.getInt(0)).isEqualTo(((Integer)((Tuple2)data.get((int)i)).f0).intValue());
            Assertions.assertThat((String)next.getString(1).toString()).isEqualTo((String)((Tuple2)data.get((int)i)).f1);
        }
        sorter.close();
    }

    @TestTemplate
    void testMergeManyTimes() throws Exception {
        int size = 1000000;
        MockBinaryRowReader reader = new MockBinaryRowReader(size);
        LOG.debug("initializing sortmerger");
        long minMemorySize = this.memoryManager.computeNumberOfPages(0.01) * 32768;
        this.conf.set(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES, (Object)8);
        BinaryExternalSorter sorter = new BinaryExternalSorter(new Object(), this.memoryManager, minMemorySize, this.ioManager, (AbstractRowDataSerializer)this.serializer, this.serializer, (NormalizedKeyComputer)IntNormalizedKeyComputer.INSTANCE, (RecordComparator)IntRecordComparator.INSTANCE, ((Integer)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES)).intValue(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)).booleanValue(), (int)((MemorySize)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)).getBytes(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED)).booleanValue(), 0.7f);
        sorter.startThreads();
        sorter.write((MutableObjectIterator)reader);
        MutableObjectIterator iterator = sorter.getIterator();
        BinaryRowData next = this.serializer.createInstance();
        for (int i = 0; i < size; ++i) {
            next = (BinaryRowData)iterator.next((Object)next);
            Assertions.assertThat((int)next.getInt(0)).isEqualTo(i);
            Assertions.assertThat((String)next.getString(1).toString()).isEqualTo(BinaryExternalSorterTest.getString(i));
        }
        sorter.close();
    }

    @TestTemplate
    void testSpillingRandom() throws Exception {
        int i;
        int size = 1000000;
        MockBinaryRowReader reader = new MockBinaryRowReader(size);
        LOG.debug("initializing sortmerger");
        long minMemorySize = this.memoryManager.computeNumberOfPages(0.1) * 32768;
        BinaryExternalSorter sorter = new BinaryExternalSorter(new Object(), this.memoryManager, minMemorySize, this.ioManager, (AbstractRowDataSerializer)this.serializer, this.serializer, (NormalizedKeyComputer)IntNormalizedKeyComputer.INSTANCE, (RecordComparator)IntRecordComparator.INSTANCE, ((Integer)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES)).intValue(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)).booleanValue(), (int)((MemorySize)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)).getBytes(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED)).booleanValue(), 0.7f);
        sorter.startThreads();
        ArrayList<BinaryRowData> data = new ArrayList<BinaryRowData>();
        BinaryRowData row = this.serializer.createInstance();
        for (i = 0; i < size; ++i) {
            row = reader.next(row);
            data.add(row.copy());
        }
        Collections.shuffle(data);
        for (i = 0; i < size; ++i) {
            sorter.write((RowData)data.get(i));
        }
        MutableObjectIterator iterator = sorter.getIterator();
        data.sort(Comparator.comparingInt(o -> o.getInt(0)));
        BinaryRowData next = this.serializer.createInstance();
        for (int i2 = 0; i2 < size; ++i2) {
            next = (BinaryRowData)iterator.next((Object)next);
            Assertions.assertThat((int)next.getInt(0)).isEqualTo(((BinaryRowData)data.get(i2)).getInt(0));
            Assertions.assertThat((Comparable)next.getString(1)).isEqualTo((Object)((BinaryRowData)data.get(i2)).getString(1));
        }
        sorter.close();
    }

    public class MockBinaryRowReader
    implements MutableObjectIterator<BinaryRowData> {
        private int size;
        private int count;
        private BinaryRowData row;
        private BinaryRowWriter writer;

        public MockBinaryRowReader(int size) {
            this.size = size;
            this.row = new BinaryRowData(2);
            this.writer = new BinaryRowWriter(this.row);
        }

        public BinaryRowData next(BinaryRowData reuse) {
            return this.next();
        }

        public BinaryRowData next() {
            if (this.count >= this.size) {
                return null;
            }
            this.writer.reset();
            this.writer.writeInt(0, this.count);
            this.writer.writeString(1, StringData.fromString((String)BinaryExternalSorterTest.getString(this.count)));
            this.writer.complete();
            ++this.count;
            return this.row;
        }
    }
}

