/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
import org.apache.flink.table.runtime.operators.join.HashJoinType;
import org.apache.flink.table.runtime.operators.join.SortMergeJoinFunction;
import org.apache.flink.table.runtime.operators.sort.IntNormalizedKeyComputer;
import org.apache.flink.table.runtime.operators.sort.IntRecordComparator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.JoinUtil;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.MutableObjectIterator;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;

public abstract class Int2HashJoinOperatorTestBase
implements Serializable {
    /**
     * Creates a join operator through {@link #newOperator} and runs it against the two inputs via
     * {@link #joinAndAssert}, always with {@code semiJoin == false}.
     *
     * @param buildInput rows handed to {@code joinAndAssert} as its first input
     * @param probeInput rows handed to {@code joinAndAssert} as its second input
     * @param leftOut forwarded to {@code JoinUtil.getJoinType} and {@code HashJoinType.of}
     * @param rightOut forwarded to {@code JoinUtil.getJoinType} and {@code HashJoinType.of}
     * @param buildLeft forwarded to {@code HashJoinType.of} and {@code newOperator}; its negation
     *     is passed as {@code reverseJoinFunction}
     * @param expectOutSize expected number of output records
     * @param expectOutKeySize expected number of distinct keys in the output
     * @param expectOutVal expected per-key value sum, or {@code -1} to skip the per-key checks
     * @throws Exception whatever the harness run or the assertions throw
     */
    public void buildJoin(MutableObjectIterator<BinaryRowData> buildInput, MutableObjectIterator<BinaryRowData> probeInput, boolean leftOut, boolean rightOut, boolean buildLeft, int expectOutSize, int expectOutKeySize, int expectOutVal) throws Exception {
        // 33 * 32 KiB == 0x108000 bytes.
        Object joinOperator = newOperator(
                33L * 32 * 1024,
                JoinUtil.getJoinType(leftOut, rightOut),
                HashJoinType.of(buildLeft, leftOut, rightOut),
                buildLeft,
                !buildLeft);
        joinAndAssert(joinOperator, buildInput, probeInput, expectOutSize, expectOutKeySize, expectOutVal, false);
    }

    /**
     * Assembles the operator under test with {@code HashJoinOperator.newHashJoinOperator}.
     *
     * <p>All generated-code holders are anonymous subclasses whose {@code newInstance} returns a
     * fixed test implementation: {@link TrueCondition} for the join condition, {@link MyProjection}
     * for both projections, and {@code IntNormalizedKeyComputer} / {@code IntRecordComparator} for
     * the sort-merge helpers. Spill and sort settings are the default values of the corresponding
     * {@code ExecutionConfigOptions}.
     *
     * @param memorySize not read by this implementation
     * @param flinkJoinType join type given to the {@code SortMergeJoinFunction}
     * @param hashJoinType join type given to the hash join operator
     * @param buildLeft forwarded to both the sort-merge function and the hash join operator; also
     *     decides the order in which projections, computers and comparators are passed to the
     *     sort-merge function
     * @param reverseJoinFunction forwarded to the hash join operator
     * @return whatever {@code HashJoinOperator.newHashJoinOperator} returns
     */
    public Object newOperator(long memorySize, FlinkJoinType flinkJoinType, HashJoinType hashJoinType, boolean buildLeft, boolean reverseJoinFunction) {
        GeneratedJoinCondition joinCondition = new GeneratedJoinCondition("", "", new Object[0]) {
            public JoinCondition newInstance(ClassLoader classLoader) {
                return new TrueCondition();
            }
        };
        GeneratedProjection buildProjection = new GeneratedProjection("", "", new Object[0]) {
            public Projection newInstance(ClassLoader classLoader) {
                return new MyProjection();
            }
        };
        GeneratedProjection probeProjection = new GeneratedProjection("", "", new Object[0]) {
            public Projection newInstance(ClassLoader classLoader) {
                return new MyProjection();
            }
        };
        GeneratedNormalizedKeyComputer firstComputer = new GeneratedNormalizedKeyComputer("", "") {
            public NormalizedKeyComputer newInstance(ClassLoader classLoader) {
                return new IntNormalizedKeyComputer();
            }
        };
        GeneratedRecordComparator firstComparator = new GeneratedRecordComparator("", "", new Object[0]) {
            public RecordComparator newInstance(ClassLoader classLoader) {
                return new IntRecordComparator();
            }
        };
        GeneratedNormalizedKeyComputer secondComputer = new GeneratedNormalizedKeyComputer("", "") {
            public NormalizedKeyComputer newInstance(ClassLoader classLoader) {
                return new IntNormalizedKeyComputer();
            }
        };
        GeneratedRecordComparator secondComparator = new GeneratedRecordComparator("", "", new Object[0]) {
            public RecordComparator newInstance(ClassLoader classLoader) {
                return new IntRecordComparator();
            }
        };
        GeneratedRecordComparator keyComparator = new GeneratedRecordComparator("", "", new Object[0]) {
            public RecordComparator newInstance(ClassLoader classLoader) {
                return new IntRecordComparator();
            }
        };

        boolean[] filterNulls = {true};

        // Spill/sort tuning: use the defaults declared on the config options.
        int maxFileHandles = (Integer) ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
        boolean spillCompression = (Boolean) ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
        int spillBlockSize = (int) ((MemorySize) ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue()).getBytes();
        boolean asyncMerge = (Boolean) ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();

        SortMergeJoinFunction sortMergeJoinFunction;
        if (buildLeft) {
            sortMergeJoinFunction = new SortMergeJoinFunction(
                    0.0, flinkJoinType, buildLeft, maxFileHandles, spillCompression, spillBlockSize, asyncMerge,
                    joinCondition,
                    buildProjection, probeProjection,
                    firstComputer, firstComparator,
                    secondComputer, secondComparator,
                    keyComparator, filterNulls);
        } else {
            // Same arguments, but with the projection and computer/comparator pairs swapped.
            sortMergeJoinFunction = new SortMergeJoinFunction(
                    0.0, flinkJoinType, buildLeft, maxFileHandles, spillCompression, spillBlockSize, asyncMerge,
                    joinCondition,
                    probeProjection, buildProjection,
                    secondComputer, secondComparator,
                    firstComputer, firstComparator,
                    keyComparator, filterNulls);
        }

        RowType singleIntRowType = RowType.of(new LogicalType[]{new IntType()});
        return HashJoinOperator.newHashJoinOperator(
                hashJoinType,
                buildLeft,
                spillCompression,
                spillBlockSize,
                joinCondition,
                reverseJoinFunction,
                filterNulls,
                buildProjection,
                probeProjection,
                false,
                20,
                10000L,
                10000L,
                singleIntRowType,
                sortMergeJoinFunction);
    }

    /**
     * Runs {@code operator} inside a {@link TwoInputStreamTaskTestHarness}, feeds it every row of
     * both inputs and verifies the emitted records.
     *
     * <p>Rows are pulled from the two iterators in a random interleaving: an unseeded
     * {@link Random} decides which iterator is asked first on each step, and when that iterator
     * returns {@code null} the other one is asked instead. Rows from {@code input1} are sent with
     * first index {@code 0}, rows from {@code input2} with first index {@code 1} (presumably the
     * input gate index - confirm against the harness). Feeding stops once both iterators return
     * {@code null}.
     *
     * <p>Checks, in this order:
     * <ol>
     *   <li>the output contains exactly {@code expectOutSize} elements;</li>
     *   <li>unless {@code expectOutVal == -1}: the output has {@code expectOutKeySize} distinct
     *       keys, and the summed value of every key equals {@code expectOutVal}.</li>
     * </ol>
     *
     * @param operator a {@code StreamOperator}; anything else is cast to
     *     {@code StreamOperatorFactory}
     * @param input1 rows for the first input
     * @param input2 rows for the second input
     * @param expectOutSize expected number of output records
     * @param expectOutKeySize expected number of distinct keys
     * @param expectOutVal expected per-key sum, or {@code -1} to skip the per-key checks
     * @param semiJoin selects how key and value are read from an output row, see
     *     {@code assertPerKeySums}
     * @throws Exception whatever the harness or the assertions throw
     */
    public static void joinAndAssert(Object operator, MutableObjectIterator<BinaryRowData> input1, MutableObjectIterator<BinaryRowData> input2, int expectOutSize, int expectOutKeySize, int expectOutVal, boolean semiJoin) throws Exception {
        // NOTE(review): the harness and its type infos are used as raw types exactly as in the
        // decompiled source; confirm the generic signature of TwoInputStreamTaskTestHarness before
        // parameterising them.
        InternalTypeInfo typeInfo = InternalTypeInfo.ofFields((LogicalType[])new LogicalType[]{new IntType(), new IntType()});
        InternalTypeInfo rowDataTypeInfo = InternalTypeInfo.ofFields((LogicalType[])new LogicalType[]{new IntType(), new IntType(), new IntType(), new IntType()});
        TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, 2, 1, new int[]{1, 2}, (TypeInformation)typeInfo, (TypeInformation)typeInfo, (TypeInformation)rowDataTypeInfo);
        testHarness.memorySize = 0x300000L; // 3 MiB
        testHarness.getExecutionConfig().enableObjectReuse();
        testHarness.setupOutputForSingletonOperatorChain();
        if (operator instanceof StreamOperator) {
            testHarness.getStreamConfig().setStreamOperator((StreamOperator)operator);
        } else {
            testHarness.getStreamConfig().setStreamOperatorFactory((StreamOperatorFactory)operator);
        }
        testHarness.getStreamConfig().setOperatorID(new OperatorID());
        testHarness.getStreamConfig().setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, 0.99);
        testHarness.invoke();
        testHarness.waitForTaskRunning();

        Random random = new Random();
        while (true) {
            BinaryRowData row1 = null;
            BinaryRowData row2 = null;
            // Pick a side at random; fall back to the other side once the chosen one is exhausted.
            if (random.nextInt(2) == 0) {
                row1 = input1.next();
                if (row1 == null) {
                    row2 = input2.next();
                }
            } else {
                row2 = input2.next();
                if (row2 == null) {
                    row1 = input1.next();
                }
            }
            if (row1 == null && row2 == null) {
                break;
            }
            if (row1 != null) {
                testHarness.processElement(new StreamRecord<>(row1), 0, 0);
            } else {
                testHarness.processElement(new StreamRecord<>(row2), 1, 0);
            }
        }

        testHarness.endInput(0, 0);
        testHarness.endInput(1, 0);
        testHarness.waitForInputProcessing();
        testHarness.waitForTaskCompletion();

        // Unchecked only because the harness itself is raw.
        LinkedBlockingQueue<Object> actual = testHarness.getOutput();
        Assertions.assertThat(actual).as("Output was not correct.").hasSize(expectOutSize);

        if (expectOutVal == -1) {
            // Caller opted out of the per-key verification.
            return;
        }
        assertPerKeySums(actual, semiJoin, expectOutKeySize, expectOutVal);
    }

    /**
     * Sums one int value per output row, grouped by key, then asserts the number of keys and that
     * every per-key sum equals {@code expectOutVal}.
     *
     * <p>For {@code semiJoin} rows the key is field 0 and the value is field 1. Otherwise the key
     * is field 0, or field 2 when field 0 is null, and the value is field 1 plus field 3 where a
     * null field counts as 0.
     */
    private static void assertPerKeySums(Iterable<?> output, boolean semiJoin, int expectOutKeySize, int expectOutVal) {
        Map<Integer, Long> sumsByKey = new HashMap<>(expectOutKeySize);
        for (Object element : output) {
            RowData row = (RowData) ((StreamRecord<?>) element).getValue();
            int key;
            int val;
            if (semiJoin) {
                key = row.getInt(0);
                val = row.getInt(1);
            } else {
                key = row.isNullAt(0) ? row.getInt(2) : row.getInt(0);
                int val1 = row.isNullAt(1) ? 0 : row.getInt(1);
                int val2 = row.isNullAt(3) ? 0 : row.getInt(3);
                val = val1 + val2;
            }
            sumsByKey.merge(key, (long) val, Long::sum);
        }

        Assertions.assertThat(sumsByKey).as("Wrong number of keys").hasSize(expectOutKeySize);
        for (Map.Entry<Integer, Long> entry : sumsByKey.entrySet()) {
            long sum = entry.getValue();
            int key = entry.getKey();
            Assertions.assertThat(sum).as("Wrong number of values in per-key cross product for key " + key).isEqualTo((long) expectOutVal);
        }
    }

    /**
     * {@link JoinCondition} that accepts every pair of rows. Unlike {@link TrueCondition} it is
     * constructed with an {@code Object[]} argument, which it neither stores nor reads.
     */
    public static class MyJoinCondition extends AbstractRichFunction implements JoinCondition {

        /**
         * @param reference unused
         */
        public MyJoinCondition(Object[] reference) {
            // Nothing to initialise.
        }

        /**
         * @return always {@code true}, regardless of the two rows
         */
        public boolean apply(RowData in1, RowData in2) {
            return true;
        }
    }

    /** {@link JoinCondition} with a no-argument constructor that accepts every pair of rows. */
    public static class TrueCondition extends AbstractRichFunction implements JoinCondition {

        /**
         * @return always {@code true}, regardless of the two rows
         */
        public boolean apply(RowData in1, RowData in2) {
            return true;
        }
    }

    /**
     * Projection that copies field 0 of the incoming row (an int, or null) into a one-field
     * {@link BinaryRowData}.
     *
     * <p>The same {@code innerRow} instance is rewritten and returned on every call, so a result
     * is only valid until the next {@link #apply(RowData)}.
     */
    public static final class MyProjection implements Projection<RowData, BinaryRowData> {
        /** Reused output row with a single field. */
        BinaryRowData innerRow = new BinaryRowData(1);
        /** Writer bound to {@link #innerRow}. */
        BinaryRowWriter writer = new BinaryRowWriter(innerRow);

        /**
         * @param row source row; only field 0 is read
         * @return the shared {@code innerRow}, holding field 0 of {@code row}
         */
        public BinaryRowData apply(RowData row) {
            writer.reset();
            if (!row.isNullAt(0)) {
                writer.writeInt(0, row.getInt(0));
            } else {
                writer.setNullAt(0);
            }
            writer.complete();
            return innerRow;
        }
    }
}

