/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.hashtable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.testutils.UnionIterator;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea;
import org.apache.flink.table.runtime.hashtable.BinaryHashPartition;
import org.apache.flink.table.runtime.hashtable.BinaryHashTable;
import org.apache.flink.table.runtime.hashtable.ProbeIterator;
import org.apache.flink.table.runtime.operators.join.HashJoinType;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.ConstantsKeyValuePairsIterator;
import org.apache.flink.table.runtime.util.RowIterator;
import org.apache.flink.table.runtime.util.UniformBinaryRowGenerator;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.MutableObjectIterator;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class BinaryHashTableTest {
    /** Page size in bytes (32 KiB) used to express the memory budgets of the tests. */
    private static final int PAGE_SIZE = 32768;

    /** I/O manager handed to every hash table under test; created in {@code setup()}. */
    private IOManager ioManager;

    /** Serializer for the two-field build side rows; created in {@code setup()}. */
    private BinaryRowDataSerializer buildSideSerializer;

    /** Serializer for the two-field probe side rows; created in {@code setup()}. */
    private BinaryRowDataSerializer probeSideSerializer;

    /** Test parameter that is passed on to every {@link BinaryHashTable} constructed here. */
    private final boolean useCompress;

    /**
     * Creates one parameterized instance of the test class.
     *
     * @param useCompress value forwarded as the compression flag to the hash tables under test
     */
    BinaryHashTableTest(boolean useCompress) {
        this.useCompress = useCompress;
    }

    /**
     * Supplies the values for the {@code useCompress} constructor argument.
     *
     * @return a fixed-size list holding {@code true} followed by {@code false}
     */
    @Parameters(name="useCompress-{0}")
    private static List<Boolean> getVarSeg() {
        return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
    }

    /** Creates fresh serializers for both join sides and a new asynchronous I/O manager. */
    @BeforeEach
    void setup() {
        // Both sides use rows with two INT fields; only the field count is needed here.
        final TypeInformation<?>[] fieldTypes = {Types.INT, Types.INT};
        final int arity = fieldTypes.length;

        buildSideSerializer = new BinaryRowDataSerializer(arity);
        probeSideSerializer = new BinaryRowDataSerializer(arity);
        ioManager = new IOManagerAsync();
    }

    /**
     * Closes the I/O manager created in {@code setup()}.
     *
     * @throws Exception if closing the I/O manager fails
     */
    @AfterEach
    void tearDown() throws Exception {
        ioManager.close();
    }

    /**
     * Checks {@link BinaryHashTable#getNumWriteBehindBuffers(int)} against a table of inputs and
     * the results expected for them.
     */
    @TestTemplate
    void testIOBufferCountComputation() {
        // Each entry is {argument, expected result}; entries are checked in this order.
        final int[][] expectations = {
            {32, 1},
            {33, 1},
            {40, 1},
            {64, 1},
            {127, 1},
            {128, 2},
            {129, 2},
            {511, 2},
            {512, 3},
            {513, 3},
            {2047, 3},
            {2048, 4},
            {2049, 4},
            {8191, 4},
            {8192, 5},
            {8193, 5},
            {Short.MAX_VALUE, 5},
            {32768, 6},
            {Integer.MAX_VALUE, 6},
        };

        for (int[] expectation : expectations) {
            final int argument = expectation[0];
            final int expected = expectation[1];
            Assertions.assertThat(BinaryHashTable.getNumWriteBehindBuffers(argument)).isEqualTo(expected);
        }
    }

    /**
     * Joins two uniformly generated inputs over the same key range and checks the size of the
     * result: every key contributes {@code buildValsPerKey * probeValsPerKey} records.
     *
     * @throws IOException if the hash table fails during build or probe
     */
    @TestTemplate
    void testInMemoryMutableHashTable() throws IOException {
        final int numKeys = 100000;
        final int buildValsPerKey = 3;
        final int probeValsPerKey = 10;

        MutableObjectIterator<BinaryRowData> buildInput =
                new UniformBinaryRowGenerator(numKeys, buildValsPerKey, false);
        MutableObjectIterator<BinaryRowData> probeInput =
                new UniformBinaryRowGenerator(numKeys, probeValsPerKey, true);

        // The memory manager owns 896 pages, of which the table may use 100.
        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder().setMemorySize(896 * PAGE_SIZE).build();
        BinaryHashTable table =
                newBinaryHashTable(
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        100 * PAGE_SIZE,
                        ioManager);

        int numRecordsInJoinResult = join(table, buildInput, probeInput);
        Assertions.assertThat(numRecordsInJoinResult)
                .as("Wrong number of records in join result.")
                .isEqualTo(numKeys * buildValsPerKey * probeValsPerKey);

        table.close();
        table.free();
    }

    /**
     * Runs a join without build-side outer semantics.
     *
     * @param table hash table to fill and probe
     * @param buildInput rows for the build side
     * @param probeInput rows for the probe side
     * @return number of joined records counted
     * @throws IOException if the hash table or an input fails
     */
    private int join(BinaryHashTable table, MutableObjectIterator<BinaryRowData> buildInput, MutableObjectIterator<BinaryRowData> probeInput) throws IOException {
        final boolean buildOuterJoin = false;
        return join(table, buildInput, probeInput, buildOuterJoin);
    }

    /**
     * Feeds all build rows into the table, probes it with all probe rows, then drains whatever
     * {@code table.nextMatching()} still reports, counting joined records along the way.
     *
     * @param table hash table to fill and probe
     * @param buildInput rows for the build side
     * @param probeInput rows for the probe side
     * @param buildOuterJoin forwarded to {@code joinWithNextKey} to also count build rows that
     *     have no current probe row
     * @return number of joined records counted
     * @throws IOException if the hash table or an input fails
     */
    private int join(BinaryHashTable table, MutableObjectIterator<BinaryRowData> buildInput, MutableObjectIterator<BinaryRowData> probeInput, boolean buildOuterJoin) throws IOException {
        // Build phase: the same reuse instance is offered to the input on every call.
        final BinaryRowData buildReuse = buildSideSerializer.createInstance();
        for (BinaryRowData row = buildInput.next(buildReuse); row != null; row = buildInput.next(buildReuse)) {
            table.putBuildRow(row);
        }
        table.endBuild();

        int joined = 0;

        // Probe phase: the row returned last is offered as the reuse object for the next call.
        BinaryRowData probe = probeSideSerializer.createInstance();
        for (probe = probeInput.next(probe); probe != null; probe = probeInput.next(probe)) {
            if (table.tryProbe(probe)) {
                joined += joinWithNextKey(table, buildOuterJoin);
            }
        }

        // Remaining matches reported by the table after the probe input is exhausted.
        while (table.nextMatching()) {
            joined += joinWithNextKey(table, buildOuterJoin);
        }
        return joined;
    }

    /**
     * Counts the build rows of the table's current build side iterator.
     *
     * <p>Rows are counted when a first build row exists and either a current probe row exists or
     * {@code buildOuterJoin} is set. In every other case nothing is counted.
     *
     * @param table hash table positioned on the key to count
     * @param buildOuterJoin whether build rows without a current probe row are counted as well
     * @return number of build rows counted, or 0
     * @throws IOException if advancing the build side iterator fails
     */
    private int joinWithNextKey(BinaryHashTable table, boolean buildOuterJoin) throws IOException {
        final RowIterator<?> buildRows = table.getBuildSideIterator();
        final RowData currentProbe = table.getCurrentProbeRow();
        final BinaryRowData firstBuildRow =
                buildRows.advanceNext() ? (BinaryRowData) buildRows.getRow() : null;

        if (firstBuildRow == null) {
            return 0;
        }
        if (currentProbe == null && !buildOuterJoin) {
            return 0;
        }

        // The first build row was already consumed above; count it and all that follow.
        int matches = 1;
        while (buildRows.advanceNext()) {
            matches++;
        }
        return matches;
    }

    /**
     * Joins one million keys (3 build values and 10 probe values each) with a memory manager of
     * 200 pages, of which the table may use 100, and checks the total number of joined records.
     *
     * @throws IOException if the hash table fails during build or probe
     */
    @TestTemplate
    void testSpillingHashJoinOneRecursionPerformance() throws IOException {
        final int numKeys = 1000000;
        final int buildValsPerKey = 3;
        final int probeValsPerKey = 10;

        MutableObjectIterator<BinaryRowData> buildInput =
                new UniformBinaryRowGenerator(numKeys, buildValsPerKey, false);
        MutableObjectIterator<BinaryRowData> probeInput =
                new UniformBinaryRowGenerator(numKeys, probeValsPerKey, true);

        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder()
                        .setMemorySize(200 * PAGE_SIZE)
                        .setPageSize(PAGE_SIZE)
                        .build();
        BinaryHashTable table =
                newBinaryHashTable(
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        100 * PAGE_SIZE,
                        ioManager);

        int numRecordsInJoinResult = join(table, buildInput, probeInput);
        Assertions.assertThat(numRecordsInJoinResult)
                .as("Wrong number of records in join result.")
                .isEqualTo(numKeys * buildValsPerKey * probeValsPerKey);

        table.close();
        table.free();
    }

    /**
     * Joins one million keys (3 build values and 10 probe values each) and validates the result
     * per key: every key must be present and must have produced
     * {@code buildValsPerKey * probeValsPerKey} joined records.
     *
     * @throws IOException if the hash table fails during build or probe
     */
    @TestTemplate
    void testSpillingHashJoinOneRecursionValidity() throws IOException {
        final int numKeys = 1000000;
        final int buildValsPerKey = 3;
        final int probeValsPerKey = 10;

        MutableObjectIterator<BinaryRowData> buildInput =
                new UniformBinaryRowGenerator(numKeys, buildValsPerKey, false);
        MutableObjectIterator<BinaryRowData> probeInput =
                new UniformBinaryRowGenerator(numKeys, probeValsPerKey, true);

        // Key -> number of joined records seen for that key.
        HashMap<Integer, Long> map = new HashMap<>(numKeys);

        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder().setMemorySize(896 * PAGE_SIZE).build();
        BinaryHashTable table =
                newBinaryHashTable(
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        100 * PAGE_SIZE,
                        ioManager);

        BinaryRowData buildRow = buildSideSerializer.createInstance();
        while ((buildRow = buildInput.next(buildRow)) != null) {
            table.putBuildRow(buildRow);
        }
        table.endBuild();

        BinaryRowData probeRow = probeSideSerializer.createInstance();
        while ((probeRow = probeInput.next(probeRow)) != null) {
            if (table.tryProbe(probeRow)) {
                testJoin(table, map);
            }
        }
        while (table.nextMatching()) {
            testJoin(table, map);
        }
        table.close();

        Assertions.assertThat(map).as("Wrong number of keys").hasSize(numKeys);
        final long expectedPerKey = (long) buildValsPerKey * probeValsPerKey;
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            long val = entry.getValue();
            int key = entry.getKey();
            Assertions.assertThat(val)
                    .as("Wrong number of values in per-key cross product for key " + key)
                    .isEqualTo(expectedPerKey);
        }

        table.free();
    }

    /**
     * Joins uniform inputs over one million keys that are each extended with two heavily
     * repeated keys (200000 extra build rows and 5 extra probe rows per repeated key), then
     * validates the number of joined records per key.
     *
     * @throws IOException if the hash table fails during build or probe
     */
    @TestTemplate
    void testSpillingHashJoinWithMassiveCollisions() throws IOException {
        final int repeatedValue1 = 40559;
        final int repeatedValue2 = 92882;
        final int repeatedValueCountBuild = 200000;
        final int repeatedValueCountProbe = 5;
        final int numKeys = 1000000;
        final int buildValsPerKey = 3;
        final int probeValsPerKey = 10;

        // Build side: the uniform input plus the extra rows for the two repeated keys.
        List<MutableObjectIterator<BinaryRowData>> builds = new ArrayList<>();
        builds.add(new UniformBinaryRowGenerator(numKeys, buildValsPerKey, false));
        builds.add(new ConstantsKeyValuePairsIterator(repeatedValue1, 17, repeatedValueCountBuild));
        builds.add(new ConstantsKeyValuePairsIterator(repeatedValue2, 23, repeatedValueCountBuild));
        MutableObjectIterator<BinaryRowData> buildInput = new UnionIterator<>(builds);

        // Probe side: the uniform input plus the extra rows for the two repeated keys.
        List<MutableObjectIterator<BinaryRowData>> probes = new ArrayList<>();
        probes.add(new UniformBinaryRowGenerator(numKeys, probeValsPerKey, true));
        probes.add(new ConstantsKeyValuePairsIterator(repeatedValue1, 17, repeatedValueCountProbe));
        probes.add(new ConstantsKeyValuePairsIterator(repeatedValue2, 23, repeatedValueCountProbe));
        MutableObjectIterator<BinaryRowData> probeInput = new UnionIterator<>(probes);

        // Key -> number of joined records seen for that key.
        HashMap<Integer, Long> map = new HashMap<>(numKeys);

        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder().setMemorySize(896 * PAGE_SIZE).build();
        BinaryHashTable table =
                newBinaryHashTable(
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        896 * PAGE_SIZE,
                        ioManager);

        BinaryRowData buildRow = buildSideSerializer.createInstance();
        while ((buildRow = buildInput.next(buildRow)) != null) {
            table.putBuildRow(buildRow);
        }
        table.endBuild();

        BinaryRowData probeRow = probeSideSerializer.createInstance();
        while ((probeRow = probeInput.next(probeRow)) != null) {
            if (table.tryProbe(probeRow)) {
                testJoin(table, map);
            }
        }
        while (table.nextMatching()) {
            testJoin(table, map);
        }
        table.close();

        Assertions.assertThat(map).as("Wrong number of keys").hasSize(numKeys);

        // Regular keys: 3 * 10 = 30. Repeated keys: (3 + 200000) * (10 + 5) = 3000045.
        final long expectedRegular = (long) buildValsPerKey * probeValsPerKey;
        final long expectedRepeated =
                (long) (buildValsPerKey + repeatedValueCountBuild)
                        * (probeValsPerKey + repeatedValueCountProbe);
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            long val = entry.getValue();
            int key = entry.getKey();
            boolean repeated = key == repeatedValue1 || key == repeatedValue2;
            Assertions.assertThat(val)
                    .as("Wrong number of values in per-key cross product for key " + key)
                    .isEqualTo(repeated ? expectedRepeated : expectedRegular);
        }

        table.free();
    }

    /**
     * Verifies the build rows matching the table's current probe row and records how many
     * there were.
     *
     * <p>Fails if no build row is available. Every build row must carry the probe row's key in
     * field 0. The number of build rows is added to the entry for that key in {@code map}.
     *
     * @param table hash table positioned on a probe row
     * @param map accumulator from key to number of joined records
     * @throws IOException if advancing the build side iterator fails
     */
    private void testJoin(BinaryHashTable table, HashMap<Integer, Long> map) throws IOException {
        final int key = table.getCurrentProbeRow().getInt(0);
        final RowIterator<?> buildSide = table.getBuildSideIterator();

        if (!buildSide.advanceNext()) {
            Assertions.fail("No build side values found for a probe key.");
        }

        // The iterator is already positioned on the first build row at this point.
        int numBuildValues = 0;
        do {
            numBuildValues++;
            final BinaryRowData buildRow = (BinaryRowData) buildSide.getRow();
            Assertions.assertThat(buildRow.getInt(0))
                    .as("Probe-side key was different than build-side key.")
                    .isEqualTo(key);
        } while (buildSide.advanceNext());

        map.merge(key, (long) numBuildValues, Long::sum);
    }

    /**
     * Joins uniform inputs over one million keys that are each extended with two heavily
     * repeated keys (200000 extra build rows and 5 extra probe rows per repeated key), then
     * validates the number of joined records per key.
     *
     * @throws IOException if the hash table fails during build or probe
     */
    @TestTemplate
    void testSpillingHashJoinWithTwoRecursions() throws IOException {
        final int repeatedValue1 = 40559;
        final int repeatedValue2 = 92882;
        final int repeatedValueCountBuild = 200000;
        final int repeatedValueCountProbe = 5;
        final int numKeys = 1000000;
        final int buildValsPerKey = 3;
        final int probeValsPerKey = 10;

        // Build side: the uniform input plus the extra rows for the two repeated keys.
        List<MutableObjectIterator<BinaryRowData>> builds = new ArrayList<>();
        builds.add(new UniformBinaryRowGenerator(numKeys, buildValsPerKey, false));
        builds.add(new ConstantsKeyValuePairsIterator(repeatedValue1, 17, repeatedValueCountBuild));
        builds.add(new ConstantsKeyValuePairsIterator(repeatedValue2, 23, repeatedValueCountBuild));
        MutableObjectIterator<BinaryRowData> buildInput = new UnionIterator<>(builds);

        // Probe side: the uniform input plus the extra rows for the two repeated keys.
        List<MutableObjectIterator<BinaryRowData>> probes = new ArrayList<>();
        probes.add(new UniformBinaryRowGenerator(numKeys, probeValsPerKey, true));
        probes.add(new ConstantsKeyValuePairsIterator(repeatedValue1, 17, repeatedValueCountProbe));
        probes.add(new ConstantsKeyValuePairsIterator(repeatedValue2, 23, repeatedValueCountProbe));
        MutableObjectIterator<BinaryRowData> probeInput = new UnionIterator<>(probes);

        // Key -> number of joined records seen for that key.
        HashMap<Integer, Long> map = new HashMap<>(numKeys);

        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder().setMemorySize(896 * PAGE_SIZE).build();
        BinaryHashTable table =
                newBinaryHashTable(
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        896 * PAGE_SIZE,
                        ioManager);

        BinaryRowData buildRow = buildSideSerializer.createInstance();
        while ((buildRow = buildInput.next(buildRow)) != null) {
            table.putBuildRow(buildRow);
        }
        table.endBuild();

        BinaryRowData probeRow = probeSideSerializer.createInstance();
        while ((probeRow = probeInput.next(probeRow)) != null) {
            if (table.tryProbe(probeRow)) {
                testJoin(table, map);
            }
        }
        while (table.nextMatching()) {
            testJoin(table, map);
        }
        table.close();

        Assertions.assertThat(map).as("Wrong number of keys").hasSize(numKeys);

        // Regular keys: 3 * 10 = 30. Repeated keys: (3 + 200000) * (10 + 5) = 3000045.
        final long expectedRegular = (long) buildValsPerKey * probeValsPerKey;
        final long expectedRepeated =
                (long) (buildValsPerKey + repeatedValueCountBuild)
                        * (probeValsPerKey + repeatedValueCountProbe);
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            long val = entry.getValue();
            int key = entry.getKey();
            boolean repeated = key == repeatedValue1 || key == repeatedValue2;
            Assertions.assertThat(val)
                    .as("Wrong number of values in per-key cross product for key " + key)
                    .isEqualTo(repeated ? expectedRepeated : expectedRegular);
        }

        table.free();
    }

    /**
     * Joins inputs in which two keys are repeated three million times on both sides and checks
     * that the hash table did not join everything itself: fewer than {@code numKeys} keys were
     * joined, exactly two partitions are left pending (see
     * {@code getPartitionsPendingForSMJ()}), and those partitions contain the expected number
     * of build and probe rows for the two repeated keys.
     *
     * @throws IOException if the hash table fails during build or probe
     */
    @TestTemplate
    void testSpillingHashJoinWithTooManyRecursions() throws IOException {
        final int repeatedValue1 = 40559;
        final int repeatedValue2 = 92882;
        final int repeatedValueCount = 3000000;
        final int numKeys = 1000000;
        final int buildValsPerKey = 3;
        final int probeValsPerKey = 10;

        // Build side: the uniform input plus the extra rows for the two repeated keys.
        List<MutableObjectIterator<BinaryRowData>> builds = new ArrayList<>();
        builds.add(new UniformBinaryRowGenerator(numKeys, buildValsPerKey, false));
        builds.add(new ConstantsKeyValuePairsIterator(repeatedValue1, 17, repeatedValueCount));
        builds.add(new ConstantsKeyValuePairsIterator(repeatedValue2, 23, repeatedValueCount));
        MutableObjectIterator<BinaryRowData> buildInput = new UnionIterator<>(builds);

        // Probe side: the uniform input plus the extra rows for the two repeated keys.
        List<MutableObjectIterator<BinaryRowData>> probes = new ArrayList<>();
        probes.add(new UniformBinaryRowGenerator(numKeys, probeValsPerKey, true));
        probes.add(new ConstantsKeyValuePairsIterator(repeatedValue1, 17, repeatedValueCount));
        probes.add(new ConstantsKeyValuePairsIterator(repeatedValue2, 23, repeatedValueCount));
        MutableObjectIterator<BinaryRowData> probeInput = new UnionIterator<>(probes);

        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder().setMemorySize(896 * PAGE_SIZE).build();
        BinaryHashTable table =
                newBinaryHashTable(
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        896 * PAGE_SIZE,
                        ioManager);

        // Key -> number of joined records seen for that key.
        HashMap<Integer, Long> map = new HashMap<>(numKeys);

        BinaryRowData buildRow = buildSideSerializer.createInstance();
        while ((buildRow = buildInput.next(buildRow)) != null) {
            table.putBuildRow(buildRow);
        }
        table.endBuild();

        BinaryRowData probeRow = probeSideSerializer.createInstance();
        while ((probeRow = probeInput.next(probeRow)) != null) {
            if (table.tryProbe(probeRow)) {
                testJoin(table, map);
            }
        }
        while (table.nextMatching()) {
            testJoin(table, map);
        }

        Assertions.assertThat(map.size())
                .as("Wrong number of records in join result.")
                .isLessThan(numKeys);
        Assertions.assertThat(table.getPartitionsPendingForSMJ().size())
                .as("Wrong number of spilled partition.")
                .isEqualTo(2);

        // Count the rows per key that are left in the pending partitions, for each side.
        Map<Integer, Integer> spilledPartitionBuildSideKeys = new HashMap<>();
        Map<Integer, Integer> spilledPartitionProbeSideKeys = new HashMap<>();
        for (BinaryHashPartition p : table.getPartitionsPendingForSMJ()) {
            RowIterator<?> buildIter = table.getSpilledPartitionBuildSideIter(p);
            while (buildIter.advanceNext()) {
                int key = ((BinaryRowData) buildIter.getRow()).getInt(0);
                spilledPartitionBuildSideKeys.merge(key, 1, Integer::sum);
            }

            ProbeIterator probeIter = table.getSpilledPartitionProbeSideIter(p);
            BinaryRowData spilledProbeRow;
            while ((spilledProbeRow = probeIter.next()) != null) {
                spilledPartitionProbeSideKeys.merge(spilledProbeRow.getInt(0), 1, Integer::sum);
            }
        }

        // Each repeated key occurs in the uniform input as well as in its constant input.
        final int buildKeyCnt = repeatedValueCount + buildValsPerKey;
        Assertions.assertThat(spilledPartitionBuildSideKeys).containsEntry(repeatedValue1, buildKeyCnt);
        Assertions.assertThat(spilledPartitionBuildSideKeys).containsEntry(repeatedValue2, buildKeyCnt);

        final int probeKeyCnt = repeatedValueCount + probeValsPerKey;
        Assertions.assertThat(spilledPartitionProbeSideKeys).containsEntry(repeatedValue1, probeKeyCnt);
        Assertions.assertThat(spilledPartitionProbeSideKeys).containsEntry(repeatedValue2, probeKeyCnt);

        table.close();
        table.free();
    }

    /**
     * Joins a build side of one million single-valued keys with a probe side of only 20
     * single-valued keys and checks the number of joined records.
     *
     * @throws IOException if the hash table fails during build or probe
     * @throws MemoryAllocationException declared by the original signature
     */
    @TestTemplate
    void testSparseProbeSpilling() throws IOException, MemoryAllocationException {
        final int numBuildKeys = 1000000;
        final int numBuildVals = 1;
        final int numProbeKeys = 20;
        final int numProbeVals = 1;

        MutableObjectIterator<BinaryRowData> buildInput =
                new UniformBinaryRowGenerator(numBuildKeys, numBuildVals, false);

        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder().setMemorySize(128 * PAGE_SIZE).build();
        BinaryHashTable table =
                newBinaryHashTable(
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        100 * PAGE_SIZE,
                        ioManager);

        int expectedNumResults = Math.min(numProbeKeys, numBuildKeys) * numBuildVals * numProbeVals;

        int numRecordsInJoinResult =
                join(table, buildInput, new UniformBinaryRowGenerator(numProbeKeys, numProbeVals, true));
        Assertions.assertThat(numRecordsInJoinResult)
                .as("Wrong number of records in join result.")
                .isEqualTo(expectedNumResults);

        table.close();
        table.free();
    }

    /**
     * Same inputs as the sparse probe test, but with a {@link HashJoinType#BUILD_OUTER} table
     * and build-outer counting: one record per key of the larger side is expected.
     *
     * @throws IOException if the hash table fails during build or probe
     */
    @TestTemplate
    void testSparseProbeSpillingWithOuterJoin() throws IOException {
        final int numBuildKeys = 1000000;
        final int numBuildVals = 1;
        final int numProbeKeys = 20;
        final int numProbeVals = 1;

        MutableObjectIterator<BinaryRowData> buildInput =
                new UniformBinaryRowGenerator(numBuildKeys, numBuildVals, false);

        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder().setMemorySize(96 * PAGE_SIZE).build();
        BinaryHashTable table =
                new BinaryHashTable(
                        new Object(),
                        useCompress,
                        (int) ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        96 * PAGE_SIZE,
                        ioManager,
                        24,
                        200000L,
                        true,
                        HashJoinType.BUILD_OUTER,
                        null,
                        true,
                        new boolean[] {true},
                        false);

        int expectedNumResults = Math.max(numProbeKeys, numBuildKeys) * numBuildVals * numProbeVals;

        int numRecordsInJoinResult =
                join(
                        table,
                        buildInput,
                        new UniformBinaryRowGenerator(numProbeKeys, numProbeVals, true),
                        true);
        Assertions.assertThat(numRecordsInJoinResult)
                .as("Wrong number of records in join result.")
                .isEqualTo(expectedNumResults);

        table.close();
        table.free();
    }

    /**
     * Joins a build side of 500000 single-valued keys with a probe side of 10 single-valued
     * keys under a memory budget of 85 pages and checks the number of joined records.
     *
     * @throws IOException if the hash table fails during build or probe
     */
    @TestTemplate
    void validateSpillingDuringInsertion() throws IOException {
        final int numBuildKeys = 500000;
        final int numBuildVals = 1;
        final int numProbeKeys = 10;
        final int numProbeVals = 1;

        MutableObjectIterator<BinaryRowData> buildInput =
                new UniformBinaryRowGenerator(numBuildKeys, numBuildVals, false);

        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder().setMemorySize(85 * PAGE_SIZE).build();
        BinaryHashTable table =
                newBinaryHashTable(
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        85 * PAGE_SIZE,
                        ioManager);

        int expectedNumResults = Math.min(numProbeKeys, numBuildKeys) * numBuildVals * numProbeVals;

        int numRecordsInJoinResult =
                join(table, buildInput, new UniformBinaryRowGenerator(numProbeKeys, numProbeVals, true));
        Assertions.assertThat(numRecordsInJoinResult)
                .as("Wrong number of records in join result.")
                .isEqualTo(expectedNumResults);

        table.close();
        table.free();
    }

    /**
     * Overwrites the last 128 bytes of every free page of the table's internal pool before
     * running a regular join, then checks the number of joined records.
     *
     * @throws Exception if the hash table fails
     */
    @TestTemplate
    void testBucketsNotFulfillSegment() throws Exception {
        final int numKeys = 10000;
        final int buildValsPerKey = 3;
        final int probeValsPerKey = 10;

        MutableObjectIterator<BinaryRowData> buildInput =
                new UniformBinaryRowGenerator(numKeys, buildValsPerKey, false);
        MutableObjectIterator<BinaryRowData> probeInput =
                new UniformBinaryRowGenerator(numKeys, probeValsPerKey, true);

        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder().setMemorySize(35 * PAGE_SIZE).build();
        BinaryHashTable table =
                new BinaryHashTable(
                        new Object(),
                        useCompress,
                        (int) ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        35 * PAGE_SIZE,
                        ioManager,
                        24,
                        200000L,
                        true,
                        HashJoinType.INNER,
                        null,
                        false,
                        new boolean[] {true},
                        false);

        // Take every free page once, write into its last 128 bytes, and hand it back.
        // NOTE(review): the written fields presumably mimic a bucket header left over from a
        // previous use of the page - confirm against BinaryHashBucketArea.
        int totalPages = table.getInternalPool().freePages();
        for (int i = 0; i < totalPages; i++) {
            MemorySegment segment = table.getInternalPool().nextSegment();
            int newBucketOffset = segment.size() - 128;
            segment.put(newBucketOffset, (byte) 0);
            segment.put(newBucketOffset + 1, (byte) 0);
            segment.putShort(newBucketOffset + 2, (short) -1);
            segment.putLong(newBucketOffset + 4, -1L);
            table.returnPage(segment);
        }

        int numRecordsInJoinResult = join(table, buildInput, probeInput);
        Assertions.assertThat(numRecordsInJoinResult)
                .as("Wrong number of records in join result.")
                .isEqualTo(numKeys * buildValsPerKey * probeValsPerKey);

        table.close();
        table.free();
    }

    /**
     * Build-outer join in which the build side has twice as many keys as the probe side; with
     * one value per key on both sides, one record per build key is expected.
     *
     * @throws Exception if the hash table fails
     */
    @TestTemplate
    void testHashWithBuildSideOuterJoin1() throws Exception {
        final int numKeys = 20000;
        final int buildValsPerKey = 1;
        final int probeValsPerKey = 1;

        // The build side covers 2 * numKeys keys, the probe side only numKeys.
        MutableObjectIterator<BinaryRowData> buildInput =
                new UniformBinaryRowGenerator(2 * numKeys, buildValsPerKey, false);
        MutableObjectIterator<BinaryRowData> probeInput =
                new UniformBinaryRowGenerator(numKeys, probeValsPerKey, true);

        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder().setMemorySize(35 * PAGE_SIZE).build();
        BinaryHashTable table =
                new BinaryHashTable(
                        new Object(),
                        useCompress,
                        (int) ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        35 * PAGE_SIZE,
                        ioManager,
                        24,
                        200000L,
                        true,
                        HashJoinType.BUILD_OUTER,
                        null,
                        true,
                        new boolean[] {true},
                        false);

        int numRecordsInJoinResult = join(table, buildInput, probeInput, true);
        Assertions.assertThat(numRecordsInJoinResult)
                .as("Wrong number of records in join result.")
                .isEqualTo(2 * numKeys * buildValsPerKey * probeValsPerKey);

        table.close();
        table.free();
    }

    /**
     * Joins build and probe inputs over the same 40000 keys (2 build values and 1 probe value
     * per key) with build-outer counting and checks the number of joined records.
     *
     * @throws Exception if the hash table fails
     */
    @TestTemplate
    void testHashWithBuildSideOuterJoin2() throws Exception {
        final int numKeys = 40000;
        final int buildValsPerKey = 2;
        final int probeValsPerKey = 1;

        MutableObjectIterator<BinaryRowData> buildInput =
                new UniformBinaryRowGenerator(numKeys, buildValsPerKey, false);
        MutableObjectIterator<BinaryRowData> probeInput =
                new UniformBinaryRowGenerator(numKeys, probeValsPerKey, true);

        // The memory manager owns 35 pages, of which the table may use 33.
        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder().setMemorySize(35 * PAGE_SIZE).build();
        BinaryHashTable table =
                newBinaryHashTable(
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        33 * PAGE_SIZE,
                        ioManager);

        int numRecordsInJoinResult = join(table, buildInput, probeInput, true);
        Assertions.assertThat(numRecordsInJoinResult)
                .as("Wrong number of records in join result.")
                .isEqualTo(numKeys * buildValsPerKey * probeValsPerKey);

        table.close();
        table.free();
    }

    /**
     * Builds the table from 500 identical rows (key 1, value 1), with the last constructor flag
     * set to {@code true}, and probes it with 500 distinct keys; exactly one joined record is
     * expected.
     *
     * @throws Exception if the hash table fails
     */
    @TestTemplate
    void testRepeatBuildJoin() throws Exception {
        final int numKeys = 500;
        final int probeValsPerKey = 1;

        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder().setMemorySize(40 * PAGE_SIZE).build();

        // Build input: numKeys freshly allocated rows, all equal to (1, 1).
        MutableObjectIterator<BinaryRowData> buildInput =
                new MutableObjectIterator<BinaryRowData>() {
                    int cnt = 0;

                    @Override
                    public BinaryRowData next(BinaryRowData reuse) throws IOException {
                        return next();
                    }

                    @Override
                    public BinaryRowData next() {
                        cnt++;
                        if (cnt > numKeys) {
                            return null;
                        }
                        BinaryRowData row = new BinaryRowData(2);
                        BinaryRowWriter writer = new BinaryRowWriter(row);
                        writer.writeInt(0, 1);
                        writer.writeInt(1, 1);
                        writer.complete();
                        return row;
                    }
                };
        MutableObjectIterator<BinaryRowData> probeInput =
                new UniformBinaryRowGenerator(numKeys, probeValsPerKey, true);

        BinaryHashTable table =
                new BinaryHashTable(
                        new Object(),
                        useCompress,
                        (int) ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        40 * PAGE_SIZE,
                        ioManager,
                        24,
                        200000L,
                        true,
                        HashJoinType.INNER,
                        null,
                        false,
                        new boolean[] {true},
                        true);

        int numRecordsInJoinResult = join(table, buildInput, probeInput, true);
        Assertions.assertThat(numRecordsInJoinResult)
                .as("Wrong number of records in join result.")
                .isEqualTo(1);

        table.close();
        table.free();
    }

    /**
     * Builds the table from 300000 rows that cycle through 30000 distinct keys, with the last
     * constructor flag set to {@code true}, and probes it with 30000 keys; fewer joined records
     * than build rows are expected.
     *
     * @throws Exception if the hash table fails
     */
    @TestTemplate
    void testRepeatBuildJoinWithSpill() throws Exception {
        final int numKeys = 30000;
        final int numRows = 300000;
        final int probeValsPerKey = 1;

        // Build input: numRows freshly allocated rows (v, v) with v = cnt % numKeys.
        MutableObjectIterator<BinaryRowData> buildInput =
                new MutableObjectIterator<BinaryRowData>() {
                    int cnt = 0;

                    @Override
                    public BinaryRowData next(BinaryRowData reuse) throws IOException {
                        return next();
                    }

                    @Override
                    public BinaryRowData next() throws IOException {
                        cnt++;
                        if (cnt > numRows) {
                            return null;
                        }
                        int value = cnt % numKeys;
                        BinaryRowData row = new BinaryRowData(2);
                        BinaryRowWriter writer = new BinaryRowWriter(row);
                        writer.writeInt(0, value);
                        writer.writeInt(1, value);
                        writer.complete();
                        return row;
                    }
                };

        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder().setMemorySize(35 * PAGE_SIZE).build();
        MutableObjectIterator<BinaryRowData> probeInput =
                new UniformBinaryRowGenerator(numKeys, probeValsPerKey, true);

        BinaryHashTable table =
                new BinaryHashTable(
                        new Object(),
                        useCompress,
                        (int) ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        35 * PAGE_SIZE,
                        ioManager,
                        24,
                        200000L,
                        true,
                        HashJoinType.INNER,
                        null,
                        false,
                        new boolean[] {true},
                        true);

        int numRecordsInJoinResult = join(table, buildInput, probeInput, true);
        Assertions.assertThat(numRecordsInJoinResult)
                .as("Wrong number of records in join result.")
                .isLessThan(numRows);

        table.close();
        table.free();
    }

    /**
     * Inserts 100000 entries into a {@link BinaryHashBucketArea} backed by a table with only
     * 35 pages and checks that, once the area's memory is freed and the table is closed, all
     * 35 pages are back in the table's internal pool.
     *
     * @throws IOException if the bucket area or the hash table fails
     */
    @TestTemplate
    void testBinaryHashBucketAreaNotEnoughMem() throws IOException {
        final int numPages = 35;

        MemoryManager memManager =
                MemoryManagerBuilder.newBuilder().setMemorySize(numPages * PAGE_SIZE).build();
        BinaryHashTable table =
                newBinaryHashTable(
                        buildSideSerializer,
                        probeSideSerializer,
                        new MyProjection(),
                        new MyProjection(),
                        memManager,
                        numPages * PAGE_SIZE,
                        ioManager);

        BinaryHashBucketArea area = new BinaryHashBucketArea(table, 100.0, 1, false);
        for (int i = 0; i < 100000; i++) {
            area.insertToBucket(i, i, true);
        }
        area.freeMemory();
        table.close();

        Assertions.assertThat(table.getInternalPool().freePages()).isEqualTo(numPages);

        // Free the closed table as every other test in this class does.
        table.free();
    }

    /**
     * Creates a {@link HashJoinType#INNER} hash table with the fixed settings shared by most
     * tests, using this instance's {@code useCompress} flag and the default spill compression
     * block size.
     *
     * @param buildSideSerializer serializer for build side rows
     * @param probeSideSerializer serializer for probe side rows
     * @param buildSideProjection key projection for build side rows
     * @param probeSideProjection key projection for probe side rows
     * @param memoryManager memory manager the table allocates from
     * @param memory memory budget of the table in bytes
     * @param ioManager I/O manager handed to the table
     * @return the newly constructed table
     */
    private BinaryHashTable newBinaryHashTable(BinaryRowDataSerializer buildSideSerializer, BinaryRowDataSerializer probeSideSerializer, Projection<RowData, BinaryRowData> buildSideProjection, Projection<RowData, BinaryRowData> probeSideProjection, MemoryManager memoryManager, long memory, IOManager ioManager) {
        final int compressionBlockSize =
                (int) ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes();

        return new BinaryHashTable(
                new Object(),
                useCompress,
                compressionBlockSize,
                buildSideSerializer,
                probeSideSerializer,
                buildSideProjection,
                probeSideProjection,
                memoryManager,
                memory,
                ioManager,
                24,
                200000L,
                true,
                HashJoinType.INNER,
                null,
                false,
                new boolean[] {true},
                false);
    }

    private static final class MyProjection
    implements Projection<RowData, BinaryRowData> {
        BinaryRowData innerRow = new BinaryRowData(1);
        BinaryRowWriter writer = new BinaryRowWriter(this.innerRow);

        private MyProjection() {
        }

        public BinaryRowData apply(RowData row) {
            this.writer.reset();
            if (row.isNullAt(0)) {
                this.writer.setNullAt(0);
            } else {
                this.writer.writeInt(0, row.getInt(0));
            }
            this.writer.complete();
            return this.innerRow;
        }
    }
}

