/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.hash;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentSource;
import org.apache.flink.core.memory.SeekableDataOutputView;
import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BulkBlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.operators.hash.HashPartition;
import org.apache.flink.runtime.operators.util.BitSet;
import org.apache.flink.runtime.operators.util.BloomFilter;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MutableHashTable<BT, PT>
implements MemorySegmentSource {
    private static final Logger LOG = LoggerFactory.getLogger(MutableHashTable.class);

    // ---- Sizing limits -------------------------------------------------------------------
    // NOTE: this is decompiled code; the literal values of these constants appear inlined in
    // the method bodies below instead of the constant names.

    // buildTableFromSpilledPartition() fails once the next recursion level would exceed this.
    private static final int MAX_RECURSION_DEPTH = 3;
    // The constructor rejects memory lists smaller than this.
    private static final int MIN_NUM_MEMORY_SEGMENTS = 33;
    // Upper bound on the partition fan-out; partition numbers are stored in one signed byte.
    private static final int MAX_NUM_PARTITIONS = 127;
    // Average record length passed by the constructors that do not take an explicit length.
    private static final int DEFAULT_RECORD_LEN = 24;
    // Bytes per stored hash code / record pointer inside a bucket.
    private static final int HASH_CODE_LEN = 4;
    private static final int POINTER_LEN = 8;
    // HASH_CODE_LEN + POINTER_LEN: table bytes consumed per record.
    private static final int RECORD_TABLE_BYTES = 12;

    // ---- Bucket layout -------------------------------------------------------------------
    // A bucket is 128 bytes (1 << NUM_INTRA_BUCKET_BITS): a 16 byte header, followed by up to
    // 9 hash codes (4 bytes each, starting at offset 16) and up to 9 pointers (8 bytes each,
    // starting at offset 52).
    static final int NUM_INTRA_BUCKET_BITS = 7;
    static final int HASH_BUCKET_SIZE = 128;
    static final int BUCKET_HEADER_LENGTH = 16;
    private static final int NUM_ENTRIES_PER_BUCKET = 9;
    private static final int BUCKET_POINTER_START_OFFSET = 52;

    // Header field offsets relative to the start of a bucket:
    // partition number (byte), status (byte), entry count (short), forward pointer to the
    // overflow bucket (long), probed flags (short).
    private static final int HEADER_PARTITION_OFFSET = 0;
    private static final int HEADER_STATUS_OFFSET = 1;
    private static final int HEADER_COUNT_OFFSET = 2;
    private static final int HEADER_FORWARD_OFFSET = 4;
    static final int HEADER_PROBED_FLAGS_OFFSET = 12;
    // Forward-pointer value meaning "no overflow bucket chained".
    private static final long BUCKET_FORWARD_POINTER_NOT_SET = -1L;
    // Bucket status values; status 1 means the area after the header is used as bloom filter bits.
    private static final byte BUCKET_STATUS_IN_MEMORY = 0;
    private static final byte BUCKET_STATUS_IN_FILTER = 1;

    // ---- Type utilities supplied by the caller -------------------------------------------
    protected final TypeSerializer<BT> buildSideSerializer;
    protected final TypeSerializer<PT> probeSideSerializer;
    protected final TypeComparator<BT> buildSideComparator;
    private final TypeComparator<PT> probeSideComparator;
    private final TypePairComparator<PT, BT> recordComparator;

    // ---- Memory and I/O ------------------------------------------------------------------
    // The segment list handed to the constructor; segments are removed from and returned to it.
    protected final List<MemorySegment> availableMemory;
    // Seeded in open() with numWriteBehindBuffers segments and drained again in close().
    protected final LinkedBlockingQueue<MemorySegment> writeBehindBuffers;
    protected final IOManager ioManager;
    // Size of the first supplied segment; the constructor requires it to be a power of two.
    protected final int segmentSize;
    // Number of segments supplied at construction time.
    private final int totalNumBuffers;
    private final int numWriteBehindBuffers;
    // (buckets per segment - 1) and log2(buckets per segment), used to split a bucket index
    // into a segment index and an in-segment position.
    protected final int bucketsPerSegmentMask;
    protected final int bucketsPerSegmentBits;
    private final int avgRecordLen;
    private final boolean useBloomFilters;

    // ---- Partitions and iterators --------------------------------------------------------
    // Partitions of the table currently built/probed, indexed by partition number.
    protected final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    // Partitions waiting to be processed by prepareNextPartition().
    private final ArrayList<HashPartition<BT, PT>> partitionsPending;
    private HashBucketIterator<BT, PT> bucketIterator;
    protected ProbeIterator<PT> probeIterator;
    // Readers for the spilled partition currently being processed, if any.
    private BlockChannelReader<MemorySegment> currentSpilledBuildSide;
    private BlockChannelReader<MemorySegment> currentSpilledProbeSide;
    protected FileIOChannel.Enumerator currentEnumerator;
    // Segments holding the bucket table; null while no table is allocated.
    protected MemorySegment[] buckets;
    private BloomFilter bloomFilter;
    protected int numBuckets;
    protected int writeBehindBuffersAvailable;
    protected int currentRecursionDepth;
    // Set to true at the end of the constructor; open() flips it to false, close() back to true.
    protected AtomicBoolean closed = new AtomicBoolean();
    protected boolean keepBuildSidePartitions;
    // Shared with the bucket iterator and the unmatched-build iterator.
    private final BitSet probedSet = new BitSet(2);
    protected boolean furtherPartitioning;
    // Cleared by abort(); checked by the build loop in buildInitialTable().
    private boolean running = true;

    // ---- Build-side outer join state -----------------------------------------------------
    private boolean buildSideOuterJoin = false;
    private MutableObjectIterator<BT> unmatchedBuildIterator;
    // True while probe records are matched; false once unmatched build records are emitted.
    private boolean probeMatchedPhase = true;
    // True once the unmatched build records of the current table have been handed out.
    private boolean unmatchedBuildVisited = false;

    /**
     * Creates a hash table by delegating to the overload that takes a {@code useBloomFilters}
     * flag, passing {@code true} for it.
     */
    public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer, TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator, TypePairComparator<PT, BT> comparator, List<MemorySegment> memorySegments, IOManager ioManager) {
        this(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, comparator, memorySegments, ioManager, true);
    }

    /**
     * Creates a hash table by delegating to the full constructor with an average record
     * length of 24 bytes (the value of {@code DEFAULT_RECORD_LEN}).
     */
    public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer, TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator, TypePairComparator<PT, BT> comparator, List<MemorySegment> memorySegments, IOManager ioManager, boolean useBloomFilters) {
        this(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, comparator, memorySegments, ioManager, 24, useBloomFilters);
    }

    public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer, TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator, TypePairComparator<PT, BT> comparator, List<MemorySegment> memorySegments, IOManager ioManager, int avgRecordLen, boolean useBloomFilters) {
        if (memorySegments == null) {
            throw new NullPointerException();
        }
        if (memorySegments.size() < 33) {
            throw new IllegalArgumentException("Too few memory segments provided. Hash Join needs at least 33 memory segments.");
        }
        this.buildSideSerializer = buildSideSerializer;
        this.probeSideSerializer = probeSideSerializer;
        this.buildSideComparator = buildSideComparator;
        this.probeSideComparator = probeSideComparator;
        this.recordComparator = comparator;
        this.availableMemory = memorySegments;
        this.ioManager = ioManager;
        this.useBloomFilters = useBloomFilters;
        this.avgRecordLen = avgRecordLen > 0 ? avgRecordLen : (buildSideSerializer.getLength() == -1 ? 24 : buildSideSerializer.getLength());
        this.totalNumBuffers = memorySegments.size();
        this.segmentSize = memorySegments.get(0).size();
        if ((this.segmentSize & this.segmentSize - 1) != 0) {
            throw new IllegalArgumentException("Hash Table requires buffers whose size is a power of 2.");
        }
        int bucketsPerSegment = this.segmentSize >> 7;
        if (bucketsPerSegment == 0) {
            throw new IllegalArgumentException("Hash Table requires buffers of at least 128 bytes.");
        }
        this.bucketsPerSegmentMask = bucketsPerSegment - 1;
        this.bucketsPerSegmentBits = MathUtils.log2strict((int)bucketsPerSegment);
        this.writeBehindBuffers = new LinkedBlockingQueue();
        this.numWriteBehindBuffers = MutableHashTable.getNumWriteBehindBuffers(memorySegments.size());
        this.partitionsBeingBuilt = new ArrayList();
        this.partitionsPending = new ArrayList();
        this.closed.set(true);
    }

    /**
     * Opens the hash table without build-side outer join semantics; delegates to the
     * three-argument overload with {@code buildOuterJoin == false}.
     *
     * @param buildSide iterator over the build-side input
     * @param probeSide iterator over the probe-side input
     * @throws IOException propagated from the three-argument overload
     */
    public void open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide) throws IOException {
        this.open(buildSide, probeSide, false);
    }

    /**
     * Opens the hash table: reserves the write-behind buffers, builds the initial table from
     * the build side and sets up the probe and bucket iterators.
     *
     * @param buildSide iterator over the build-side input, consumed by this call
     * @param probeSide iterator over the probe-side input
     * @param buildOuterJoin whether unmatched build-side records are to be emitted as well
     * @throws IllegalStateException if the table is not currently closed; in that case no
     *         state of the table is modified
     * @throws IOException if building the initial table fails
     */
    public void open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide, boolean buildOuterJoin) throws IOException {
        if (!this.closed.compareAndSet(true, false)) {
            throw new IllegalStateException("Hash Join cannot be opened, because it is currently not closed.");
        }
        // Only record the join mode once the open request has been accepted.
        this.buildSideOuterJoin = buildOuterJoin;

        // Move the write-behind buffers out of the pool of generally available memory.
        for (int i = this.numWriteBehindBuffers; i > 0; --i) {
            this.writeBehindBuffers.add(this.availableMemory.remove(this.availableMemory.size() - 1));
        }
        this.currentRecursionDepth = 0;
        this.buildInitialTable(buildSide);
        this.probeIterator = new ProbeIterator<PT>(probeSide, this.probeSideSerializer.createInstance());
        this.bucketIterator = new HashBucketIterator<BT, PT>(this.buildSideSerializer, this.recordComparator, this.probedSet, buildOuterJoin);
    }

    /**
     * Advances the probe side until a record is found whose bucket belongs to an in-memory
     * partition. Records that fall into spilled partitions are written to that partition's
     * probe buffer, unless the bucket carries a bloom filter that rules the hash out.
     *
     * @return {@code true} if the bucket iterator has been positioned for a probe record,
     *         {@code false} if the probe input is exhausted or the table is no longer in the
     *         probe-matching phase
     * @throws IOException if reading the probe side or buffering a probe record fails
     */
    protected boolean processProbeIter() throws IOException {
        if (!this.probeMatchedPhase) {
            return false;
        }
        final ProbeIterator<PT> source = this.probeIterator;
        final TypeComparator<PT> hasher = this.probeSideComparator;

        for (PT probeRecord = source.next(); probeRecord != null; probeRecord = source.next()) {
            final int hash = MutableHashTable.hash(hasher.hash(probeRecord), this.currentRecursionDepth);
            final int bucketIndex = hash % this.numBuckets;
            final MemorySegment segment = this.buckets[bucketIndex >> this.bucketsPerSegmentBits];
            // Byte offset of the bucket inside its segment (128 byte buckets).
            final int bucketOffset = (bucketIndex & this.bucketsPerSegmentMask) << 7;

            // Header byte 0 holds the partition number.
            final byte partitionNumber = segment.get(bucketOffset + 0);
            final HashPartition<BT, PT> partition = this.partitionsBeingBuilt.get(partitionNumber);

            if (partition.isInMemory()) {
                this.recordComparator.setReference(probeRecord);
                this.bucketIterator.set(segment, partition.overflowSegments, partition, hash, bucketOffset);
                return true;
            }

            // Spilled partition: header byte 1 == 1 means the bucket holds bloom filter bits
            // starting right after the 16 byte header.
            final boolean bucketHasFilter = segment.get(bucketOffset + 1) == 1;
            if (bucketHasFilter) {
                this.bloomFilter.setBitsLocation(segment, bucketOffset + 16);
                if (!this.bloomFilter.testHash(hash)) {
                    // Filter says the hash was never added on the build side: drop the record.
                    continue;
                }
            }
            partition.insertIntoProbeBuffer(probeRecord);
        }
        return false;
    }

    /**
     * Switches the table from the probe-matching phase to emitting unmatched build records.
     * A fresh iterator over the current table is created and published as the build-side
     * iterator; it is checked once for content and, if it has any, rewound by one step.
     *
     * @return {@code true} if there is at least one unmatched build record to return,
     *         {@code false} if there is none or this phase was already visited
     * @throws IOException if the unmatched-build iterator fails
     */
    protected boolean processUnmatchedBuildIter() throws IOException {
        if (this.unmatchedBuildVisited) {
            return false;
        }
        this.probeMatchedPhase = false;

        final UnmatchedBuildIterator<BT, PT> candidates = new UnmatchedBuildIterator<BT, PT>(this.buildSideSerializer, this.numBuckets, this.bucketsPerSegmentBits, this.bucketsPerSegmentMask, this.buckets, this.partitionsBeingBuilt, this.probedSet);
        this.unmatchedBuildIterator = candidates;

        // Peek at the first element; undo the peek so the caller sees it again.
        final boolean hasUnmatched = candidates.next() != null;
        if (hasUnmatched) {
            candidates.back();
        }
        this.unmatchedBuildVisited = true;
        return hasUnmatched;
    }

    /**
     * Finishes the probe phase of the current partitions, releases the current table and
     * readers, and moves on to the first pending partition, if any.
     *
     * <p>A pending partition without probe records is not rebuilt into a table; its build
     * side is merely exposed through the unmatched-build iterator. Otherwise the table is
     * rebuilt from the partition's build side and probing continues with its probe side.
     *
     * @return {@code false} if no pending partitions are left; {@code true} if a partition
     *         without probe records was set up; otherwise the result of {@link #nextRecord()}
     * @throws IOException if spilled data cannot be read
     * @throws IllegalStateException if no memory segment is available for reading
     */
    protected boolean prepareNextPartition() throws IOException {
        // Finalize all partitions of the table that was just probed.
        int freedBuffers = 0;
        for (int idx = 0; idx < this.partitionsBeingBuilt.size(); ++idx) {
            final HashPartition<BT, PT> finished = this.partitionsBeingBuilt.get(idx);
            finished.setFurtherPatitioning(this.furtherPartitioning);
            freedBuffers += finished.finalizeProbePhase(this.availableMemory, this.partitionsPending, this.buildSideOuterJoin);
        }
        this.partitionsBeingBuilt.clear();
        this.writeBehindBuffersAvailable += freedBuffers;
        this.releaseTable();

        // Drop the readers of the previously processed spilled partition.
        if (this.currentSpilledBuildSide != null) {
            this.currentSpilledBuildSide.closeAndDelete();
            this.currentSpilledBuildSide = null;
        }
        if (this.currentSpilledProbeSide != null) {
            this.currentSpilledProbeSide.closeAndDelete();
            this.currentSpilledProbeSide = null;
        }

        if (this.partitionsPending.isEmpty()) {
            return false;
        }
        final HashPartition<BT, PT> pending = this.partitionsPending.get(0);

        if (pending.probeSideRecordCounter == 0L) {
            // No probe records: only stream the spilled build side back, no table needed.
            final ArrayList<MemorySegment> readBuffers = new ArrayList<MemorySegment>();
            final MemorySegment first = this.getNextBuffer();
            if (first == null) {
                throw new IllegalStateException("Attempting to begin reading spilled partition without any memory available");
            }
            readBuffers.add(first);
            final MemorySegment second = this.getNextBuffer();
            if (second != null) {
                readBuffers.add(second);
            }
            this.currentSpilledBuildSide = this.ioManager.createBlockChannelReader(pending.getBuildSideChannel().getChannelID());
            final HeaderlessChannelReaderInputView buildView = new HeaderlessChannelReaderInputView(this.currentSpilledBuildSide, readBuffers, pending.getBuildSideBlockCount(), pending.getLastSegmentLimit(), false);
            final ChannelReaderInputViewIterator<BT> buildRecords = new ChannelReaderInputViewIterator<BT>(buildView, this.availableMemory, this.buildSideSerializer);
            this.unmatchedBuildIterator = buildRecords;
            this.partitionsPending.remove(0);
            return true;
        }

        // Regular case: rebuild the table from the partition and probe with its probe side.
        this.probeMatchedPhase = true;
        this.unmatchedBuildVisited = false;
        this.buildTableFromSpilledPartition(pending);

        final LinkedBlockingQueue<MemorySegment> returnQueue = new LinkedBlockingQueue<MemorySegment>();
        this.currentSpilledProbeSide = this.ioManager.createBlockChannelReader(pending.getProbeSideChannel().getChannelID(), returnQueue);

        final ArrayList<MemorySegment> probeBuffers = new ArrayList<MemorySegment>();
        final MemorySegment first = this.getNextBuffer();
        if (first == null) {
            throw new IllegalStateException("Attempting to begin probing of partition without any memory available");
        }
        probeBuffers.add(first);
        final MemorySegment second = this.getNextBuffer();
        if (second != null) {
            probeBuffers.add(second);
        }

        final ChannelReaderInputViewIterator<PT> probeRecords = new ChannelReaderInputViewIterator<PT>(this.currentSpilledProbeSide, returnQueue, probeBuffers, this.availableMemory, this.probeSideSerializer, pending.getProbeSideBlockCount());
        this.probeIterator.set(probeRecords);
        this.partitionsPending.remove(0);
        this.currentRecursionDepth = pending.getRecursionLevel() + 1;
        return this.nextRecord();
    }

    /**
     * Advances the join to the next position. Tries, in this order: the next probe record,
     * the unmatched build records (only for build-side outer joins), and the next pending
     * partition.
     *
     * @return {@code true} as soon as one of the steps reports success, {@code false} otherwise
     * @throws IOException if any of the steps fails
     */
    public boolean nextRecord() throws IOException {
        if (this.processProbeIter()) {
            return true;
        }
        if (this.buildSideOuterJoin && this.processUnmatchedBuildIter()) {
            return true;
        }
        return this.prepareNextPartition();
    }

    /**
     * Positions the shared bucket iterator on the bucket that the given probe record hashes
     * to and returns it.
     *
     * @param record the probe-side record to look up
     * @return the table's bucket iterator, set to the record's bucket
     * @throws IllegalStateException if the record's partition is not in memory
     * @throws IOException declared by the signature
     */
    public HashBucketIterator<BT, PT> getMatchesFor(PT record) throws IOException {
        final int hash = MutableHashTable.hash(this.probeSideComparator.hash(record), this.currentRecursionDepth);
        final int bucketIndex = hash % this.numBuckets;
        final MemorySegment segment = this.buckets[bucketIndex >> this.bucketsPerSegmentBits];
        // Byte offset of the bucket inside its segment (128 byte buckets).
        final int bucketOffset = (bucketIndex & this.bucketsPerSegmentMask) << 7;

        // Header byte 0 holds the partition number.
        final byte partitionNumber = segment.get(bucketOffset + 0);
        final HashPartition<BT, PT> partition = this.partitionsBeingBuilt.get(partitionNumber);

        if (!partition.isInMemory()) {
            throw new IllegalStateException("Method is not applicable to partially spilled hash tables.");
        }
        this.recordComparator.setReference(record);
        this.bucketIterator.set(segment, partition.overflowSegments, partition, hash, bucketOffset);
        return this.bucketIterator;
    }

    /**
     * Returns the probe record the probe iterator currently points to.
     *
     * @return the current probe record, or {@code null} when the table is not in the
     *         probe-matching phase
     */
    public PT getCurrentProbeRecord() {
        return this.probeMatchedPhase ? this.probeIterator.getCurrent() : null;
    }

    /**
     * Returns the iterator over build-side records for the current phase.
     *
     * @return the bucket iterator while probe records are being matched, otherwise the
     *         iterator over unmatched build records
     */
    public MutableObjectIterator<BT> getBuildSideIterator() {
        if (!this.probeMatchedPhase) {
            return this.unmatchedBuildIterator;
        }
        return this.bucketIterator;
    }

    /**
     * Closes the hash table and returns its memory to {@code availableMemory}. Calling this
     * on an already closed table is a no-op.
     *
     * <p>Releases the bucket table and all partitions, closes and deletes the readers of the
     * spilled partition currently being processed (build and probe side), clears the memory
     * of pending partitions and finally reclaims the write-behind buffers. Reclaiming blocks
     * until all expected buffers have arrived in the write-behind queue.
     *
     * @throws RuntimeException if the thread is interrupted while waiting for write-behind
     *         buffers; the interrupt flag is restored and the cause is attached
     */
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        this.bucketIterator = null;
        this.probeIterator = null;
        this.releaseTable();
        this.clearPartitions();

        // Best-effort cleanup of the temp files of the partition currently being processed.
        if (this.currentSpilledBuildSide != null) {
            try {
                this.currentSpilledBuildSide.closeAndDelete();
            }
            catch (Throwable t) {
                LOG.warn("Could not close and delete the temp file for the current spilled partition build side.", t);
            }
            // Drop the reference so a later prepareNextPartition() does not touch a closed reader.
            this.currentSpilledBuildSide = null;
        }
        if (this.currentSpilledProbeSide != null) {
            try {
                this.currentSpilledProbeSide.closeAndDelete();
            }
            catch (Throwable t) {
                LOG.warn("Could not close and delete the temp file for the current spilled partition probe side.", t);
            }
            this.currentSpilledProbeSide = null;
        }

        for (int i = 0; i < this.partitionsPending.size(); ++i) {
            HashPartition<BT, PT> p = this.partitionsPending.get(i);
            p.clearAllMemory(this.availableMemory);
        }

        // Collect the write-behind buffers; take() blocks until outstanding buffers return.
        final int buffersToReclaim = this.numWriteBehindBuffers + this.writeBehindBuffersAvailable;
        for (int i = 0; i < buffersToReclaim; ++i) {
            try {
                this.availableMemory.add(this.writeBehindBuffers.take());
            }
            catch (InterruptedException iex) {
                // Preserve the interruption for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Hashtable closing was interrupted", iex);
            }
        }
        this.writeBehindBuffersAvailable = 0;
    }

    /**
     * Requests the table to stop: clears the {@code running} flag, which the build loop in
     * {@code buildInitialTable} checks before consuming each further build-side record.
     */
    public void abort() {
        this.running = false;
    }

    /**
     * Returns the list of memory segments held by the table once it has been closed.
     *
     * @return the table's {@code availableMemory} list (the same instance, not a copy)
     * @throws IllegalStateException if the table is still open
     */
    public List<MemorySegment> getFreedMemory() {
        if (this.closed.get()) {
            return this.availableMemory;
        }
        throw new IllegalStateException("Cannot return memory while join is open.");
    }

    /**
     * Builds the initial hash table from the build-side input: creates the partitions,
     * allocates the bucket table, inserts every build record and finalizes the build phase
     * of all partitions.
     *
     * <p>If the table is aborted while records are being inserted, the method returns
     * without finalizing the partitions.
     *
     * @param input iterator over the build-side records
     * @throws IOException if reading the input or inserting a record fails
     * @throws RuntimeException if the computed partition fan-out exceeds 127
     */
    protected void buildInitialTable(MutableObjectIterator<BT> input) throws IOException {
        final int partitionFanOut = MutableHashTable.getPartitioningFanOutNoEstimates(this.availableMemory.size());
        // 127 == MAX_NUM_PARTITIONS
        if (partitionFanOut > 127) {
            throw new RuntimeException("Hash join partitions estimate exeeds maximum number of partitions.");
        }
        this.createPartitions(partitionFanOut, 0);

        final int initialBuckets = MutableHashTable.getInitialTableSize(this.availableMemory.size(), this.segmentSize, partitionFanOut, this.avgRecordLen);
        this.initTable(initialBuckets, (byte)partitionFanOut);

        final TypeComparator<BT> buildTypeComparator = this.buildSideComparator;
        BT record = this.buildSideSerializer.createInstance();
        while (this.running && (record = input.next(record)) != null) {
            // Recursion level 0 for the initial table.
            final int hashCode = MutableHashTable.hash(buildTypeComparator.hash(record), 0);
            this.insertIntoTable(record, hashCode);
        }
        if (!this.running) {
            return;
        }

        for (int i = 0; i < this.partitionsBeingBuilt.size(); ++i) {
            HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(i);
            p.finalizeBuildPhase(this.ioManager, this.currentEnumerator, this.writeBehindBuffers);
        }
    }

    /**
     * Creates the bloom filter helper used for buckets of spilled partitions, sized for the
     * estimated maximum number of entries per bucket and 112 bytes of filter bits (a 128
     * byte bucket minus its 16 byte header).
     *
     * @param numBuckets number of buckets in the table being initialized
     */
    private void initBloomFilter(int numBuckets) {
        final int expectedEntriesPerBucket = this.getEstimatedMaxBucketEntries(this.availableMemory.size(), this.segmentSize, numBuckets, this.avgRecordLen);
        final int filterBytes = 112;
        this.bloomFilter = new BloomFilter(expectedEntriesPerBucket, filterBytes);

        if (!LOG.isDebugEnabled()) {
            return;
        }
        // The estimate takes the filter size in bits, hence the shift by 3.
        final double falsePositiveProbability = BloomFilter.estimateFalsePositiveProbability(expectedEntriesPerBucket, filterBytes << 3);
        LOG.debug(String.format("Create BloomFilter with average input entries per bucket[%d], bytes size[%d], false positive probability[%f].", expectedEntriesPerBucket, filterBytes, falsePositiveProbability));
    }

    /**
     * Estimates an upper bound for the number of records per bucket: four times the number
     * of records that fit into the given memory, spread evenly over the buckets.
     *
     * @param numBuffers number of memory segments
     * @param bufferSize size of one memory segment in bytes
     * @param numBuckets number of buckets in the table
     * @param recordLenBytes average record length in bytes
     * @return the estimated maximum number of entries per bucket, truncated to {@code int}
     */
    private int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
        final long memoryBytes = (long)bufferSize * (long)numBuffers;
        // Each record costs its own length plus 12 table bytes (hash code + pointer).
        final long bytesPerRecord = recordLenBytes + 12;
        final long recordsThatFit = memoryBytes / bytesPerRecord;
        final long upperBoundRecords = 4L * recordsThatFit;
        return (int)(upperBoundRecords / (long)numBuckets);
    }

    /**
     * Rebuilds the hash table from the build side of a spilled partition.
     *
     * <p>If the partition is estimated to fit into the available memory, its blocks are read
     * back in bulk and a single in-memory partition is indexed in place. Otherwise the
     * partition is re-partitioned at the next recursion level by streaming its records
     * through {@link #insertIntoTable}, which may spill again.
     *
     * @param p the spilled partition to rebuild the table from
     * @throws IOException if reading the spilled data or inserting records fails
     * @throws RuntimeException if the next recursion level would exceed 3, or if the memory
     *         accounting check detects leaked buffers
     */
    protected void buildTableFromSpilledPartition(HashPartition<BT, PT> p) throws IOException {
        int nextRecursionLevel = p.getRecursionLevel() + 1;
        // 3 == MAX_RECURSION_DEPTH
        if (nextRecursionLevel > 3) {
            throw new RuntimeException("Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys.");
        }

        // Sanity check of the memory accounting: everything except the write-behind buffers
        // must be back before a new table is built.
        int totalBuffersAvailable = this.availableMemory.size() + this.writeBehindBuffersAvailable;
        if (totalBuffersAvailable != this.totalNumBuffers - this.numWriteBehindBuffers) {
            throw new RuntimeException("Hash Join bug in memory management: Memory buffers leaked.");
        }

        // 9 == NUM_ENTRIES_PER_BUCKET
        long numBuckets = p.getBuildSideRecordCount() / 9L + 1L;
        long totalBuffersNeeded = 2L * (numBuckets / (long)(this.bucketsPerSegmentMask + 1)) + (long)p.getBuildSideBlockCount() + 2L;

        if (totalBuffersNeeded < (long)totalBuffersAvailable) {
            // Case 1: the partition is estimated to fit; read it back in bulk.
            this.ensureNumBuffersReturned(p.getBuildSideBlockCount());
            BulkBlockChannelReader reader = this.ioManager.createBulkBlockChannelReader(p.getBuildSideChannel().getChannelID(), this.availableMemory, p.getBuildSideBlockCount());
            // Level-0 build files are kept on disk when keepBuildSidePartitions is set.
            if (this.keepBuildSidePartitions && p.recursionLevel == 0) {
                reader.close();
            } else {
                reader.closeAndDelete();
            }
            List<MemorySegment> partitionBuffers = reader.getFullSegments();
            HashPartition<BT, PT> newPart = new HashPartition<BT, PT>(this.buildSideSerializer, this.probeSideSerializer, 0, nextRecursionLevel, partitionBuffers, p.getBuildSideRecordCount(), this.segmentSize, p.getLastSegmentLimit());
            this.partitionsBeingBuilt.add(newPart);
            this.initTable((int)numBuckets, (byte)1);

            // Index the records where they are: only bucket entries are written.
            HashPartition<BT, PT>.PartitionIterator pIter = newPart.getPartitionIterator(this.buildSideComparator);
            BT record = this.buildSideSerializer.createInstance();
            while ((record = pIter.next(record)) != null) {
                int hashCode = MutableHashTable.hash(pIter.getCurrentHashCode(), nextRecursionLevel);
                int posHashCode = hashCode % this.numBuckets;
                long pointer = pIter.getPointer();
                int bucketArrayPos = posHashCode >> this.bucketsPerSegmentBits;
                int bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << 7;
                MemorySegment bucket = this.buckets[bucketArrayPos];
                // Spilling is not allowed here: the partition was sized to fit.
                this.insertBucketEntry(newPart, bucket, bucketInSegmentPos, hashCode, pointer, false);
            }
        } else {
            // Case 2: the partition does not fit; partition it again at the next level.
            int avgRecordLenPartition = (int)((long)p.getBuildSideBlockCount() * (long)this.segmentSize / p.getBuildSideRecordCount());
            int bucketCount = MutableHashTable.getInitialTableSize(totalBuffersAvailable, this.segmentSize, MutableHashTable.getPartitioningFanOutNoEstimates(totalBuffersAvailable), avgRecordLenPartition);
            int splits = (int)(totalBuffersNeeded / (long)totalBuffersAvailable) + 1;
            // Capped at 127 == MAX_NUM_PARTITIONS.
            int partitionFanOut = Math.min(10 * splits, 127);
            this.createPartitions(partitionFanOut, nextRecursionLevel);
            this.initTable(bucketCount, (byte)partitionFanOut);

            // Two read buffers for streaming the spilled build side back in.
            ArrayList<MemorySegment> segments = new ArrayList<MemorySegment>(2);
            segments.add(this.getNextBuffer());
            segments.add(this.getNextBuffer());
            BlockChannelReader<MemorySegment> inReader = this.ioManager.createBlockChannelReader(p.getBuildSideChannel().getChannelID());
            HeaderlessChannelReaderInputView inView = new HeaderlessChannelReaderInputView(inReader, segments, p.getBuildSideBlockCount(), p.getLastSegmentLimit(), false);
            ChannelReaderInputViewIterator<BT> inIter = new ChannelReaderInputViewIterator<BT>(inView, this.availableMemory, this.buildSideSerializer);
            TypeComparator<BT> btComparator = this.buildSideComparator;
            BT rec = this.buildSideSerializer.createInstance();
            while ((rec = inIter.next(rec)) != null) {
                int hashCode = MutableHashTable.hash(btComparator.hash(rec), nextRecursionLevel);
                this.insertIntoTable(rec, hashCode);
            }
            if (this.keepBuildSidePartitions && p.recursionLevel == 0) {
                inReader.close();
            } else {
                inReader.closeAndDelete();
            }
            for (int i = 0; i < this.partitionsBeingBuilt.size(); ++i) {
                HashPartition<BT, PT> part = this.partitionsBeingBuilt.get(i);
                part.finalizeBuildPhase(this.ioManager, this.currentEnumerator, this.writeBehindBuffers);
            }
        }
    }

    /**
     * Inserts a build-side record into the partition its bucket is assigned to. If the
     * partition returns a pointer for the record, a bucket entry is added for it; if it
     * returns -1 and the bucket carries a bloom filter, the hash code is added to the filter.
     *
     * @param record the build-side record
     * @param hashCode the record's hash code for the current recursion level
     * @throws IOException if writing the record or the bucket entry fails
     * @throws RuntimeException if the bucket header holds an invalid partition number
     */
    protected final void insertIntoTable(BT record, int hashCode) throws IOException {
        final int bucketIndex = hashCode % this.numBuckets;
        final MemorySegment segment = this.buckets[bucketIndex >> this.bucketsPerSegmentBits];
        // Byte offset of the bucket inside its segment (128 byte buckets).
        final int bucketOffset = (bucketIndex & this.bucketsPerSegmentMask) << 7;

        // Header byte 0 holds the partition number.
        final byte partitionNumber = segment.get(bucketOffset + 0);
        final boolean validPartition = partitionNumber >= 0 && partitionNumber < this.partitionsBeingBuilt.size();
        if (!validPartition) {
            throw new RuntimeException("Error: Hash structures in Hash-Join are corrupt. Invalid partition number for bucket.");
        }

        final HashPartition<BT, PT> partition = this.partitionsBeingBuilt.get(partitionNumber);
        final long pointer = partition.insertIntoBuildBuffer(record);
        if (pointer != -1L) {
            this.insertBucketEntry(partition, segment, bucketOffset, hashCode, pointer, true);
            return;
        }

        // No pointer returned: only remember the hash in the bucket's bloom filter, if the
        // bucket status (header byte 1) marks it as a filter bucket.
        if (segment.get(bucketOffset + 1) == 1) {
            this.bloomFilter.setBitsLocation(segment, bucketOffset + 16);
            this.bloomFilter.addHash(hashCode);
        }
    }

    /**
     * Adds a (hash code, record pointer) entry to a bucket, chaining a new overflow bucket
     * in front of the bucket's overflow chain when neither the bucket nor the overflow bucket
     * it currently points to has a free slot.
     *
     * @param p the partition that owns the bucket and its overflow segments
     * @param bucket the memory segment containing the bucket
     * @param bucketInSegmentPos byte offset of the bucket within {@code bucket}
     * @param hashCode the hash code to store
     * @param pointer the record pointer to store
     * @param spillingAllowed whether a partition may be spilled to obtain memory for a new
     *        overflow segment
     * @throws IOException if memory runs out while spilling is not allowed, or if spilling fails
     * @throws RuntimeException if no memory becomes available after spilling another partition
     */
    final void insertBucketEntry(HashPartition<BT, PT> p, MemorySegment bucket, int bucketInSegmentPos, int hashCode, long pointer, boolean spillingAllowed) throws IOException {
        // Entry count lives at header offset 2; a bucket holds at most 9 entries.
        short count = bucket.getShort(bucketInSegmentPos + 2);
        if (count < 9) {
            // Free slot: hash codes start at offset 16 (4 bytes each), pointers at 52 (8 bytes each).
            bucket.putInt(bucketInSegmentPos + 16 + count * 4, hashCode);
            bucket.putLong(bucketInSegmentPos + 52 + count * 8, pointer);
            bucket.putShort(bucketInSegmentPos + 2, (short)(count + 1));
        } else {
            int overflowBucketNum;
            int overflowBucketOffset;
            MemorySegment overflowSeg;
            long forwardForNewBucket;
            // Forward pointer at header offset 4; -1 means no overflow bucket is chained yet.
            long originalForwardPointer = bucket.getLong(bucketInSegmentPos + 4);
            if (originalForwardPointer != -1L) {
                // Upper 32 bits: index into p.overflowSegments; lower 32 bits: byte offset.
                int overflowSegNum = (int)(originalForwardPointer >>> 32);
                MemorySegment seg = p.overflowSegments[overflowSegNum];
                int segOffset = (int)originalForwardPointer;
                short obCount = seg.getShort(segOffset + 2);
                if (obCount < 9) {
                    // The overflow bucket pointed to still has room: append there and finish.
                    seg.putInt(segOffset + 16 + obCount * 4, hashCode);
                    seg.putLong(segOffset + 52 + obCount * 8, pointer);
                    seg.putShort(segOffset + 2, (short)(obCount + 1));
                    return;
                }
                // Full: the new overflow bucket will point on to the current one.
                forwardForNewBucket = originalForwardPointer;
            } else {
                forwardForNewBucket = -1L;
            }
            if (p.nextOverflowBucket == 0) {
                // The next overflow bucket starts a new segment, so one must be obtained.
                overflowSeg = this.getNextBuffer();
                if (overflowSeg == null) {
                    if (!spillingAllowed) {
                        throw new IOException("Hashtable memory ran out in a non-spillable situation. This is probably related to wrong size calculations.");
                    }
                    int spilledPart = this.spillPartition();
                    if (spilledPart == p.getPartitionNumber()) {
                        // This partition itself was spilled; no bucket entry is written.
                        return;
                    }
                    overflowSeg = this.getNextBuffer();
                    if (overflowSeg == null) {
                        throw new RuntimeException("Bug in HybridHashJoin: No memory became available after spilling a partition.");
                    }
                }
                overflowBucketOffset = 0;
                overflowBucketNum = p.numOverflowSegments;
                // Grow the overflow segment array by doubling when it is full.
                if (p.overflowSegments.length <= p.numOverflowSegments) {
                    MemorySegment[] newSegsArray = new MemorySegment[p.overflowSegments.length * 2];
                    System.arraycopy(p.overflowSegments, 0, newSegsArray, 0, p.overflowSegments.length);
                    p.overflowSegments = newSegsArray;
                }
                p.overflowSegments[p.numOverflowSegments] = overflowSeg;
                ++p.numOverflowSegments;
            } else {
                // Continue in the most recently added overflow segment.
                overflowBucketNum = p.numOverflowSegments - 1;
                overflowSeg = p.overflowSegments[overflowBucketNum];
                overflowBucketOffset = p.nextOverflowBucket << 7;
            }
            // Advance the slot counter, wrapping to 0 after the last bucket of a segment.
            p.nextOverflowBucket = p.nextOverflowBucket == this.bucketsPerSegmentMask ? 0 : p.nextOverflowBucket + 1;
            // Link the new overflow bucket in: it points to the previous overflow bucket (or -1),
            // and the main bucket now points to the new one.
            overflowSeg.putLong(overflowBucketOffset + 4, forwardForNewBucket);
            long pointerToNewBucket = (long)overflowBucketNum << 32 | (long)overflowBucketOffset;
            bucket.putLong(bucketInSegmentPos + 4, pointerToNewBucket);
            // Store the entry as the first element of the new overflow bucket.
            overflowSeg.putInt(overflowBucketOffset + 16, hashCode);
            overflowSeg.putLong(overflowBucketOffset + 52, pointer);
            overflowSeg.putShort(overflowBucketOffset + 2, (short)1);
            // Reset the probed flags (header offset 12) of the new overflow bucket.
            overflowSeg.putShort(overflowBucketOffset + 12, (short)0);
        }
    }

    /**
     * Creates a new in-memory partition, handing it the last segment of
     * {@code availableMemory} and this table as its memory segment source.
     *
     * @param number the partition number
     * @param recursionLevel the recursion level the partition belongs to
     * @return the new partition
     */
    protected HashPartition<BT, PT> getNewInMemoryPartition(int number, int recursionLevel) {
        final int lastIndex = this.availableMemory.size() - 1;
        final MemorySegment initialBuffer = this.availableMemory.remove(lastIndex);
        return new HashPartition<BT, PT>(this.buildSideSerializer, this.probeSideSerializer, number, recursionLevel, initialBuffer, this, this.segmentSize);
    }

    /**
     * Replaces the current set of partitions with {@code numPartitions} fresh in-memory
     * partitions, numbered from 0, and creates a new channel enumerator for them.
     *
     * @param numPartitions number of partitions to create
     * @param recursionLevel recursion level assigned to each new partition
     */
    protected void createPartitions(int numPartitions, int recursionLevel) {
        // One buffer per partition must be available before the partitions are created.
        this.ensureNumBuffersReturned(numPartitions);
        this.currentEnumerator = this.ioManager.createChannelEnumerator();
        this.partitionsBeingBuilt.clear();

        int partitionNumber = 0;
        while (partitionNumber < numPartitions) {
            this.partitionsBeingBuilt.add(this.getNewInMemoryPartition(partitionNumber, recursionLevel));
            partitionNumber++;
        }
    }

    /**
     * Releases the memory of all partitions currently being built, last partition first,
     * and empties the partition list. A failure to clear one partition is logged and does
     * not stop the cleanup of the remaining ones.
     */
    protected void clearPartitions() {
        int index = this.partitionsBeingBuilt.size();
        while (--index >= 0) {
            final HashPartition<BT, PT> partition = this.partitionsBeingBuilt.get(index);
            try {
                partition.clearAllMemory(this.availableMemory);
            }
            catch (Exception e) {
                LOG.error("Error during partition cleanup.", e);
            }
        }
        this.partitionsBeingBuilt.clear();
    }

    /**
     * Allocates the bucket table and initializes every bucket header: assigned partition,
     * status 0, entry count 0, forward pointer -1 and probed flags 0. Creates the bloom
     * filter helper afterwards when bloom filters are enabled.
     *
     * @param numBuckets number of buckets the table should have
     * @param numPartitions number of partitions the buckets are assigned to
     */
    protected void initTable(int numBuckets, byte numPartitions) {
        final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;

        // Segments needed: all full ones plus one more for a partially used last segment.
        final int fullSegments = numBuckets >>> this.bucketsPerSegmentBits;
        final boolean hasPartialSegment = (numBuckets & this.bucketsPerSegmentMask) != 0;
        final int numSegs = hasPartialSegment ? fullSegments + 1 : fullSegments;

        final MemorySegment[] table = new MemorySegment[numSegs];
        this.ensureNumBuffersReturned(numSegs);

        int nextBucket = 0;
        for (int segIndex = 0; segIndex < numSegs && nextBucket < numBuckets; segIndex++) {
            final MemorySegment seg = this.getNextBuffer();
            final int bucketsInThisSegment = Math.min(bucketsPerSegment, numBuckets - nextBucket);
            for (int slot = 0; slot < bucketsInThisSegment; slot++, nextBucket++) {
                // Buckets are 128 bytes wide.
                final int base = slot << 7;
                final byte partition = MutableHashTable.assignPartition(nextBucket, numPartitions);
                seg.put(base + 0, partition);          // partition number
                seg.put(base + 1, (byte)0);            // status: in memory
                seg.putShort(base + 2, (short)0);      // entry count
                seg.putLong(base + 4, -1L);            // forward pointer: not set
                seg.putShort(base + 12, (short)0);     // probed flags
            }
            table[segIndex] = seg;
        }

        this.buckets = table;
        this.numBuckets = numBuckets;
        if (this.useBloomFilters) {
            this.initBloomFilter(numBuckets);
        }
    }

    /**
     * Drops the bucket table: resets the bucket count to zero and, if a table exists, hands all
     * of its memory segments back to {@code availableMemory} before clearing the reference.
     */
    protected void releaseTable() {
        this.numBuckets = 0;
        final MemorySegment[] table = this.buckets;
        if (table == null) {
            return;
        }
        for (int i = 0; i < table.length; ++i) {
            this.availableMemory.add(table[i]);
        }
        this.buckets = null;
    }

    /**
     * Spills the in-memory partition that currently occupies the most memory segments.
     *
     * <p>If bloom filters are enabled, the filters for the buckets of the chosen partition are
     * built before the partition is spilled. After spilling, any write-behind buffers that are
     * already back in the queue are moved to {@code availableMemory} without blocking.
     *
     * @return the index of the partition that was spilled
     * @throws IOException if spilling the partition fails
     * @throws RuntimeException if no in-memory partition occupies at least one memory segment,
     *         so there is nothing that could be spilled
     */
    protected int spillPartition() throws IOException {
        final ArrayList<HashPartition<BT, PT>> partitions = this.partitionsBeingBuilt;

        // Find the in-memory partition with the largest number of occupied segments.
        int largestNumBlocks = 0;
        int largestPartNum = -1;
        for (int i = 0; i < partitions.size(); ++i) {
            final HashPartition<BT, PT> candidate = partitions.get(i);
            if (candidate.isInMemory() && candidate.getNumOccupiedMemorySegments() > largestNumBlocks) {
                largestNumBlocks = candidate.getNumOccupiedMemorySegments();
                largestPartNum = i;
            }
        }

        // Without this guard the lookup below would fail with an opaque index exception for -1.
        if (largestPartNum < 0) {
            throw new RuntimeException("BUG in Hybrid Hash Join: No in-memory partition with occupied memory segments is available for spilling.");
        }

        final HashPartition<BT, PT> p = partitions.get(largestPartNum);
        if (this.useBloomFilters) {
            this.buildBloomFilterForBucketsInPartition(largestPartNum, p);
        }

        final int numBuffersFreed = p.spillPartition(this.availableMemory, this.ioManager, this.currentEnumerator.next(), this.writeBehindBuffers);
        this.writeBehindBuffersAvailable += numBuffersFreed;

        // Grab whatever write-behind buffers have already been returned; never block here.
        while (this.writeBehindBuffersAvailable > 0) {
            final MemorySegment currBuff = this.writeBehindBuffers.poll();
            if (currBuff == null) {
                break;
            }
            this.availableMemory.add(currBuff);
            --this.writeBehindBuffersAvailable;
        }
        return largestPartNum;
    }

    /**
     * Scans the whole bucket table and builds a bloom filter for every bucket that belongs to the
     * given partition and whose status byte (offset 1 in the bucket header) is still 0.
     *
     * @param partNum number of the partition whose buckets should get bloom filters
     * @param partition the partition with that number
     */
    protected final void buildBloomFilterForBucketsInPartition(int partNum, HashPartition<BT, PT> partition) {
        final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
        final int segmentCount = this.buckets.length;
        int bucket = 0;
        int segIndex = 0;
        while (segIndex < segmentCount && bucket < this.numBuckets) {
            final MemorySegment segment = this.buckets[segIndex];
            int slot = 0;
            while (slot < bucketsPerSegment && bucket < this.numBuckets) {
                // Buckets are 128 bytes wide.
                final int offset = slot * 128;
                final byte owner = segment.get(offset);
                if (owner == partNum) {
                    final byte status = segment.get(offset + 1);
                    if (status == 0) {
                        this.buildBloomFilterForBucket(offset, segment, partition);
                    }
                }
                ++slot;
                ++bucket;
            }
            ++segIndex;
        }
    }

    /**
     * Builds the bloom filter for a single bucket from the hash codes stored in it, then continues
     * with the bucket's overflow chain.
     *
     * <p>The hash codes are copied out first because the bloom filter is pointed at the same
     * offset (bucket start + 16) they are read from; presumably adding hashes overwrites that
     * region.
     *
     * @param bucketInSegmentPos offset of the bucket inside {@code bucket}
     * @param bucket memory segment containing the bucket
     * @param p partition the bucket belongs to
     */
    final void buildBloomFilterForBucket(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
        final int count = bucket.getShort(bucketInSegmentPos + 2);
        if (count <= 0) {
            return;
        }

        final int hashArea = bucketInSegmentPos + 16;
        final int[] hashCodes = new int[count];
        int readPos = hashArea;
        for (int i = 0; i < count; ++i) {
            hashCodes[i] = bucket.getInt(readPos);
            readPos += 4;
        }

        this.bloomFilter.setBitsLocation(bucket, hashArea);
        for (int i = 0; i < hashCodes.length; ++i) {
            this.bloomFilter.addHash(hashCodes[i]);
        }
        this.buildBloomFilterForExtraOverflowSegments(bucketInSegmentPos, bucket, p);
    }

    /**
     * Follows the overflow chain of a bucket and adds the hash codes of all overflow buckets to
     * the bloom filter. On success the bucket's status byte (offset 1) is set to 1.
     *
     * <p>The status byte is left untouched when a forward pointer refers to an overflow segment
     * index outside {@code [0, p.numOverflowSegments)} or when the accumulated element count of
     * the chain exceeds 2048.
     *
     * @param bucketInSegmentPos offset of the bucket inside {@code bucket}
     * @param bucket memory segment containing the bucket
     * @param p partition that owns the overflow segments
     */
    private void buildBloomFilterForExtraOverflowSegments(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
        int elementsSeen = 0;
        long next = bucket.getLong(bucketInSegmentPos + 4);
        while (next != -1L) {
            // Upper 32 bits: overflow segment index; lower 32 bits: offset within that segment.
            final int segNum = (int)(next >>> 32);
            if (segNum < 0 || segNum >= p.numOverflowSegments) {
                return;
            }
            final MemorySegment overflow = p.overflowSegments[segNum];
            final int offset = (int)next;
            final int count = overflow.getShort(offset + 2);
            elementsSeen += count;
            if (elementsSeen > 2048) {
                return;
            }
            int readPos = offset + 16;
            for (int i = 0; i < count; ++i) {
                this.bloomFilter.addHash(overflow.getInt(readPos));
                readPos += 4;
            }
            next = overflow.getLong(offset + 4);
        }
        bucket.put(bucketInSegmentPos + 1, (byte)1);
    }

    /**
     * Blocks until at least {@code minRequiredAvailable} buffers are in {@code availableMemory},
     * pulling the missing ones from the write-behind buffer queue.
     *
     * @param minRequiredAvailable number of buffers that must be available on return
     * @throws IllegalArgumentException if the request exceeds the buffers in
     *         {@code availableMemory} plus the write-behind buffers counted as available
     * @throws RuntimeException if the thread is interrupted while waiting for a buffer; the
     *         {@link InterruptedException} is attached as the cause
     */
    final void ensureNumBuffersReturned(int minRequiredAvailable) {
        if (minRequiredAvailable > this.availableMemory.size() + this.writeBehindBuffersAvailable) {
            throw new IllegalArgumentException("More buffers requested available than totally available.");
        }
        try {
            while (this.availableMemory.size() < minRequiredAvailable) {
                this.availableMemory.add(this.writeBehindBuffers.take());
                --this.writeBehindBuffersAvailable;
            }
        }
        catch (InterruptedException iex) {
            // Keep the original exception as the cause so the interruption stays diagnosable.
            // NOTE(review): the interrupt flag is not restored here; confirm that no cleanup path
            // performs blocking queue operations afterwards before adding a re-interrupt.
            throw new RuntimeException("Hash Join was interrupted.", iex);
        }
    }

    /**
     * Returns the next free memory segment, or {@code null} if none can be obtained.
     *
     * <p>The last segment of {@code availableMemory} is preferred. If that list is empty but
     * write-behind buffers are counted as available, this blocks until one is returned through
     * the write-behind queue and then moves any further buffers that are already in the queue
     * over to {@code availableMemory} without blocking.
     *
     * @return a free memory segment, or {@code null} if neither source has one
     * @throws RuntimeException if the thread is interrupted while waiting for a write-behind
     *         buffer; the {@link InterruptedException} is attached as the cause
     */
    final MemorySegment getNextBuffer() {
        final int s = this.availableMemory.size();
        if (s > 0) {
            return this.availableMemory.remove(s - 1);
        }
        if (this.writeBehindBuffersAvailable <= 0) {
            return null;
        }

        final MemorySegment toReturn;
        try {
            toReturn = this.writeBehindBuffers.take();
        }
        catch (InterruptedException iex) {
            // Keep the original exception as the cause so the interruption stays diagnosable.
            // NOTE(review): the interrupt flag is not restored here; confirm that no cleanup path
            // performs blocking queue operations afterwards before adding a re-interrupt.
            throw new RuntimeException("Hybrid Hash Join was interrupted while taking a buffer.", iex);
        }
        --this.writeBehindBuffersAvailable;

        // Opportunistically collect buffers that have already come back.
        while (this.writeBehindBuffersAvailable > 0) {
            final MemorySegment currBuff = this.writeBehindBuffers.poll();
            if (currBuff == null) {
                break;
            }
            this.availableMemory.add(currBuff);
            --this.writeBehindBuffersAvailable;
        }
        return toReturn;
    }

    /**
     * Supplies the next free memory segment, spilling a partition when no buffer is free.
     *
     * @return a free memory segment, never {@code null}
     * @throws RuntimeException if spilling fails with an I/O error, or if spilling did not free
     *         any buffer
     */
    public MemorySegment nextSegment() {
        MemorySegment seg = this.getNextBuffer();
        if (seg == null) {
            try {
                this.spillPartition();
            }
            catch (IOException ioex) {
                final String detail = ioex.getMessage();
                final String suffix = detail == null ? "." : ": " + detail;
                throw new RuntimeException("Error spilling Hash Join Partition" + suffix, ioex);
            }
            seg = this.getNextBuffer();
            if (seg == null) {
                throw new RuntimeException("BUG in Hybrid Hash Join: Spilling did not free a buffer.");
            }
        }
        return seg;
    }

    /**
     * Computes the number of write-behind buffers for a given total number of buffers as
     * {@code (int) (log4(numBuffers) - 1.5)}, capped at 6.
     *
     * @param numBuffers total number of buffers
     * @return the number of write-behind buffers; at most 6, and negative for very small inputs
     */
    public static int getNumWriteBehindBuffers(int numBuffers) {
        final double logBase4 = Math.log(numBuffers) / Math.log(4.0);
        final int numIOBufs = (int)(logBase4 - 1.5);
        return Math.min(numIOBufs, 6);
    }

    /**
     * Derives the partitioning fan-out from the number of buffers: one tenth of the buffers,
     * clamped to the range {@code [10, 127]}.
     *
     * @param numBuffers number of buffers available
     * @return the fan-out, between 10 and 127 inclusive
     */
    public static int getPartitioningFanOutNoEstimates(int numBuffers) {
        int fanOut = numBuffers / 10;
        if (fanOut > 127) {
            fanOut = 127;
        }
        if (fanOut < 10) {
            fanOut = 10;
        }
        return fanOut;
    }

    /**
     * Estimates the initial number of hash buckets from the amount of memory.
     *
     * <p>The total memory is divided by the record length plus 12 bytes to get the number of
     * records that can be stored; 12 bytes per record are then spread over 256 bytes per bucket,
     * plus one extra bucket. The result is capped at {@link Integer#MAX_VALUE}.
     *
     * @param numBuffers number of memory buffers
     * @param bufferSize size of one buffer in bytes
     * @param numPartitions number of partitions (not used by the computation)
     * @param recordLenBytes length of one record in bytes
     * @return the number of buckets to start with
     */
    public static int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) {
        final long totalBytes = (long)bufferSize * numBuffers;
        final long recordsStorable = totalBytes / (recordLenBytes + 12);
        final long numBuckets = (recordsStorable * 12L) / 256L + 1L;
        return (int)Math.min(numBuckets, (long)Integer.MAX_VALUE);
    }

    /**
     * Maps a bucket number to a partition number using the remainder of the division by the
     * number of partitions.
     *
     * @param bucket the bucket number
     * @param numPartitions the number of partitions
     * @return {@code bucket % numPartitions}, narrowed to a byte
     */
    public static byte assignPartition(int bucket, byte numPartitions) {
        final int remainder = bucket % numPartitions;
        return (byte)remainder;
    }

    /**
     * Computes the hash used at a given recursion level: the code is rotated left by
     * {@code level * 11} bits and then passed through {@code MathUtils.jenkinsHash}.
     *
     * @param code the hash code to scramble
     * @param level the level that determines the rotation
     * @return the scrambled hash
     */
    public static int hash(int code, int level) {
        final int rotated = Integer.rotateLeft(code, level * 11);
        return MathUtils.jenkinsHash(rotated);
    }

    /**
     * Returns the comparator stored in this table for the probe side.
     *
     * @return the probe side comparator
     */
    public TypeComparator<PT> getProbeSideComparator() {
        return probeSideComparator;
    }

    /**
     * Wraps a {@link MutableObjectIterator} over probe-side records and remembers the record that
     * was returned last.
     *
     * @param <PT> type of the probe-side records
     */
    public static final class ProbeIterator<PT> {
        /** Iterator the records are pulled from; replaceable through {@link #set}. */
        private MutableObjectIterator<PT> source;
        /** Most recently returned record; also passed to the source as the reuse object. */
        private PT instance;

        /**
         * Creates an iterator over the given source.
         *
         * @param source iterator to pull records from
         * @param instance initial object handed to the source for reuse
         */
        ProbeIterator(MutableObjectIterator<PT> source, PT instance) {
            this.instance = instance;
            this.set(source);
        }

        /**
         * Replaces the underlying source iterator.
         *
         * @param source the new iterator to pull records from
         */
        void set(MutableObjectIterator<PT> source) {
            this.source = source;
        }

        /**
         * Fetches the next record from the source.
         *
         * @return the next record, or {@code null} if the source returned {@code null}; in that
         *         case the current record is left unchanged
         * @throws IOException if the source iterator fails
         */
        public PT next() throws IOException {
            // Typed as PT (not Object) so it can be stored in the field without a cast.
            final PT retVal = this.source.next(this.instance);
            if (retVal != null) {
                this.instance = retVal;
            }
            return retVal;
        }

        /**
         * Returns the record most recently produced by {@link #next()}, or the initial instance if
         * no record has been produced yet.
         *
         * @return the current record
         */
        public PT getCurrent() {
            return this.instance;
        }
    }

    /**
     * Iterates over the build-side records in the hash table whose probed flag is not set.
     *
     * <p>Buckets are scanned in table order. Only buckets whose partition is in memory are
     * visited, and for each of them the overflow chain is followed as well.
     *
     * @param <BT> type of the build-side records
     * @param <PT> type of the probe-side records
     */
    public static class UnmatchedBuildIterator<BT, PT>
    implements MutableObjectIterator<BT> {
        private final TypeSerializer<BT> accessor;
        private final long totalBucketNumber;
        private final int bucketsPerSegmentBits;
        private final int bucketsPerSegmentMask;
        private final MemorySegment[] buckets;
        private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
        private final BitSet probedSet;
        /** Segment holding the bucket (or overflow bucket) currently being read. */
        private MemorySegment bucketSegment;
        private MemorySegment[] overflowSegments;
        private HashPartition<BT, PT> partition;
        /** Index of the table bucket the scan is currently at; -1 before the first bucket. */
        private int scanCount;
        private int bucketInSegmentOffset;
        /** Number of entries in the current bucket. */
        private int countInSegment;
        /** Index of the next entry to look at in the current bucket. */
        private int numInSegment;

        UnmatchedBuildIterator(TypeSerializer<BT> accessor, long totalBucketNumber, int bucketsPerSegmentBits, int bucketsPerSegmentMask, MemorySegment[] buckets, ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt, BitSet probedSet) {
            this.accessor = accessor;
            this.totalBucketNumber = totalBucketNumber;
            this.bucketsPerSegmentBits = bucketsPerSegmentBits;
            this.bucketsPerSegmentMask = bucketsPerSegmentMask;
            this.buckets = buckets;
            this.partitionsBeingBuilt = partitionsBeingBuilt;
            this.probedSet = probedSet;
            this.init();
        }

        /** Positions the iterator on the first bucket of an in-memory partition, if there is one. */
        private void init() {
            this.scanCount = -1;
            boolean positioned;
            do {
                positioned = this.moveToNextBucket();
            } while (!positioned && (long)this.scanCount < this.totalBucketNumber);
        }

        /**
         * Returns the next unprobed build-side record, deserialized into the given object.
         *
         * @param reuse object handed to the serializer for reuse
         * @return the next unprobed record, or {@code null} when the table is exhausted
         */
        public BT next(BT reuse) {
            for (;;) {
                final BT candidate = this.nextInBucket(reuse);
                if (candidate != null) {
                    return candidate;
                }
                if (!this.moveToNextOnHeapBucket()) {
                    return null;
                }
            }
        }

        /**
         * Returns the next unprobed build-side record as a newly deserialized object.
         *
         * @return the next unprobed record, or {@code null} when the table is exhausted
         */
        public BT next() {
            for (;;) {
                final BT candidate = this.nextInBucket();
                if (candidate != null) {
                    return candidate;
                }
                if (!this.moveToNextOnHeapBucket()) {
                    return null;
                }
            }
        }

        /**
         * Advances the scan until a bucket of an in-memory partition is found.
         *
         * @return {@code true} if such a bucket was found, {@code false} if the scan ran past the
         *         last bucket
         */
        private boolean moveToNextOnHeapBucket() {
            for (;;) {
                if (this.moveToNextBucket()) {
                    return true;
                }
                if ((long)this.scanCount >= this.totalBucketNumber) {
                    return false;
                }
            }
        }

        /**
         * Advances the scan by exactly one table bucket.
         *
         * @return {@code true} if the new bucket belongs to an in-memory partition and was made
         *         current; {@code false} if it was skipped or the scan is past the last bucket
         */
        private boolean moveToNextBucket() {
            ++this.scanCount;
            if ((long)this.scanCount > this.totalBucketNumber - 1L) {
                return false;
            }
            final MemorySegment segment = this.buckets[this.scanCount >> this.bucketsPerSegmentBits];
            // Buckets are 128 bytes wide, hence the shift by 7.
            final int offsetInSegment = (this.scanCount & this.bucketsPerSegmentMask) << 7;
            final byte partitionNumber = segment.get(offsetInSegment);
            final HashPartition<BT, PT> owner = this.partitionsBeingBuilt.get(partitionNumber);
            if (!owner.isInMemory()) {
                return false;
            }
            this.setBucket(segment, owner.overflowSegments, owner, offsetInSegment);
            return true;
        }

        /** Makes the given bucket current and points the probed-flags bit set at it. */
        private void setBucket(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition, int bucketInSegmentOffset) {
            this.bucketSegment = bucket;
            this.overflowSegments = overflowSegments;
            this.partition = partition;
            this.bucketInSegmentOffset = bucketInSegmentOffset;
            this.countInSegment = bucket.getShort(bucketInSegmentOffset + 2);
            this.numInSegment = 0;
            this.probedSet.setMemorySegment(this.bucketSegment, this.bucketInSegmentOffset + 12);
        }

        /**
         * Follows the forward pointer of the current bucket to its overflow bucket.
         *
         * @return {@code true} if an overflow bucket was made current; {@code false} if no bucket
         *         is set or the forward pointer is {@code -1}
         */
        private boolean advanceToOverflowBucket() {
            if (this.bucketSegment == null) {
                return false;
            }
            final long forwardPointer = this.bucketSegment.getLong(this.bucketInSegmentOffset + 4);
            if (forwardPointer == -1L) {
                return false;
            }
            // Upper 32 bits: overflow segment index; lower 32 bits: offset within that segment.
            final int overflowSegNum = (int)(forwardPointer >>> 32);
            this.bucketSegment = this.overflowSegments[overflowSegNum];
            this.bucketInSegmentOffset = (int)forwardPointer;
            this.countInSegment = this.bucketSegment.getShort(this.bucketInSegmentOffset + 2);
            this.numInSegment = 0;
            this.probedSet.setMemorySegment(this.bucketSegment, this.bucketInSegmentOffset + 12);
            return true;
        }

        /** Returns the next unprobed record of the current bucket chain, reusing {@code reuse}. */
        private BT nextInBucket(BT reuse) {
            do {
                while (this.numInSegment < this.countInSegment) {
                    if (this.probedSet.get(this.numInSegment)) {
                        ++this.numInSegment;
                        continue;
                    }
                    // Record pointers start 52 bytes into the bucket, 8 bytes each.
                    final long pointer = this.bucketSegment.getLong(this.bucketInSegmentOffset + 52 + this.numInSegment * 8);
                    try {
                        this.partition.setReadPosition(pointer);
                        reuse = this.accessor.deserialize(reuse, this.partition);
                    }
                    catch (IOException ioex) {
                        throw new RuntimeException("Error deserializing key or value from the hashtable: " + ioex.getMessage(), ioex);
                    }
                    ++this.numInSegment;
                    return reuse;
                }
            } while (this.advanceToOverflowBucket());
            return null;
        }

        /** Returns the next unprobed record of the current bucket chain as a new object. */
        private BT nextInBucket() {
            do {
                while (this.numInSegment < this.countInSegment) {
                    if (this.probedSet.get(this.numInSegment)) {
                        ++this.numInSegment;
                        continue;
                    }
                    // Record pointers start 52 bytes into the bucket, 8 bytes each.
                    final long pointer = this.bucketSegment.getLong(this.bucketInSegmentOffset + 52 + this.numInSegment * 8);
                    final BT result;
                    try {
                        this.partition.setReadPosition(pointer);
                        result = this.accessor.deserialize(this.partition);
                    }
                    catch (IOException ioex) {
                        throw new RuntimeException("Error deserializing key or value from the hashtable: " + ioex.getMessage(), ioex);
                    }
                    ++this.numInSegment;
                    return result;
                }
            } while (this.advanceToOverflowBucket());
            return null;
        }

        /** Steps the position inside the current bucket back by one entry. */
        public void back() {
            --this.numInSegment;
        }
    }

    /**
     * Iterates over the build-side records of one hash bucket (including its overflow chain) that
     * match a given hash code and are equal to the comparator's current reference.
     *
     * <p>When constructed for a build-side outer join, every returned record is additionally
     * flagged in the probed bit set.
     *
     * @param <BT> type of the build-side records
     * @param <PT> type of the probe-side records
     */
    public static class HashBucketIterator<BT, PT>
    implements MutableObjectIterator<BT> {
        private final TypeSerializer<BT> accessor;
        private final TypePairComparator<PT, BT> comparator;
        private final BitSet probedSet;
        private final boolean isBuildOuterJoin;
        /** Segment holding the bucket (or overflow bucket) currently being read. */
        private MemorySegment bucket;
        private MemorySegment[] overflowSegments;
        private HashPartition<BT, PT> partition;
        private int bucketInSegmentOffset;
        private int searchHashCode;
        /** Position of the next hash code to read in the current bucket. */
        private int posInSegment;
        /** Number of entries in the current bucket. */
        private int countInSegment;
        /** Index of the next entry to look at in the current bucket. */
        private int numInSegment;
        private int originalBucketInSegmentOffset;
        private MemorySegment originalBucket;
        /** Pointer of the record returned last; target of {@link #writeBack}. */
        private long lastPointer;

        HashBucketIterator(TypeSerializer<BT> accessor, TypePairComparator<PT, BT> comparator, BitSet probedSet, boolean isBuildOuterJoin) {
            this.accessor = accessor;
            this.comparator = comparator;
            this.probedSet = probedSet;
            this.isBuildOuterJoin = isBuildOuterJoin;
        }

        /**
         * Points the iterator at a bucket and remembers it as the starting point for
         * {@link #reset()}.
         *
         * @param bucket segment containing the bucket
         * @param overflowSegments overflow segments that forward pointers refer to
         * @param partition partition the records are read from
         * @param searchHashCode hash code that entries must match
         * @param bucketInSegmentOffset offset of the bucket inside {@code bucket}
         */
        void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition, int searchHashCode, int bucketInSegmentOffset) {
            this.bucket = bucket;
            this.originalBucket = bucket;
            this.overflowSegments = overflowSegments;
            this.partition = partition;
            this.searchHashCode = searchHashCode;
            this.bucketInSegmentOffset = bucketInSegmentOffset;
            this.originalBucketInSegmentOffset = bucketInSegmentOffset;
            this.posInSegment = this.bucketInSegmentOffset + 16;
            this.countInSegment = bucket.getShort(bucketInSegmentOffset + 2);
            this.numInSegment = 0;
        }

        /**
         * Returns the next matching build-side record, deserialized into the given object.
         *
         * @param reuse object handed to the serializer for reuse
         * @return the next matching record, or {@code null} if the bucket chain is exhausted
         */
        public BT next(BT reuse) {
            while (true) {
                this.probedSet.setMemorySegment(this.bucket, this.bucketInSegmentOffset + 12);
                while (this.numInSegment < this.countInSegment) {
                    final int thisCode = this.bucket.getInt(this.posInSegment);
                    this.posInSegment += 4;
                    if (thisCode != this.searchHashCode) {
                        ++this.numInSegment;
                        continue;
                    }
                    // Record pointers start 52 bytes into the bucket, 8 bytes each.
                    final long pointer = this.bucket.getLong(this.bucketInSegmentOffset + 52 + this.numInSegment * 8);
                    ++this.numInSegment;
                    try {
                        this.partition.setReadPosition(pointer);
                        reuse = this.accessor.deserialize(reuse, this.partition);
                        // Equal hash codes may still be a collision; compare the actual record.
                        if (this.comparator.equalToReference(reuse)) {
                            this.recordMatch(pointer);
                            return reuse;
                        }
                    }
                    catch (IOException ioex) {
                        throw new RuntimeException("Error deserializing key or value from the hashtable: " + ioex.getMessage(), ioex);
                    }
                }
                if (!this.moveToOverflowBucket()) {
                    return null;
                }
            }
        }

        /**
         * Returns the next matching build-side record as a newly deserialized object.
         *
         * @return the next matching record, or {@code null} if the bucket chain is exhausted
         */
        public BT next() {
            while (true) {
                this.probedSet.setMemorySegment(this.bucket, this.bucketInSegmentOffset + 12);
                while (this.numInSegment < this.countInSegment) {
                    final int thisCode = this.bucket.getInt(this.posInSegment);
                    this.posInSegment += 4;
                    if (thisCode != this.searchHashCode) {
                        ++this.numInSegment;
                        continue;
                    }
                    // Record pointers start 52 bytes into the bucket, 8 bytes each.
                    final long pointer = this.bucket.getLong(this.bucketInSegmentOffset + 52 + this.numInSegment * 8);
                    ++this.numInSegment;
                    try {
                        this.partition.setReadPosition(pointer);
                        // Typed as BT so it can be passed to the comparator without a cast.
                        final BT result = this.accessor.deserialize(this.partition);
                        // Equal hash codes may still be a collision; compare the actual record.
                        if (this.comparator.equalToReference(result)) {
                            this.recordMatch(pointer);
                            return result;
                        }
                    }
                    catch (IOException ioex) {
                        throw new RuntimeException("Error deserializing key or value from the hashtable: " + ioex.getMessage(), ioex);
                    }
                }
                if (!this.moveToOverflowBucket()) {
                    return null;
                }
            }
        }

        /**
         * Bookkeeping for a record that is about to be returned: flags it as probed for build-side
         * outer joins and remembers its pointer for {@link #writeBack}.
         */
        private void recordMatch(long pointer) {
            if (this.isBuildOuterJoin) {
                // numInSegment was already advanced past the matching entry.
                this.probedSet.set(this.numInSegment - 1);
            }
            this.lastPointer = pointer;
        }

        /**
         * Follows the forward pointer of the current bucket to its overflow bucket.
         *
         * @return {@code true} if an overflow bucket was made current, {@code false} if the
         *         forward pointer is {@code -1}
         */
        private boolean moveToOverflowBucket() {
            final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + 4);
            if (forwardPointer == -1L) {
                return false;
            }
            // Upper 32 bits: overflow segment index; lower 32 bits: offset within that segment.
            final int overflowSegNum = (int)(forwardPointer >>> 32);
            this.bucket = this.overflowSegments[overflowSegNum];
            this.bucketInSegmentOffset = (int)forwardPointer;
            this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + 2);
            this.posInSegment = this.bucketInSegmentOffset + 16;
            this.numInSegment = 0;
            return true;
        }

        /**
         * Serializes the given value at the position of the record returned last.
         *
         * @param value the value to write
         * @throws IOException if serialization fails
         */
        public void writeBack(BT value) throws IOException {
            SeekableDataOutputView outView = this.partition.getWriteView();
            outView.setWritePosition(this.lastPointer);
            this.accessor.serialize(value, (DataOutputView)outView);
        }

        /** Rewinds the iterator to the start of the bucket it was last {@link #set} to. */
        public void reset() {
            this.bucket = this.originalBucket;
            this.bucketInSegmentOffset = this.originalBucketInSegmentOffset;
            this.posInSegment = this.bucketInSegmentOffset + 16;
            this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + 2);
            this.numInSegment = 0;
        }
    }
}

