package org.apache.flink.runtime.io.network.partition.hybrid.index;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper;
import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper.Region;
import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexSpilledRegionManager;
import org.apache.flink.util.ExceptionUtils;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImpl.class */
public class FileDataIndexSpilledRegionManagerImpl<T extends FileDataIndexRegionHelper.Region> implements FileDataIndexSpilledRegionManager<T> {
    private final List<TreeMap<Integer, RegionGroup>> subpartitionFinishedRegionGroupMetas;
    private FileChannel channel;
    private long nextRegionGroupOffset = 0;
    private final long[] subpartitionCurrentOffset;
    private final int[] subpartitionFreeSpaceInBytes;
    private final RegionGroup[] currentRegionGroup;
    private final int regionGroupSizeInBytes;
    private final BiConsumer<Integer, T> cacheRegionConsumer;
    private final FileDataIndexRegionHelper<T> fileDataIndexRegionHelper;
    private final boolean loadEntireRegionGroupToCache;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImpl$Factory.class */
    public static class Factory<T extends FileDataIndexRegionHelper.Region> implements FileDataIndexSpilledRegionManager.Factory<T> {
        private final int regionGroupSizeInBytes;
        private final long maxCacheCapacity;
        private final int regionHeaderSize;
        private final FileDataIndexRegionHelper<T> fileDataIndexRegionHelper;

        public Factory(int i, long j, int i2, FileDataIndexRegionHelper<T> fileDataIndexRegionHelper) {
            this.regionGroupSizeInBytes = i;
            this.maxCacheCapacity = j;
            this.regionHeaderSize = i2;
            this.fileDataIndexRegionHelper = fileDataIndexRegionHelper;
        }

