/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join;

import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.Int2HashJoinOperatorTest;
import org.apache.flink.table.runtime.operators.join.Int2HashJoinOperatorTestBase;
import org.apache.flink.table.runtime.operators.join.SortMergeJoinFunction;
import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
import org.apache.flink.table.runtime.operators.sort.IntNormalizedKeyComputer;
import org.apache.flink.table.runtime.operators.sort.IntRecordComparator;
import org.apache.flink.table.runtime.util.UniformBinaryRowGenerator;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.MutableObjectIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Tests for {@link SortMergeJoinOperator} driven by {@link UniformBinaryRowGenerator} inputs.
 *
 * <p>Every {@code @TestTemplate} method runs once per value returned by {@link #parameters()},
 * i.e. once with {@code leftIsSmaller = true} and once with {@code leftIsSmaller = false}. The
 * actual execution and verification of each join is delegated to
 * {@link Int2HashJoinOperatorTest#joinAndAssert}.
 */
@ExtendWith(ParameterizedTestExtension.class)
class Int2SortMergeJoinOperatorTest {

    /** Size handed to the {@link MemoryManager} created in {@link #setup()}: 36 MiB. */
    private static final long MEMORY_SIZE_BYTES = 36L * 1024 * 1024;

    /** Test parameter forwarded to {@link SortMergeJoinFunction} for the inner/outer join tests. */
    private final boolean leftIsSmaller;

    private MemoryManager memManager;
    private IOManager ioManager;

    /**
     * Creates one test instance per parameter value.
     *
     * @param leftIsSmaller value supplied by {@link #parameters()}
     */
    public Int2SortMergeJoinOperatorTest(boolean leftIsSmaller) {
        this.leftIsSmaller = leftIsSmaller;
    }

    /** Supplies the values of {@code leftIsSmaller} the test templates are expanded with. */
    @Parameters(name = "leftIsSmaller={0}")
    private static Collection<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    /** Creates a fresh memory manager and I/O manager before each test. */
    @BeforeEach
    void setup() {
        memManager = MemoryManagerBuilder.newBuilder().setMemorySize(MEMORY_SIZE_BYTES).build();
        ioManager = new IOManagerAsync();
    }

    /**
     * Closes the I/O manager and fails the test if the memory manager still holds memory.
     *
     * @throws Exception if closing the I/O manager fails
     */
    @AfterEach
    void tearDown() throws Exception {
        ioManager.close();
        if (!memManager.verifyEmpty()) {
            Assertions.fail("Not all memory was properly released to the memory manager --> Memory Leak.");
        }
    }

    /** Inner join of 100 keys with 3 rows per key on one side and 10 rows per key on the other. */
    @TestTemplate
    void testInnerJoin() throws Exception {
        int numKeys = 100;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator buildInput =
                new UniformBinaryRowGenerator(numKeys, buildValsPerKey, false);
        UniformBinaryRowGenerator probeInput =
                new UniformBinaryRowGenerator(numKeys, probeValsPerKey, true);

        // Every build row pairs with every probe row of the same key.
        int expectedRows = numKeys * buildValsPerKey * probeValsPerKey;
        buildJoin(buildInput, probeInput, FlinkJoinType.INNER, expectedRows, numKeys, 165);
    }

    /** Left outer join where the first input has 9 keys and the second input has 10. */
    @TestTemplate
    void testLeftOutJoin() throws Exception {
        int numKeys1 = 9;
        int numKeys2 = 10;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator buildInput =
                new UniformBinaryRowGenerator(numKeys1, buildValsPerKey, true);
        UniformBinaryRowGenerator probeInput =
                new UniformBinaryRowGenerator(numKeys2, probeValsPerKey, true);

        int expectedRows = numKeys1 * buildValsPerKey * probeValsPerKey;
        buildJoin(buildInput, probeInput, FlinkJoinType.LEFT, expectedRows, numKeys1, 165);
    }

    /** Right outer join where the first input has 9 keys and the second input has 10. */
    @TestTemplate
    void testRightOutJoin() throws Exception {
        int numKeys1 = 9;
        int numKeys2 = 10;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator buildInput =
                new UniformBinaryRowGenerator(numKeys1, buildValsPerKey, true);
        UniformBinaryRowGenerator probeInput =
                new UniformBinaryRowGenerator(numKeys2, probeValsPerKey, true);

        // NOTE(review): -1 presumably disables the per-key value check in joinAndAssert -- confirm.
        buildJoin(buildInput, probeInput, FlinkJoinType.RIGHT, 280, numKeys2, -1);
    }

    /** Full outer join where the first input has 9 keys and the second input has 10. */
    @TestTemplate
    void testFullOutJoin() throws Exception {
        int numKeys1 = 9;
        int numKeys2 = 10;
        int buildValsPerKey = 3;
        int probeValsPerKey = 10;
        UniformBinaryRowGenerator buildInput =
                new UniformBinaryRowGenerator(numKeys1, buildValsPerKey, true);
        UniformBinaryRowGenerator probeInput =
                new UniformBinaryRowGenerator(numKeys2, probeValsPerKey, true);

        // NOTE(review): -1 presumably disables the per-key value check in joinAndAssert -- confirm.
        buildJoin(buildInput, probeInput, FlinkJoinType.FULL, 280, numKeys2, -1);
    }

    /**
     * Semi join; always builds the operator with {@code leftIsSmaller = false}, independent of the
     * test parameter.
     */
    @TestTemplate
    void testSemiJoin() throws Exception {
        int numKeys1 = 10;
        int numKeys2 = 9;
        int buildValsPerKey = 10;
        int probeValsPerKey = 3;
        UniformBinaryRowGenerator buildInput =
                new UniformBinaryRowGenerator(numKeys1, buildValsPerKey, true);
        UniformBinaryRowGenerator probeInput =
                new UniformBinaryRowGenerator(numKeys2, probeValsPerKey, true);

        StreamOperator operator = newOperator(FlinkJoinType.SEMI, false);
        Int2HashJoinOperatorTest.joinAndAssert(operator, buildInput, probeInput, 90, 9, 45, true);
    }

    /**
     * Anti join; always builds the operator with {@code leftIsSmaller = false}, independent of the
     * test parameter.
     */
    @TestTemplate
    void testAntiJoin() throws Exception {
        int numKeys1 = 10;
        int numKeys2 = 9;
        int buildValsPerKey = 10;
        int probeValsPerKey = 3;
        UniformBinaryRowGenerator buildInput =
                new UniformBinaryRowGenerator(numKeys1, buildValsPerKey, true);
        UniformBinaryRowGenerator probeInput =
                new UniformBinaryRowGenerator(numKeys2, probeValsPerKey, true);

        StreamOperator operator = newOperator(FlinkJoinType.ANTI, false);
        Int2HashJoinOperatorTest.joinAndAssert(operator, buildInput, probeInput, 10, 1, 45, true);
    }

    /**
     * Runs a join of the given type with the parameterized {@code leftIsSmaller} setting and
     * delegates verification to {@link Int2HashJoinOperatorTest#joinAndAssert}, passing
     * {@code false} as its last argument.
     *
     * @param input1 first join input
     * @param input2 second join input
     * @param type join type under test
     * @param expectOutSize expected output size forwarded to {@code joinAndAssert}
     * @param expectOutKeySize expected output key size forwarded to {@code joinAndAssert}
     * @param expectOutVal expected output value forwarded to {@code joinAndAssert}
     * @throws Exception propagated from {@code joinAndAssert}
     */
    private void buildJoin(
            MutableObjectIterator<BinaryRowData> input1,
            MutableObjectIterator<BinaryRowData> input2,
            FlinkJoinType type,
            int expectOutSize,
            int expectOutKeySize,
            int expectOutVal)
            throws Exception {
        Int2HashJoinOperatorTest.joinAndAssert(
                getOperator(type), input1, input2, expectOutSize, expectOutKeySize, expectOutVal, false);
    }

    /** Builds an operator for {@code type} using this instance's {@code leftIsSmaller} value. */
    private StreamOperator getOperator(FlinkJoinType type) {
        return newOperator(type, leftIsSmaller);
    }

    /**
     * Wraps {@link #getJoinFunction(FlinkJoinType, boolean)} in a {@link SortMergeJoinOperator}.
     *
     * @param type join type
     * @param leftIsSmaller forwarded to the join function
     * @return a new operator instance
     */
    static StreamOperator newOperator(FlinkJoinType type, boolean leftIsSmaller) {
        return new SortMergeJoinOperator(getJoinFunction(type, leftIsSmaller));
    }

    /**
     * Creates a {@link SortMergeJoinFunction} configured with the default values of the relevant
     * {@link ExecutionConfigOptions} and with stub generated classes that return fixed test
     * implementations.
     *
     * @param type join type
     * @param leftIsSmaller forwarded to the join function
     * @return a new join function
     */
    public static SortMergeJoinFunction getJoinFunction(FlinkJoinType type, boolean leftIsSmaller) {
        int maxNumFileHandles =
                ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
        boolean compressionEnable =
                ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
        int compressionBlockSize =
                (int) ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes();
        boolean asyncMergeEnable =
                ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();

        // The meaning of each positional argument is defined by SortMergeJoinFunction's
        // constructor; the order below must not be changed.
        return new SortMergeJoinFunction(
                0.0,
                type,
                leftIsSmaller,
                maxNumFileHandles,
                compressionEnable,
                compressionBlockSize,
                asyncMergeEnable,
                newTrueCondition(),
                newProjection(),
                newProjection(),
                newKeyComputer(),
                newComparator(),
                newKeyComputer(),
                newComparator(),
                newComparator(),
                new boolean[] {true});
    }

    /** Stub whose {@code newInstance} returns {@link Int2HashJoinOperatorTestBase.TrueCondition}. */
    private static GeneratedJoinCondition newTrueCondition() {
        return new GeneratedJoinCondition("", "", new Object[0]) {
            @Override
            public JoinCondition newInstance(ClassLoader classLoader) {
                return new Int2HashJoinOperatorTestBase.TrueCondition();
            }
        };
    }

    /** Stub whose {@code newInstance} returns {@link Int2HashJoinOperatorTestBase.MyProjection}. */
    private static GeneratedProjection newProjection() {
        return new GeneratedProjection("", "", new Object[0]) {
            @Override
            public Projection newInstance(ClassLoader classLoader) {
                return new Int2HashJoinOperatorTestBase.MyProjection();
            }
        };
    }

    /** Stub whose {@code newInstance} returns an {@link IntNormalizedKeyComputer}. */
    private static GeneratedNormalizedKeyComputer newKeyComputer() {
        return new GeneratedNormalizedKeyComputer("", "") {
            @Override
            public NormalizedKeyComputer newInstance(ClassLoader classLoader) {
                return new IntNormalizedKeyComputer();
            }
        };
    }

    /** Stub whose {@code newInstance} returns an {@link IntRecordComparator}. */
    private static GeneratedRecordComparator newComparator() {
        return new GeneratedRecordComparator("", "", new Object[0]) {
            @Override
            public RecordComparator newInstance(ClassLoader classLoader) {
                return new IntRecordComparator();
            }
        };
    }
}

