package org.apache.flink.runtime.io.network.partition.hybrid;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingRunnable;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.class */
public class HsSubpartitionMemoryDataManager {
    private final int targetChannel;
    private final int bufferSize;
    private final HsMemoryDataManagerOperation memoryDataManagerOperation;
    private int finishedBufferIndex;
    private final Lock resultPartitionLock;

    @Nullable
    private final BufferCompressor bufferCompressor;

    @Nullable
    private HsOutputMetrics outputMetrics;
    private final Queue<BufferBuilder> unfinishedBuffers = new LinkedList();

    @GuardedBy("subpartitionLock")
    private final Deque<HsBufferContext> allBuffers = new LinkedList();

    @GuardedBy("subpartitionLock")
    private final Map<Integer, HsBufferContext> bufferIndexToContexts = new HashMap();
    private final ReentrantReadWriteLock subpartitionLock = new ReentrantReadWriteLock();

    @GuardedBy("subpartitionLock")
    private final Map<HsConsumerId, HsSubpartitionConsumerMemoryDataManager> consumerMap = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public HsSubpartitionMemoryDataManager(int i, int i2, Lock lock, @Nullable BufferCompressor bufferCompressor, HsMemoryDataManagerOperation hsMemoryDataManagerOperation) {
        this.targetChannel = i;
        this.bufferSize = i2;
        this.resultPartitionLock = lock;
        this.memoryDataManagerOperation = hsMemoryDataManagerOperation;
        this.bufferCompressor = bufferCompressor;
    }

    public void append(ByteBuffer byteBuffer, Buffer.DataType dataType) throws InterruptedException {
        if (dataType.isEvent()) {
            writeEvent(byteBuffer, dataType);
        } else {
            writeRecord(byteBuffer, dataType);
        }
    }

    public Deque<BufferIndexAndChannel> getBuffersSatisfyStatus(HsSpillingInfoProvider.SpillStatus spillStatus, HsSpillingInfoProvider.ConsumeStatusWithId consumeStatusWithId) {
        return (Deque) callWithLock(() -> {
            ArrayDeque arrayDeque = new ArrayDeque();
            this.allBuffers.forEach(hsBufferContext -> {
                if (isBufferSatisfyStatus(hsBufferContext, spillStatus, consumeStatusWithId)) {
                    arrayDeque.add(hsBufferContext.getBufferIndexAndChannel());
                }
            });
            return arrayDeque;
        });
    }

    public List<BufferWithIdentity> spillSubpartitionBuffers(List<BufferIndexAndChannel> list, CompletableFuture<Void> completableFuture) {
        return (List) callWithLock(() -> {
            return (List) list.stream().map(bufferIndexAndChannel -> {
                int bufferIndex = bufferIndexAndChannel.getBufferIndex();
                return startSpillingBuffer(bufferIndex, completableFuture).map(hsBufferContext -> {
                    return new BufferWithIdentity(hsBufferContext.getBuffer(), bufferIndex, this.targetChannel);
                });
            }).filter((v0) -> {
                return v0.isPresent();
            }).map((v0) -> {
                return v0.get();
            }).collect(Collectors.toList());
        });
    }

    public void releaseSubpartitionBuffers(List<BufferIndexAndChannel> list) {
        runWithLock(() -> {
            list.forEach(bufferIndexAndChannel -> {
                int bufferIndex = bufferIndexAndChannel.getBufferIndex();
                HsBufferContext hsBufferContext = this.bufferIndexToContexts.get(Integer.valueOf(bufferIndex));
                if (hsBufferContext != null) {
                    checkAndMarkBufferReadable(hsBufferContext);
                    releaseBuffer(bufferIndex);
                }
            });
        });
    }

    public void setOutputMetrics(HsOutputMetrics hsOutputMetrics) {
        this.outputMetrics = (HsOutputMetrics) Preconditions.checkNotNull(hsOutputMetrics);
    }

