/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.SubpartitionDiskCacheManager;
import org.apache.flink.util.concurrent.FutureUtils;

class DiskCacheManager {
    private final TieredStoragePartitionId partitionId;
    private final int numSubpartitions;
    private final int maxCachedBytesBeforeFlush;
    private final PartitionFileWriter partitionFileWriter;
    private final SubpartitionDiskCacheManager[] subpartitionCacheManagers;
    private CompletableFuture<Void> hasFlushCompleted;
    private int numCachedBytesCounter;

    DiskCacheManager(TieredStoragePartitionId partitionId, int numSubpartitions, int maxCachedBytesBeforeFlush, TieredStorageMemoryManager memoryManager, PartitionFileWriter partitionFileWriter) {
        this.partitionId = partitionId;
        this.numSubpartitions = numSubpartitions;
        this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
        this.partitionFileWriter = partitionFileWriter;
        this.subpartitionCacheManagers = new SubpartitionDiskCacheManager[numSubpartitions];
        this.hasFlushCompleted = FutureUtils.completedVoidFuture();
        for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) {
            this.subpartitionCacheManagers[subpartitionId] = new SubpartitionDiskCacheManager();
        }
        memoryManager.listenBufferReclaimRequest(this::notifyFlushCachedBuffers);
    }

    void startSegment(int subpartitionId, int segmentIndex) {
        this.subpartitionCacheManagers[subpartitionId].startSegment(segmentIndex);
    }

    void append(Buffer buffer, int subpartitionId, boolean flush) {
        this.subpartitionCacheManagers[subpartitionId].append(buffer);
        this.increaseNumCachedBytesAndCheckFlush(buffer.readableBytes(), flush);
    }

    void appendEndOfSegmentEvent(ByteBuffer record, int subpartitionId) {
        this.subpartitionCacheManagers[subpartitionId].appendEndOfSegmentEvent(record);
        this.increaseNumCachedBytesAndCheckFlush(record.remaining(), true);
    }

    int getBufferIndex(int subpartitionId) {
        return this.subpartitionCacheManagers[subpartitionId].getBufferIndex();
    }

    void close() {
        this.forceFlushCachedBuffers();
    }

    void release() {
        Arrays.stream(this.subpartitionCacheManagers).forEach(SubpartitionDiskCacheManager::release);
        this.partitionFileWriter.release();
    }

    private void increaseNumCachedBytesAndCheckFlush(int numIncreasedCachedBytes, boolean flush) {
        this.numCachedBytesCounter += numIncreasedCachedBytes;
        if (flush && this.numCachedBytesCounter > this.maxCachedBytesBeforeFlush) {
            this.forceFlushCachedBuffers();
        }
    }

    private void notifyFlushCachedBuffers() {
        this.flushBuffers(false);
    }

    private void forceFlushCachedBuffers() {
        this.flushBuffers(true);
    }

    private synchronized void flushBuffers(boolean forceFlush) {
        if (!forceFlush && !this.hasFlushCompleted.isDone()) {
            return;
        }
        ArrayList<PartitionFileWriter.SubpartitionBufferContext> buffersToFlush = new ArrayList<PartitionFileWriter.SubpartitionBufferContext>();
        int numToWriteBuffers = this.getSubpartitionToFlushBuffers(buffersToFlush);
        if (numToWriteBuffers > 0) {
            CompletableFuture<Void> flushCompletableFuture = this.partitionFileWriter.write(this.partitionId, buffersToFlush);
            if (!forceFlush) {
                this.hasFlushCompleted = flushCompletableFuture;
            }
        }
        this.numCachedBytesCounter = 0;
    }

    private int getSubpartitionToFlushBuffers(List<PartitionFileWriter.SubpartitionBufferContext> buffersToFlush) {
        int numToWriteBuffers = 0;
        for (int subpartitionId = 0; subpartitionId < this.numSubpartitions; ++subpartitionId) {
            List<Tuple2<Buffer, Integer>> bufferWithIndexes = this.subpartitionCacheManagers[subpartitionId].removeAllBuffers();
            buffersToFlush.add(new PartitionFileWriter.SubpartitionBufferContext(subpartitionId, Collections.singletonList(new PartitionFileWriter.SegmentBufferContext(this.subpartitionCacheManagers[subpartitionId].getSegmentId(), bufferWithIndexes, false))));
            numToWriteBuffers += bufferWithIndexes.size();
        }
        return numToWriteBuffers;
    }
}

