package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexSpilledRegionManager;
import org.apache.flink.util.ExceptionUtils;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl.class */
public class HsFileDataIndexSpilledRegionManagerImpl implements HsFileDataIndexSpilledRegionManager {
    private final List<TreeMap<Integer, SegmentMeta>> subpartitionFinishedSegmentMetas;
    private FileChannel channel;
    private final long[] subpartitionCurrentOffset;
    private final int[] subpartitionFreeSpaceInBytes;
    private final SegmentMeta[] currentSegmentMeta;
    private final int segmentSizeInBytes;
    private final BiConsumer<Integer, HsFileDataIndexImpl.InternalRegion> cacheRegionConsumer;
    private final boolean loadEntireSegmentToCache;
    private final ByteBuffer regionHeaderBuffer = InternalRegionWriteReadUtils.allocateAndConfigureBuffer(16);
    private long nextSegmentOffset = 0;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl$Factory.class */
    public static class Factory implements HsFileDataIndexSpilledRegionManager.Factory {
        private final int segmentSizeInBytes;
        private final long maxCacheCapacity;

        public Factory(int i, long j) {
            this.segmentSizeInBytes = i;
            this.maxCacheCapacity = j;
        }

        @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexSpilledRegionManager.Factory
        public HsFileDataIndexSpilledRegionManager create(int i, Path path, BiConsumer<Integer, HsFileDataIndexImpl.InternalRegion> biConsumer) {
            return new HsFileDataIndexSpilledRegionManagerImpl(i, path, this.segmentSizeInBytes, this.maxCacheCapacity, biConsumer);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexSpilledRegionManagerImpl$SegmentMeta.class */
    public static class SegmentMeta {
        private int minBufferIndex = Integer.MAX_VALUE;
        private int maxBufferIndex = 0;
        private int numRegions = 0;
        private final long offset;

        public SegmentMeta(long j) {
            this.offset = j;
        }

        public int getMaxBufferIndex() {
            return this.maxBufferIndex;
        }

        public long getOffset() {
            return this.offset;
        }

        public int getNumRegions() {
            return this.numRegions;
        }

        public void addRegion(int i, int i2) {
            if (i < this.minBufferIndex) {
                this.minBufferIndex = i;
            }
            if (i2 > this.maxBufferIndex) {
                this.maxBufferIndex = i2;
            }
            this.numRegions++;
        }
    }

    public HsFileDataIndexSpilledRegionManagerImpl(int i, Path path, int i2, long j, BiConsumer<Integer, HsFileDataIndexImpl.InternalRegion> biConsumer) {
        try {
            this.channel = FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.READ, StandardOpenOption.WRITE);
        } catch (IOException e) {
            ExceptionUtils.rethrow(e);
        }
        this.loadEntireSegmentToCache = shouldLoadEntireSegmentToCache(i, i2, j);
        this.subpartitionFinishedSegmentMetas = new ArrayList(i);
        this.subpartitionCurrentOffset = new long[i];
        this.subpartitionFreeSpaceInBytes = new int[i];
        this.currentSegmentMeta = new SegmentMeta[i];
        for (int i3 = 0; i3 < i; i3++) {
            this.subpartitionFinishedSegmentMetas.add(new TreeMap<>());
        }
        this.cacheRegionConsumer = biConsumer;
        this.segmentSizeInBytes = i2;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexSpilledRegionManager
    public long findRegion(int i, int i2, boolean z) {
        SegmentMeta segmentMeta = this.currentSegmentMeta[i];
        if (segmentMeta != null) {
            long findRegionInSegment = findRegionInSegment(i, i2, segmentMeta, z);
            if (findRegionInSegment != -1) {
                return findRegionInSegment;
            }
        }
        Iterator<SegmentMeta> it = this.subpartitionFinishedSegmentMetas.get(i).headMap(Integer.valueOf(i2), true).values().iterator();
        while (it.hasNext()) {
            long findRegionInSegment2 = findRegionInSegment(i, i2, it.next(), z);
            if (findRegionInSegment2 != -1) {
                return findRegionInSegment2;
            }
        }
        return -1L;
    }

    private long findRegionInSegment(int i, int i2, SegmentMeta segmentMeta, boolean z) {
        if (i2 > segmentMeta.getMaxBufferIndex()) {
            return -1L;
        }
        try {
            return readSegmentAndLoadToCacheIfNeeded(i, i2, segmentMeta, z);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private long readSegmentAndLoadToCacheIfNeeded(int i, int i2, SegmentMeta segmentMeta, boolean z) throws IOException {
        List<Tuple2<HsFileDataIndexImpl.InternalRegion, Long>> readSegment = readSegment(segmentMeta.getOffset(), segmentMeta.getNumRegions());
        long j = -1;
        HsFileDataIndexImpl.InternalRegion internalRegion = null;
        Iterator<Tuple2<HsFileDataIndexImpl.InternalRegion, Long>> it = readSegment.iterator();
        while (it.hasNext()) {
            Tuple2<HsFileDataIndexImpl.InternalRegion, Long> next = it.next();
            HsFileDataIndexImpl.InternalRegion internalRegion2 = (HsFileDataIndexImpl.InternalRegion) next.f0;
            if (internalRegion2.containBuffer(i2)) {
                internalRegion = internalRegion2;
                j = ((Long) next.f1).longValue();
                it.remove();
            }
        }
        if (internalRegion != null && z) {
            if (this.loadEntireSegmentToCache) {
                readSegment.forEach(tuple2 -> {
                    this.cacheRegionConsumer.accept(Integer.valueOf(i), tuple2.f0);
                });
                this.cacheRegionConsumer.accept(Integer.valueOf(i), internalRegion);
            } else {
                this.cacheRegionConsumer.accept(Integer.valueOf(i), internalRegion);
            }
        }
        return j;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexSpilledRegionManager
    public void appendOrOverwriteRegion(int i, HsFileDataIndexImpl.InternalRegion internalRegion) throws IOException {
        long findRegion = findRegion(i, internalRegion.getFirstBufferIndex(), false);
        if (findRegion != -1) {
            writeRegionToOffset(findRegion, internalRegion);
        } else {
            appendRegion(i, internalRegion);
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexSpilledRegionManager, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.channel != null) {
            this.channel.close();
        }
    }

    private static boolean shouldLoadEntireSegmentToCache(int i, int i2, long j) {
        return ((2 * ((long) i)) * ((long) i2)) / 16 <= j;
    }

    private void appendRegion(int i, HsFileDataIndexImpl.InternalRegion internalRegion) throws IOException {
        int size = internalRegion.getSize();
        if (this.subpartitionFreeSpaceInBytes[i] < size) {
            startNewSegment(i, Math.max(size, this.segmentSizeInBytes));
        }
        writeRegionToOffset(this.subpartitionCurrentOffset[i], internalRegion);
        updateSegment(i, internalRegion);
    }

    private void writeRegionToOffset(long j, HsFileDataIndexImpl.InternalRegion internalRegion) throws IOException {
        this.channel.position(j);
        InternalRegionWriteReadUtils.writeRegionToFile(this.channel, this.regionHeaderBuffer, internalRegion);
    }

    private void startNewSegment(int i, int i2) {
        SegmentMeta segmentMeta = this.currentSegmentMeta[i];
        this.currentSegmentMeta[i] = new SegmentMeta(this.nextSegmentOffset);
        this.subpartitionCurrentOffset[i] = this.nextSegmentOffset;
        this.nextSegmentOffset += i2;
        this.subpartitionFreeSpaceInBytes[i] = i2;
        if (segmentMeta != null) {
            this.subpartitionFinishedSegmentMetas.get(i).put(Integer.valueOf(segmentMeta.minBufferIndex), segmentMeta);
        }
    }

    private void updateSegment(int i, HsFileDataIndexImpl.InternalRegion internalRegion) {
        int size = internalRegion.getSize();
        int[] iArr = this.subpartitionFreeSpaceInBytes;
        iArr[i] = iArr[i] - size;
        long[] jArr = this.subpartitionCurrentOffset;
        jArr[i] = jArr[i] + size;
        this.currentSegmentMeta[i].addRegion(internalRegion.getFirstBufferIndex(), (internalRegion.getFirstBufferIndex() + internalRegion.getNumBuffers()) - 1);
    }

    private List<Tuple2<HsFileDataIndexImpl.InternalRegion, Long>> readSegment(long j, int i) throws IOException {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(Tuple2.of(InternalRegionWriteReadUtils.readRegionFromFile(this.channel, this.regionHeaderBuffer, j), Long.valueOf(j)));
            j += r0.getSize();
        }
        return arrayList;
    }
}