        @Override // org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexSpilledRegionManager.Factory
        public FileDataIndexSpilledRegionManager<T> create(int i, Path path, BiConsumer<Integer, T> biConsumer) {
            return new FileDataIndexSpilledRegionManagerImpl(i, path, this.regionGroupSizeInBytes, this.maxCacheCapacity, this.regionHeaderSize, biConsumer, this.fileDataIndexRegionHelper);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImpl$RegionGroup.class */
    public static class RegionGroup {
        private int minBufferIndex = Integer.MAX_VALUE;
        private int maxBufferIndex = 0;
        private int numRegions = 0;
        private final long offset;

        public RegionGroup(long j) {
            this.offset = j;
        }

        public int getMaxBufferIndex() {
            return this.maxBufferIndex;
        }

        public long getOffset() {
            return this.offset;
        }

        public int getNumRegions() {
            return this.numRegions;
        }

        public void addRegion(int i, int i2) {
            if (i < this.minBufferIndex) {
                this.minBufferIndex = i;
            }
            if (i2 > this.maxBufferIndex) {
                this.maxBufferIndex = i2;
            }
            this.numRegions++;
        }
    }

    public FileDataIndexSpilledRegionManagerImpl(int i, Path path, int i2, long j, int i3, BiConsumer<Integer, T> biConsumer, FileDataIndexRegionHelper<T> fileDataIndexRegionHelper) {
        try {
            this.channel = FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.READ, StandardOpenOption.WRITE);
        } catch (IOException e) {
            ExceptionUtils.rethrow(e);
        }
        this.loadEntireRegionGroupToCache = shouldLoadEntireRegionGroupToCache(i, i2, j, i3);
        this.subpartitionFinishedRegionGroupMetas = new ArrayList(i);
        this.subpartitionCurrentOffset = new long[i];
        this.subpartitionFreeSpaceInBytes = new int[i];
        this.currentRegionGroup = new RegionGroup[i];
        for (int i4 = 0; i4 < i; i4++) {
            this.subpartitionFinishedRegionGroupMetas.add(new TreeMap<>());
        }
        this.cacheRegionConsumer = biConsumer;
        this.fileDataIndexRegionHelper = fileDataIndexRegionHelper;
        this.regionGroupSizeInBytes = i2;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexSpilledRegionManager
    public long findRegion(int i, int i2, boolean z) {
        RegionGroup regionGroup = this.currentRegionGroup[i];
        if (regionGroup != null) {
            long findRegionInRegionGroup = findRegionInRegionGroup(i, i2, regionGroup, z);
            if (findRegionInRegionGroup != -1) {
                return findRegionInRegionGroup;
            }
        }
        Iterator<RegionGroup> it = this.subpartitionFinishedRegionGroupMetas.get(i).headMap(Integer.valueOf(i2), true).values().iterator();
        while (it.hasNext()) {
            long findRegionInRegionGroup2 = findRegionInRegionGroup(i, i2, it.next(), z);
            if (findRegionInRegionGroup2 != -1) {
                return findRegionInRegionGroup2;
            }
        }
        return -1L;
    }

    private long findRegionInRegionGroup(int i, int i2, RegionGroup regionGroup, boolean z) {
        if (i2 > regionGroup.getMaxBufferIndex()) {
            return -1L;
        }
        try {
            return readRegionGroupAndLoadToCacheIfNeeded(i, i2, regionGroup, z);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private long readRegionGroupAndLoadToCacheIfNeeded(int i, int i2, RegionGroup regionGroup, boolean z) throws IOException {
        List<Tuple2<T, Long>> readRegionGroup = readRegionGroup(regionGroup.getOffset(), regionGroup.getNumRegions());
        long j = -1;
        T t = null;
        Iterator<Tuple2<T, Long>> it = readRegionGroup.iterator();
        while (it.hasNext()) {
            Tuple2<T, Long> next = it.next();
            FileDataIndexRegionHelper.Region region = (FileDataIndexRegionHelper.Region) next.f0;
            if (region.containBuffer(i2)) {
                t = region;
                j = ((Long) next.f1).longValue();
                it.remove();
            }
        }
        if (t != null && z) {
            if (this.loadEntireRegionGroupToCache) {
                readRegionGroup.forEach(tuple2 -> {
                    this.cacheRegionConsumer.accept(Integer.valueOf(i), (FileDataIndexRegionHelper.Region) tuple2.f0);
                });
                this.cacheRegionConsumer.accept(Integer.valueOf(i), t);
            } else {
                this.cacheRegionConsumer.accept(Integer.valueOf(i), t);
            }
        }
        return j;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexSpilledRegionManager
    public void appendOrOverwriteRegion(int i, T t) throws IOException {
        long findRegion = findRegion(i, t.getFirstBufferIndex(), false);
        if (findRegion != -1) {
            writeRegionToOffset(findRegion, t);
        } else {
            appendRegion(i, t);
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexSpilledRegionManager, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.channel != null) {
            this.channel.close();
        }
    }

    private static boolean shouldLoadEntireRegionGroupToCache(int i, int i2, long j, int i3) {
        return ((2 * ((long) i)) * ((long) i2)) / ((long) i3) <= j;
    }

    private void appendRegion(int i, T t) throws IOException {
        int size = t.getSize();
        if (this.subpartitionFreeSpaceInBytes[i] < size) {
            startNewRegionGroup(i, Math.max(size, this.regionGroupSizeInBytes));
        }
        writeRegionToOffset(this.subpartitionCurrentOffset[i], t);
        updateRegionGroup(i, t);
    }

    private void writeRegionToOffset(long j, T t) throws IOException {
        this.channel.position(j);
        this.fileDataIndexRegionHelper.writeRegionToFile(this.channel, t);
    }

    private void startNewRegionGroup(int i, int i2) {
        RegionGroup regionGroup = this.currentRegionGroup[i];
        this.currentRegionGroup[i] = new RegionGroup(this.nextRegionGroupOffset);
        this.subpartitionCurrentOffset[i] = this.nextRegionGroupOffset;
        this.nextRegionGroupOffset += i2;
        this.subpartitionFreeSpaceInBytes[i] = i2;
        if (regionGroup != null) {
            this.subpartitionFinishedRegionGroupMetas.get(i).put(Integer.valueOf(regionGroup.minBufferIndex), regionGroup);
        }
    }

    private void updateRegionGroup(int i, T t) {
        int size = t.getSize();
        int[] iArr = this.subpartitionFreeSpaceInBytes;
        iArr[i] = iArr[i] - size;
        long[] jArr = this.subpartitionCurrentOffset;
        jArr[i] = jArr[i] + size;
        this.currentRegionGroup[i].addRegion(t.getFirstBufferIndex(), (t.getFirstBufferIndex() + t.getNumBuffers()) - 1);
    }

    private List<Tuple2<T, Long>> readRegionGroup(long j, int i) throws IOException {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(Tuple2.of(this.fileDataIndexRegionHelper.readRegionFromFile(this.channel, j), Long.valueOf(j)));
            j += r0.getSize();
        }
        return arrayList;
    }
}
