package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy;
import org.apache.flink.util.function.SupplierWithException;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.class */
/**
 * Owns one {@link HsSubpartitionMemoryDataManager} per subpartition, hands out buffers taken from
 * the {@link BufferPool}, and executes the spill / release decisions produced by the configured
 * {@link HsSpillingStrategy}.
 *
 * <p>Locking: a single fair {@link ReentrantReadWriteLock} is created in the constructor. This
 * class keeps its write lock (see {@link #callWithLock(SupplierWithException)}); the matching read
 * lock is passed to every subpartition manager.
 */
public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryDataManagerOperation {
    /** Number of subpartitions, i.e. the length of {@link #subpartitionMemoryDataManagers}. */
    private final int numSubpartitions;

    /** Per-subpartition managers, indexed by subpartition id. */
    private final HsSubpartitionMemoryDataManager[] subpartitionMemoryDataManagers;

    /** Writes the buffers selected for spilling to the file channel given to the constructor. */
    private final HsMemoryDataSpiller spiller;

    /** Strategy consulted on every memory-related event. */
    private final HsSpillingStrategy spillStrategy;

    /** Index that is told about spilled buffers and about buffers that became readable. */
    private final HsFileDataIndex fileDataIndex;

    /** Pool that memory segments are requested from and recycled to. */
    private final BufferPool bufferPool;

    /** Write lock of the read/write lock shared with the subpartition managers. */
    private final Lock lock;

    /** Segments requested from the pool and not yet recycled through {@link #recycleBuffer}. */
    private final AtomicInteger numRequestedBuffers = new AtomicInteger(0);

    /** Incremented per finished buffer, decremented by the number of buffers handed to a spill. */
    private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);

    /** Immutable pair of a buffer and the data type of the buffer that follows it. */
    public static class BufferAndNextDataType {
        private final Buffer buffer;
        private final Buffer.DataType nextDataType;

        /**
         * @param buffer the buffer itself
         * @param dataType data type reported by {@link #getNextDataType()}
         */
        public BufferAndNextDataType(Buffer buffer, Buffer.DataType dataType) {
            this.buffer = buffer;
            this.nextDataType = dataType;
        }

        /** @return the wrapped buffer */
        public Buffer getBuffer() {
            return this.buffer;
        }

        /** @return the data type stored alongside the buffer */
        public Buffer.DataType getNextDataType() {
            return this.nextDataType;
        }
    }

    /**
     * Creates the manager together with one subpartition manager per subpartition.
     *
     * @param i number of subpartitions
     * @param i2 forwarded unchanged as the second constructor argument of every
     *     {@link HsSubpartitionMemoryDataManager} (presumably the buffer size - TODO confirm)
     * @param bufferPool pool used to request and recycle memory segments
     * @param hsSpillingStrategy strategy that decides which buffers to spill or release
     * @param hsFileDataIndex index that records spilled / readable buffers
     * @param fileChannel channel handed to the {@link HsMemoryDataSpiller}
     */
    public HsMemoryDataManager(int i, int i2, BufferPool bufferPool, HsSpillingStrategy hsSpillingStrategy, HsFileDataIndex hsFileDataIndex, FileChannel fileChannel) {
        this.numSubpartitions = i;
        this.bufferPool = bufferPool;
        this.spiller = new HsMemoryDataSpiller(fileChannel);
        this.spillStrategy = hsSpillingStrategy;
        this.fileDataIndex = hsFileDataIndex;
        this.subpartitionMemoryDataManagers = new HsSubpartitionMemoryDataManager[i];
        // Fair read/write lock: this class takes the write side, subpartitions get the read side.
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
        this.lock = readWriteLock.writeLock();
        for (int subpartitionId = 0; subpartitionId < i; subpartitionId++) {
            this.subpartitionMemoryDataManagers[subpartitionId] =
                    new HsSubpartitionMemoryDataManager(subpartitionId, i2, readWriteLock.readLock(), this);
        }
    }

    /**
     * Appends a record to the given subpartition.
     *
     * @param byteBuffer record data
     * @param i id of the target subpartition
     * @param dataType data type of the record
     * @throws IOException if the calling thread is interrupted while appending; the
     *     {@link InterruptedException} is attached as the cause
     */
    public void append(ByteBuffer byteBuffer, int i, Buffer.DataType dataType) throws IOException {
        try {
            getSubpartitionMemoryDataManager(i).append(byteBuffer, dataType);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; restore it so that callers further
            // up the stack can still observe the interruption after the exception is translated.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /** @return the current number of buffers of the underlying {@link BufferPool} */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider
    public int getPoolSize() {
        return this.bufferPool.getNumBuffers();
    }

    /** @return the number of subpartitions given to the constructor */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider
    public int getNumSubpartitions() {
        return this.numSubpartitions;
    }

    /** @return the number of requested and not yet recycled buffers */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider
    public int getNumTotalRequestedBuffers() {
        return this.numRequestedBuffers.get();
    }

    /** @return the current value of the un-spilled buffer counter */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider
    public int getNumTotalUnSpillBuffers() {
        return this.numUnSpillBuffers.get();
    }

    /**
     * Delegates to the subpartition manager of subpartition {@code i}.
     *
     * @param i subpartition id
     * @param spillStatus spill status the returned buffers must satisfy
     * @param consumeStatus consume status the returned buffers must satisfy
     * @return the buffers reported by that subpartition for the two statuses
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider
    public Deque<BufferIndexAndChannel> getBuffersInOrder(int i, HsSpillingInfoProvider.SpillStatus spillStatus, HsSpillingInfoProvider.ConsumeStatus consumeStatus) {
        return getSubpartitionMemoryDataManager(i).getBuffersSatisfyStatus(spillStatus, consumeStatus);
    }

    /** @return always an empty list in this implementation */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider
    public List<Integer> getNextBufferIndexToConsume() {
        return Collections.emptyList();
    }

    /**
     * Forwards both arguments unchanged to {@link HsFileDataIndex#markBufferReadable}.
     *
     * @param i first argument (presumably the subpartition id - TODO confirm)
     * @param i2 second argument (presumably the buffer index - TODO confirm)
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManagerOperation
    public void markBufferReadableFromFile(int i, int i2) {
        this.fileDataIndex.markBufferReadable(i, i2);
    }

    /**
     * Blocks until the pool provides a memory segment, reports the new usage to the spilling
     * strategy and wraps the segment in a {@link BufferBuilder} that recycles through this class.
     *
     * @return a builder backed by the freshly requested segment
     * @throws InterruptedException if interrupted while waiting for a segment
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManagerOperation
    public BufferBuilder requestBufferFromPool() throws InterruptedException {
        MemorySegment segment = this.bufferPool.requestMemorySegmentBlocking();
        int requestedBuffers = this.numRequestedBuffers.incrementAndGet();
        handleDecision(this.spillStrategy.onMemoryUsageChanged(requestedBuffers, getPoolSize()));
        return new BufferBuilder(segment, this::recycleBuffer);
    }

    /**
     * Notifies the spilling strategy that a buffer was consumed and executes its decision.
     *
     * @param bufferIndexAndChannel identity of the consumed buffer
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManagerOperation
    public void onBufferConsumed(BufferIndexAndChannel bufferIndexAndChannel) {
        handleDecision(this.spillStrategy.onBufferConsumed(bufferIndexAndChannel));
    }

    /**
     * Increments the un-spilled buffer counter, passes the new value to the spilling strategy and
     * executes its decision.
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManagerOperation
    public void onBufferFinished() {
        handleDecision(this.spillStrategy.onBufferFinished(this.numUnSpillBuffers.incrementAndGet()));
    }

    /**
     * Executes a strategy decision. When the strategy returned no decision, a global one is
     * computed via {@code decideActionWithGlobalInfo} while holding the write lock.
     * Spilling is triggered before releasing.
     */
    private void handleDecision(Optional<HsSpillingStrategy.Decision> decisionOpt) {
        HsSpillingStrategy.Decision decision =
                decisionOpt.orElseGet(
                        () -> callWithLock(() -> this.spillStrategy.decideActionWithGlobalInfo(this)));

        Map<Integer, List<BufferIndexAndChannel>> toSpill = decision.getBufferToSpill();
        if (!toSpill.isEmpty()) {
            spillBuffers(toSpill);
        }

        Map<Integer, List<BufferIndexAndChannel>> toRelease = decision.getBufferToRelease();
        if (!toRelease.isEmpty()) {
            releaseBuffers(toRelease);
        }
    }

    /**
     * Collects the buffers to spill from every listed subpartition, lowers the un-spilled counter
     * accordingly and hands everything to the spiller in one asynchronous call.
     *
     * @param toSpill buffers to spill, keyed by subpartition id
     */
    private void spillBuffers(Map<Integer, List<BufferIndexAndChannel>> toSpill) {
        // Only ever completed with null, hence Void.
        CompletableFuture<Void> spillingCompleteFuture = new CompletableFuture<>();
        // NOTE(review): left raw on purpose - the element type is declared by
        // HsSubpartitionMemoryDataManager#spillSubpartitionBuffers / HsMemoryDataSpiller#spillAsync,
        // neither of which is visible from this file. Parameterize once confirmed.
        ArrayList buffersToSpill = new ArrayList();
        toSpill.forEach((subpartitionId, bufferIndexes) -> {
            buffersToSpill.addAll(
                    getSubpartitionMemoryDataManager(subpartitionId.intValue())
                            .spillSubpartitionBuffers(bufferIndexes, spillingCompleteFuture));
            // These buffers are now on their way to disk, so they no longer count as un-spilled.
            this.numUnSpillBuffers.getAndAdd(-bufferIndexes.size());
        });
        // NOTE(review): if spillAsync completes exceptionally, spillingCompleteFuture is never
        // completed; confirm whether the subpartition managers rely on it completing.
        this.spiller.spillAsync(buffersToSpill).thenAccept(spilledBuffers -> {
            this.fileDataIndex.addBuffers(spilledBuffers);
            spillingCompleteFuture.complete(null);
        });
    }

    /**
     * Asks every listed subpartition to release the given buffers.
     *
     * @param toRelease buffers to release, keyed by subpartition id
     */
    private void releaseBuffers(Map<Integer, List<BufferIndexAndChannel>> toRelease) {
        toRelease.forEach((subpartitionId, bufferIndexes) ->
                getSubpartitionMemoryDataManager(subpartitionId.intValue())
                        .releaseSubpartitionBuffers(bufferIndexes));
    }

    /** Looks up the manager of the given subpartition; no bounds check beyond the array's own. */
    private HsSubpartitionMemoryDataManager getSubpartitionMemoryDataManager(int subpartitionId) {
        return this.subpartitionMemoryDataManagers[subpartitionId];
    }

    /** Recycler installed on every {@link BufferBuilder}: updates the counter, then recycles. */
    private void recycleBuffer(MemorySegment memorySegment) {
        this.numRequestedBuffers.decrementAndGet();
        this.bufferPool.recycle(memorySegment);
    }

    /**
     * Runs the supplier while holding the write lock and returns its result.
     *
     * <p>The lock is acquired before entering the {@code try} block so that {@code unlock()} is
     * only reached when the lock is actually held.
     *
     * @param supplierWithException action to run under the lock
     * @param <T> result type of the action
     * @param <R> exception type the action may throw
     * @return whatever the supplier returns
     * @throws R if the supplier throws; the lock is released before the exception propagates
     */
    public <T, R extends Exception> T callWithLock(SupplierWithException<T, R> supplierWithException) throws R {
        this.lock.lock();
        try {
            return supplierWithException.get();
        } finally {
            this.lock.unlock();
        }
    }
}