    public HsSubpartitionConsumerMemoryDataManager registerNewConsumer(HsConsumerId hsConsumerId) {
        return (HsSubpartitionConsumerMemoryDataManager) callWithLock(() -> {
            Preconditions.checkState(!this.consumerMap.containsKey(hsConsumerId));
            HsSubpartitionConsumerMemoryDataManager hsSubpartitionConsumerMemoryDataManager = new HsSubpartitionConsumerMemoryDataManager(this.resultPartitionLock, this.subpartitionLock.readLock(), this.targetChannel, hsConsumerId, this.memoryDataManagerOperation);
            hsSubpartitionConsumerMemoryDataManager.addInitialBuffers(this.allBuffers);
            this.consumerMap.put(hsConsumerId, hsSubpartitionConsumerMemoryDataManager);
            return hsSubpartitionConsumerMemoryDataManager;
        });
    }

    public void releaseConsumer(HsConsumerId hsConsumerId) {
        runWithLock(() -> {
            Preconditions.checkNotNull(this.consumerMap.remove(hsConsumerId));
        });
    }

    private void writeEvent(ByteBuffer byteBuffer, Buffer.DataType dataType) {
        Preconditions.checkArgument(dataType.isEvent());
        finishCurrentWritingBufferIfNotEmpty();
        MemorySegment wrap = MemorySegmentFactory.wrap(byteBuffer.array());
        addFinishedBuffer(new HsBufferContext(new NetworkBuffer(wrap, FreeingBufferRecycler.INSTANCE, dataType, wrap.size()), this.finishedBufferIndex, this.targetChannel));
        this.memoryDataManagerOperation.onBufferFinished();
    }

    private void writeRecord(ByteBuffer byteBuffer, Buffer.DataType dataType) throws InterruptedException {
        Preconditions.checkArgument(!dataType.isEvent());
        ensureCapacityForRecord(byteBuffer);
        writeRecord(byteBuffer);
    }

    private void ensureCapacityForRecord(ByteBuffer byteBuffer) throws InterruptedException {
        int remaining = byteBuffer.remaining();
        int intValue = ((Integer) Optional.ofNullable(this.unfinishedBuffers.peek()).map(bufferBuilder -> {
            return Integer.valueOf(bufferBuilder.getWritableBytes() + (this.bufferSize * (this.unfinishedBuffers.size() - 1)));
        }).orElse(0)).intValue();
        while (true) {
            int i = intValue;
            if (i >= remaining) {
                return;
            }
            this.unfinishedBuffers.add(this.memoryDataManagerOperation.requestBufferFromPool());
            intValue = i + this.bufferSize;
        }
    }

    private void writeRecord(ByteBuffer byteBuffer) {
        while (byteBuffer.hasRemaining()) {
            BufferBuilder bufferBuilder = (BufferBuilder) Preconditions.checkNotNull(this.unfinishedBuffers.peek(), "Expect enough capacity for the record.");
            bufferBuilder.append(byteBuffer);
            if (bufferBuilder.isFull()) {
                finishCurrentWritingBuffer();
            }
        }
    }

    private void finishCurrentWritingBufferIfNotEmpty() {
        BufferBuilder peek = this.unfinishedBuffers.peek();
        if (peek == null || peek.getWritableBytes() == this.bufferSize) {
            return;
        }
        finishCurrentWritingBuffer();
    }

    private void finishCurrentWritingBuffer() {
        BufferBuilder poll = this.unfinishedBuffers.poll();
        if (poll == null) {
            return;
        }
        poll.finish();
        BufferConsumer createBufferConsumerFromBeginning = poll.createBufferConsumerFromBeginning();
        Buffer build = createBufferConsumerFromBeginning.build();
        poll.close();
        createBufferConsumerFromBeginning.close();
        addFinishedBuffer(new HsBufferContext(compressBuffersIfPossible(build), this.finishedBufferIndex, this.targetChannel));
        this.memoryDataManagerOperation.onBufferFinished();
    }

    private Buffer compressBuffersIfPossible(Buffer buffer) {
        return !canBeCompressed(buffer) ? buffer : ((BufferCompressor) Preconditions.checkNotNull(this.bufferCompressor)).compressToOriginalBuffer(buffer);
    }

    private boolean canBeCompressed(Buffer buffer) {
        return this.bufferCompressor != null && buffer.isBuffer() && buffer.readableBytes() > 0;
    }

