/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.hash;

import java.io.EOFException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.util.CopyingListCollector;
import org.apache.flink.api.common.typeutils.SameTypePairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongComparator;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.operators.hash.AbstractMutableHashTable;
import org.apache.flink.runtime.operators.hash.InPlaceMutableHashTable;
import org.apache.flink.runtime.operators.hash.MutableHashTableTestBase;
import org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
import org.apache.flink.runtime.operators.testutils.types.IntPair;
import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
import org.apache.flink.runtime.operators.testutils.types.StringPair;
import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
import org.apache.flink.shaded.guava30.com.google.common.collect.Ordering;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class InPlaceMutableHashTableTest
extends MutableHashTableTestBase {
    private static final long RANDOM_SEED = 58723953465322L;
    private static final int PAGE_SIZE = 16384;
    private final TypeSerializer<Tuple2<Long, String>> serializer;
    private final TypeComparator<Tuple2<Long, String>> comparator;
    private final TypeComparator<Long> probeComparator;
    private final TypePairComparator<Long, Tuple2<Long, String>> pairComparator;

    @Override
    protected <T> AbstractMutableHashTable<T> getHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory) {
        return new InPlaceMutableHashTable(serializer, comparator, memory);
    }

    public InPlaceMutableHashTableTest() {
        TypeSerializer[] fieldSerializers = new TypeSerializer[]{LongSerializer.INSTANCE, StringSerializer.INSTANCE};
        Class<Tuple2> clazz = Tuple2.class;
        this.serializer = new TupleSerializer(clazz, fieldSerializers);
        TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
        TypeSerializer[] comparatorSerializers = new TypeSerializer[]{LongSerializer.INSTANCE};
        this.comparator = new TupleComparator(new int[]{0}, comparators, comparatorSerializers);
        this.probeComparator = new LongComparator(true);
        this.pairComparator = new TypePairComparator<Long, Tuple2<Long, String>>(){
            private long ref;

            public void setReference(Long reference) {
                this.ref = reference;
            }

            public boolean equalToReference(Tuple2<Long, String> candidate) {
                return (Long)candidate.f0 == this.ref;
            }

            public int compareToReference(Tuple2<Long, String> candidate) {
                long x = this.ref;
                long y = (Long)candidate.f0;
                return x < y ? -1 : (x == y ? 0 : 1);
            }
        };
    }

    @Test
    public void testHashTableGrowthWithInsert() {
        try {
            Tuple2 next;
            int numElements = 1000000;
            List<MemorySegment> memory = InPlaceMutableHashTableTest.getMemory(10000, 32768);
            InPlaceMutableHashTable table = new InPlaceMutableHashTable(this.serializer, this.comparator, memory);
            table.open();
            for (long i = 0L; i < 1000000L; ++i) {
                table.insert((Object)new Tuple2((Object)i, (Object)String.valueOf(i)));
            }
            BitSet bitSet = new BitSet(1000000);
            InPlaceMutableHashTable.EntryIterator iter = table.getEntryIterator();
            while ((next = (Tuple2)iter.next()) != null) {
                Assert.assertNotNull((Object)next.f0);
                Assert.assertNotNull((Object)next.f1);
                Assert.assertEquals((long)((Long)next.f0), (long)Long.parseLong((String)next.f1));
                bitSet.set(((Long)next.f0).intValue());
            }
            Assert.assertEquals((long)1000000L, (long)bitSet.cardinality());
            InPlaceMutableHashTable.HashTableProber proper = table.getProber(this.probeComparator, this.pairComparator);
            Tuple2 reuse = new Tuple2();
            for (long i = 0L; i < 1000000L; ++i) {
                Assert.assertNotNull((Object)proper.getMatchFor((Object)i, (Object)reuse));
                Assert.assertNull((Object)proper.getMatchFor((Object)(i + 1000000L), (Object)reuse));
            }
            table.close();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testHashTableGrowthWithInsertOrReplace() {
        try {
            Tuple2 next;
            int numElements = 1000000;
            List<MemorySegment> memory = InPlaceMutableHashTableTest.getMemory(1000, 32768);
            InPlaceMutableHashTable table = new InPlaceMutableHashTable(this.serializer, this.comparator, memory);
            table.open();
            for (long i = 0L; i < 1000000L; ++i) {
                table.insertOrReplaceRecord((Object)Tuple2.of((Object)i, (Object)String.valueOf(i)));
            }
            BitSet bitSet = new BitSet(1000000);
            InPlaceMutableHashTable.EntryIterator iter = table.getEntryIterator();
            while ((next = (Tuple2)iter.next()) != null) {
                Assert.assertNotNull((Object)next.f0);
                Assert.assertNotNull((Object)next.f1);
                Assert.assertEquals((long)((Long)next.f0), (long)Long.parseLong((String)next.f1));
                bitSet.set(((Long)next.f0).intValue());
            }
            Assert.assertEquals((long)1000000L, (long)bitSet.cardinality());
            InPlaceMutableHashTable.HashTableProber proper = table.getProber(this.probeComparator, this.pairComparator);
            Tuple2 reuse = new Tuple2();
            for (long i = 0L; i < 1000000L; ++i) {
                Assert.assertNotNull((Object)proper.getMatchFor((Object)i, (Object)reuse));
                Assert.assertNull((Object)proper.getMatchFor((Object)(i + 1000000L), (Object)reuse));
            }
            table.close();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testWithIntPair() throws Exception {
        InPlaceMutableHashTable table;
        Random rnd = new Random(58723953465322L);
        int keyRange = 1000000;
        int valueRange = 10;
        int numRecords = 1000000;
        IntPairSerializer serializer = new IntPairSerializer();
        IntPairComparator comparator = new IntPairComparator();
        SumReducer reducer = new SumReducer();
        ArrayList expectedOutput = new ArrayList();
        InPlaceMutableHashTableWithJavaHashMap<IntPair, Integer> reference = new InPlaceMutableHashTableWithJavaHashMap<IntPair, Integer>(serializer, comparator, reducer, (Collector<IntPair>)new CopyingListCollector(expectedOutput, (TypeSerializer)serializer));
        int numMemPages = 1953;
        ArrayList actualOutput = new ArrayList();
        InPlaceMutableHashTable inPlaceMutableHashTable = table = new InPlaceMutableHashTable((TypeSerializer)serializer, (TypeComparator)comparator, InPlaceMutableHashTableTest.getMemory(1953, 16384));
        inPlaceMutableHashTable.getClass();
        InPlaceMutableHashTable.ReduceFacade reduceFacade = new InPlaceMutableHashTable.ReduceFacade(inPlaceMutableHashTable, (ReduceFunction)reducer, (Collector)new CopyingListCollector(actualOutput, (TypeSerializer)serializer), true);
        table.open();
        ArrayList<IntPair> input = new ArrayList<IntPair>();
        for (int i = 0; i < 1000000; ++i) {
            input.add(new IntPair(rnd.nextInt(1000000), rnd.nextInt(10)));
        }
        int numIntermingledEmits = 5;
        for (IntPair record : input) {
            reduceFacade.updateTableEntryWithReduce((Object)serializer.copy(record));
            reference.updateTableEntryWithReduce(serializer.copy(record), record.getKey());
            if (!(rnd.nextDouble() < 5.0E-6)) continue;
            reference.emitAndReset();
            reduceFacade.emitAndReset();
        }
        reference.emitAndReset();
        reduceFacade.emit();
        table.close();
        Assert.assertEquals((long)expectedOutput.size(), (long)actualOutput.size());
        Object[] expectedValues = new Integer[expectedOutput.size()];
        for (int i = 0; i < expectedOutput.size(); ++i) {
            expectedValues[i] = ((IntPair)expectedOutput.get(i)).getValue();
        }
        Object[] actualValues = new Integer[actualOutput.size()];
        for (int i = 0; i < actualOutput.size(); ++i) {
            actualValues[i] = ((IntPair)actualOutput.get(i)).getValue();
        }
        Arrays.sort(expectedValues, Ordering.natural());
        Arrays.sort(actualValues, Ordering.natural());
        Assert.assertArrayEquals((Object[])expectedValues, (Object[])actualValues);
    }

    @Test
    public void testWithLengthChangingReduceFunction() throws Exception {
        InPlaceMutableHashTable table;
        Random rnd = new Random(58723953465322L);
        int numKeys = 10000;
        int numVals = 10;
        int numRecords = 100000;
        StringPairSerializer serializer = new StringPairSerializer();
        StringPairComparator comparator = new StringPairComparator();
        ConcatReducer reducer = new ConcatReducer();
        ArrayList expectedOutput = new ArrayList();
        InPlaceMutableHashTableWithJavaHashMap<StringPair, String> reference = new InPlaceMutableHashTableWithJavaHashMap<StringPair, String>(serializer, comparator, reducer, (Collector<StringPair>)new CopyingListCollector(expectedOutput, (TypeSerializer)serializer));
        int numMemPages = 61;
        ArrayList actualOutput = new ArrayList();
        InPlaceMutableHashTable inPlaceMutableHashTable = table = new InPlaceMutableHashTable((TypeSerializer)serializer, (TypeComparator)comparator, InPlaceMutableHashTableTest.getMemory(61, 16384));
        inPlaceMutableHashTable.getClass();
        InPlaceMutableHashTable.ReduceFacade reduceFacade = new InPlaceMutableHashTable.ReduceFacade(inPlaceMutableHashTable, (ReduceFunction)reducer, (Collector)new CopyingListCollector(actualOutput, (TypeSerializer)serializer), true);
        for (int j = 0; j < 3; ++j) {
            table.open();
            reduceFacade.emit();
            reference.updateTableEntryWithReduce(serializer.copy(new StringPair("foo", "bar")), "foo");
            reference.updateTableEntryWithReduce(serializer.copy(new StringPair("foo", "baz")), "foo");
            reference.updateTableEntryWithReduce(serializer.copy(new StringPair("alma", "xyz")), "alma");
            reduceFacade.updateTableEntryWithReduce((Object)serializer.copy(new StringPair("foo", "bar")));
            reduceFacade.updateTableEntryWithReduce((Object)serializer.copy(new StringPair("foo", "baz")));
            reduceFacade.updateTableEntryWithReduce((Object)serializer.copy(new StringPair("alma", "xyz")));
            for (int i = 0; i < 5; ++i) {
                reduceFacade.updateTableEntryWithReduce((Object)serializer.copy(new StringPair("korte", "abc")));
                reference.updateTableEntryWithReduce(serializer.copy(new StringPair("korte", "abc")), "korte");
            }
            reference.emitAndReset();
            reduceFacade.emitAndReset();
            UniformStringPairGenerator gen = new UniformStringPairGenerator(10000, 10, true);
            ArrayList<StringPair> input = new ArrayList<StringPair>();
            StringPair cur = new StringPair();
            while (gen.next(cur) != null) {
                input.add(serializer.copy(cur));
            }
            Collections.shuffle(input, rnd);
            int numIntermingledEmits = 5;
            for (StringPair record : input) {
                reference.updateTableEntryWithReduce(serializer.copy(record), record.getKey());
                reduceFacade.updateTableEntryWithReduce((Object)serializer.copy(record));
                if (!(rnd.nextDouble() < 5.0E-5)) continue;
                reference.emitAndReset();
                reduceFacade.emitAndReset();
            }
            reference.emitAndReset();
            reduceFacade.emit();
            table.close();
            Assert.assertEquals((long)expectedOutput.size(), (long)actualOutput.size());
            Object[] expectedValues = new String[expectedOutput.size()];
            for (int i = 0; i < expectedOutput.size(); ++i) {
                expectedValues[i] = ((StringPair)expectedOutput.get(i)).getValue();
            }
            Object[] actualValues = new String[actualOutput.size()];
            for (int i = 0; i < actualOutput.size(); ++i) {
                actualValues[i] = ((StringPair)actualOutput.get(i)).getValue();
            }
            Arrays.sort(expectedValues, Ordering.natural());
            Arrays.sort(actualValues, Ordering.natural());
            Assert.assertArrayEquals((Object[])expectedValues, (Object[])actualValues);
            expectedOutput.clear();
            actualOutput.clear();
        }
    }

    private static List<MemorySegment> getMemory(int numPages, int pageSize) {
        ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>();
        for (int i = 0; i < numPages; ++i) {
            memory.add(MemorySegmentFactory.allocateUnpooledSegment((int)pageSize));
        }
        return memory;
    }

    @Test
    public void testLargeRecordsWithManyCompactions() {
        try {
            long i;
            int numElements = 1000;
            String longString1 = InPlaceMutableHashTableTest.getLongString(100000);
            String longString2 = InPlaceMutableHashTableTest.getLongString(110000);
            List<MemorySegment> memory = InPlaceMutableHashTableTest.getMemory(3800, 32768);
            InPlaceMutableHashTable table = new InPlaceMutableHashTable(this.serializer, this.comparator, memory);
            table.open();
            for (i = 0L; i < 1000L; ++i) {
                table.insertOrReplaceRecord((Object)Tuple2.of((Object)i, (Object)longString1));
            }
            for (i = 0L; i < 1000L; ++i) {
                table.insertOrReplaceRecord((Object)Tuple2.of((Object)i, (Object)longString2));
            }
            InPlaceMutableHashTable.HashTableProber prober = table.getProber(this.comparator, (TypePairComparator)new SameTypePairComparator(this.comparator));
            Tuple2 reuse = new Tuple2();
            for (long i2 = 0L; i2 < 1000L; ++i2) {
                Assert.assertNotNull((Object)prober.getMatchFor((Object)Tuple2.of((Object)i2, (Object)longString2), (Object)reuse));
            }
            table.close();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    private static String getLongString(int length) {
        StringBuilder bld = new StringBuilder(length);
        for (int i = 0; i < length; ++i) {
            bld.append('a');
        }
        return bld.toString();
    }

    @Test
    public void testOutOfMemory() {
        try {
            List<MemorySegment> memory = InPlaceMutableHashTableTest.getMemory(100, 1024);
            InPlaceMutableHashTable table = new InPlaceMutableHashTable(this.serializer, this.comparator, memory);
            try {
                int numElements = 100000;
                table.open();
                for (long i = 0L; i < 100000L; ++i) {
                    table.insertOrReplaceRecord((Object)Tuple2.of((Object)i, (Object)"alma"));
                }
                Assert.fail((String)"We should have got out of memory (EOFException)");
            }
            catch (EOFException e) {
                table.close();
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    class ConcatReducer
    implements ReduceFunction<StringPair> {
        ConcatReducer() {
        }

        public StringPair reduce(StringPair a, StringPair b) throws Exception {
            if (a.getKey().compareTo(b.getKey()) != 0) {
                throw new RuntimeException("ConcatReducer was called with two records that have differing keys.");
            }
            return new StringPair(a.getKey(), a.getValue().concat(b.getValue()));
        }
    }

    class SumReducer
    implements ReduceFunction<IntPair> {
        SumReducer() {
        }

        public IntPair reduce(IntPair a, IntPair b) throws Exception {
            if (a.getKey() != b.getKey()) {
                throw new RuntimeException("SumReducer was called with two records that have differing keys.");
            }
            a.setValue(a.getValue() + b.getValue());
            return a;
        }
    }

    private class InPlaceMutableHashTableWithJavaHashMap<T, K> {
        TypeSerializer<T> serializer;
        TypeComparator<T> comparator;
        ReduceFunction<T> reducer;
        Collector<T> outputCollector;
        HashMap<K, T> map = new HashMap();

        public InPlaceMutableHashTableWithJavaHashMap(TypeSerializer<T> serializer, TypeComparator<T> comparator, ReduceFunction<T> reducer, Collector<T> outputCollector) {
            this.serializer = serializer;
            this.comparator = comparator;
            this.reducer = reducer;
            this.outputCollector = outputCollector;
        }

        public void updateTableEntryWithReduce(T record, K key) throws Exception {
            record = this.serializer.copy(record);
            if (!this.map.containsKey(key)) {
                this.map.put(key, record);
            } else {
                Object x = this.map.get(key);
                x = this.reducer.reduce(x, record);
                this.map.put(key, x);
            }
        }

        public void emitAndReset() {
            for (T record : this.map.values()) {
                this.outputCollector.collect(record);
            }
            this.map.clear();
        }
    }
}