    private void addFinishedBuffer(HsBufferContext hsBufferContext) {
        this.finishedBufferIndex++;
        ArrayList arrayList = new ArrayList(this.consumerMap.size());
        runWithLock(() -> {
            this.allBuffers.add(hsBufferContext);
            this.bufferIndexToContexts.put(Integer.valueOf(hsBufferContext.getBufferIndexAndChannel().getBufferIndex()), hsBufferContext);
            for (Map.Entry<HsConsumerId, HsSubpartitionConsumerMemoryDataManager> entry : this.consumerMap.entrySet()) {
                if (entry.getValue().addBuffer(hsBufferContext)) {
                    arrayList.add(entry.getKey());
                }
            }
            updateStatistics(hsBufferContext.getBuffer());
        });
        this.memoryDataManagerOperation.onDataAvailable(this.targetChannel, arrayList);
    }

    @GuardedBy("subpartitionLock")
    private void trimHeadingReleasedBuffers(Deque<HsBufferContext> deque) {
        while (!deque.isEmpty() && deque.peekFirst().isReleased()) {
            deque.removeFirst();
        }
    }

    @GuardedBy("subpartitionLock")
    private void releaseBuffer(int i) {
        HsBufferContext remove = this.bufferIndexToContexts.remove(Integer.valueOf(i));
        if (remove == null) {
            return;
        }
        remove.release();
        trimHeadingReleasedBuffers(this.allBuffers);
    }

    @GuardedBy("subpartitionLock")
    private Optional<HsBufferContext> startSpillingBuffer(int i, CompletableFuture<Void> completableFuture) {
        HsBufferContext hsBufferContext = this.bufferIndexToContexts.get(Integer.valueOf(i));
        if (hsBufferContext != null && hsBufferContext.startSpilling(completableFuture)) {
            return Optional.of(hsBufferContext);
        }
        return Optional.empty();
    }

    @GuardedBy("subpartitionLock")
    private void checkAndMarkBufferReadable(HsBufferContext hsBufferContext) {
        if (isBufferSatisfyStatus(hsBufferContext, HsSpillingInfoProvider.SpillStatus.SPILL, HsSpillingInfoProvider.ConsumeStatusWithId.ALL_ANY)) {
            hsBufferContext.getSpilledFuture().orElseThrow(() -> {
                return new IllegalStateException("Buffer in spill status should already set spilled future.");
            }).thenRun(() -> {
                BufferIndexAndChannel bufferIndexAndChannel = hsBufferContext.getBufferIndexAndChannel();
                this.memoryDataManagerOperation.markBufferReleasedFromFile(bufferIndexAndChannel.getChannel(), bufferIndexAndChannel.getBufferIndex());
            });
        }
    }

    @GuardedBy("subpartitionLock")
    private boolean isBufferSatisfyStatus(HsBufferContext hsBufferContext, HsSpillingInfoProvider.SpillStatus spillStatus, HsSpillingInfoProvider.ConsumeStatusWithId consumeStatusWithId) {
        if (hsBufferContext.isReleased()) {
            return false;
        }
        boolean z = true;
        switch (spillStatus) {
            case NOT_SPILL:
                z = !hsBufferContext.isSpillStarted();
                break;
            case SPILL:
                z = hsBufferContext.isSpillStarted();
                break;
        }
        switch (consumeStatusWithId.status) {
            case NOT_CONSUMED:
                z &= !hsBufferContext.isConsumed(consumeStatusWithId.consumerId);
                break;
            case CONSUMED:
                z &= hsBufferContext.isConsumed(consumeStatusWithId.consumerId);
                break;
        }
        return z;
    }

    private void updateStatistics(Buffer buffer) {
        ((HsOutputMetrics) Preconditions.checkNotNull(this.outputMetrics)).getNumBuffersOut().inc();
        ((HsOutputMetrics) Preconditions.checkNotNull(this.outputMetrics)).getNumBytesOut().inc(buffer.readableBytes());
    }

    private <E extends Exception> void runWithLock(ThrowingRunnable<E> throwingRunnable) throws Exception {
        try {
            this.resultPartitionLock.lock();
            this.subpartitionLock.writeLock().lock();
            throwingRunnable.run();
        } finally {
            this.subpartitionLock.writeLock().unlock();
            this.resultPartitionLock.unlock();
        }
    }

    private <R, E extends Exception> R callWithLock(SupplierWithException<R, E> supplierWithException) throws Exception {
        try {
            this.resultPartitionLock.lock();
            this.subpartitionLock.writeLock().lock();
            return (R) supplierWithException.get();
        } finally {
            this.subpartitionLock.writeLock().unlock();
            this.resultPartitionLock.unlock();
        }
    }
}
