package org.apache.flink.runtime.io.network.partition.hybrid;

import java.util.Collection;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.class */
/**
 * In-memory {@link HsDataView} for a single consumer of a single subpartition.
 *
 * <p>The manager keeps a queue of buffers that this consumer has not consumed yet. Buffers are
 * appended at the tail, handed out from the head by {@link #consumeBuffer(int, Collection)}, and
 * released buffers at the head are dropped lazily before the head is inspected.
 *
 * <p>Methods annotated with {@code @GuardedBy("consumerLock")} do not lock by themselves; they
 * expect the caller to already hold {@code consumerLock}.
 */
public class HsSubpartitionConsumerMemoryDataManager implements HsDataView {

    /** Buffers not yet consumed by this consumer, ordered from oldest (head) to newest (tail). */
    @GuardedBy("consumerLock")
    private final Deque<HsBufferContext> unConsumedBuffers = new LinkedList<>();

    private final Lock consumerLock;
    private final Lock resultPartitionLock;
    private final HsConsumerId consumerId;
    private final int subpartitionId;
    private final HsMemoryDataManagerOperation memoryDataManagerOperation;

    /**
     * Creates a manager for one consumer of one subpartition.
     *
     * @param resultPartitionLock lock acquired first (outer lock) by the locked read paths
     * @param consumerLock lock acquired second (inner lock); guards the unconsumed-buffer queue
     * @param subpartitionId id reported to {@code memoryDataManagerOperation} on release
     * @param consumerId id of the consumer this view serves
     * @param memoryDataManagerOperation callback target notified about consumed buffers and
     *     about the release of this view
     */
    public HsSubpartitionConsumerMemoryDataManager(
            Lock resultPartitionLock,
            Lock consumerLock,
            int subpartitionId,
            HsConsumerId consumerId,
            HsMemoryDataManagerOperation memoryDataManagerOperation) {
        this.resultPartitionLock = resultPartitionLock;
        this.consumerLock = consumerLock;
        this.subpartitionId = subpartitionId;
        this.consumerId = consumerId;
        this.memoryDataManagerOperation = memoryDataManagerOperation;
    }

    /**
     * Appends all given buffers, in their iteration order, to the unconsumed queue.
     *
     * @param buffers buffers to enqueue
     */
    @GuardedBy("consumerLock")
    public void addInitialBuffers(Deque<HsBufferContext> buffers) {
        unConsumedBuffers.addAll(buffers);
    }

    /**
     * Appends a single buffer to the unconsumed queue and drops released buffers from the head.
     *
     * @param bufferContext buffer to enqueue
     * @return {@code true} if the queue holds at most one buffer after the append and trim
     *     (presumably the signal that data just became available - confirm against the caller)
     */
    @GuardedBy("consumerLock")
    public boolean addBuffer(HsBufferContext bufferContext) {
        unConsumedBuffers.add(bufferContext);
        trimHeadingReleasedBuffers();
        return unConsumedBuffers.size() <= 1;
    }

    /**
     * Hands out the head buffer if, after trimming released buffers, its buffer index equals
     * {@code toConsumeIndex}.
     *
     * @param toConsumeIndex index of the buffer the caller wants to consume
     * @param buffersToRecycle not used by this implementation
     * @return a read-only slice of the buffer together with the current backlog, the data type
     *     of the buffer with index {@code toConsumeIndex + 1} ({@code NONE} if that buffer is
     *     not at the head) and the consumed index; empty if the requested buffer is not at the
     *     head of the queue
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsDataView
    public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(
            int toConsumeIndex, Collection<Buffer> buffersToRecycle) {
        Optional<Tuple2<HsBufferContext, Buffer.DataType>> bufferAndNextDataType =
                callWithLock(
                        () -> {
                            if (!checkFirstUnConsumedBufferIndex(toConsumeIndex)) {
                                return Optional.empty();
                            }
                            HsBufferContext bufferContext =
                                    Preconditions.checkNotNull(unConsumedBuffers.pollFirst());
                            bufferContext.consumed(consumerId);
                            Buffer.DataType nextDataType =
                                    peekNextToConsumeDataTypeInternal(toConsumeIndex + 1);
                            return Optional.of(Tuple2.of(bufferContext, nextDataType));
                        });

        // The callback and the result construction run after both locks have been released.
        bufferAndNextDataType.ifPresent(
                tuple ->
                        memoryDataManagerOperation.onBufferConsumed(
                                tuple.f0.getBufferIndexAndChannel()));
        return bufferAndNextDataType.map(
                tuple ->
                        new ResultSubpartition.BufferAndBacklog(
                                tuple.f0.getBuffer().readOnlySlice(),
                                getBacklog(),
                                tuple.f1,
                                toConsumeIndex));
    }

    /**
     * Returns the data type of the head buffer if its index equals {@code nextToConsumeIndex}.
     *
     * @param nextToConsumeIndex index of the buffer to peek at
     * @param buffersToRecycle not used by this implementation
     * @return the data type of the matching head buffer, or {@code Buffer.DataType.NONE} if the
     *     queue is empty or its head has a different index
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsDataView
    public Buffer.DataType peekNextToConsumeDataType(
            int nextToConsumeIndex, Collection<Buffer> buffersToRecycle) {
        return callWithLock(() -> peekNextToConsumeDataTypeInternal(nextToConsumeIndex));
    }

    /** Lock-free variant of the peek; the caller must hold {@code consumerLock}. */
    @GuardedBy("consumerLock")
    private Buffer.DataType peekNextToConsumeDataTypeInternal(int nextToConsumeIndex) {
        if (!checkFirstUnConsumedBufferIndex(nextToConsumeIndex)) {
            return Buffer.DataType.NONE;
        }
        return Preconditions.checkNotNull(unConsumedBuffers.peekFirst()).getBuffer().getDataType();
    }

    /**
     * Drops released buffers from the head, then checks whether the head buffer carries the
     * expected index.
     *
     * @return {@code true} if the queue is non-empty and its head has index
     *     {@code expectedBufferIndex}
     */
    @GuardedBy("consumerLock")
    private boolean checkFirstUnConsumedBufferIndex(int expectedBufferIndex) {
        trimHeadingReleasedBuffers();
        return !unConsumedBuffers.isEmpty()
                && unConsumedBuffers.peekFirst().getBufferIndexAndChannel().getBufferIndex()
                        == expectedBufferIndex;
    }

    /**
     * Returns the number of buffers currently in the unconsumed queue.
     *
     * <p>NOTE(review): this reads the queue without acquiring {@code consumerLock}, so the value
     * may be stale under concurrent modification - confirm that this is intentional.
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsDataView
    public int getBacklog() {
        return unConsumedBuffers.size();
    }

    /** Notifies {@code memoryDataManagerOperation} that this consumer has been released. */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsDataView
    public void releaseDataView() {
        memoryDataManagerOperation.onConsumerReleased(subpartitionId, consumerId);
    }

    /** Removes buffers from the head of the queue for as long as the head is released. */
    @GuardedBy("consumerLock")
    private void trimHeadingReleasedBuffers() {
        while (!unConsumedBuffers.isEmpty() && unConsumedBuffers.peekFirst().isReleased()) {
            unConsumedBuffers.removeFirst();
        }
    }

    /**
     * Runs {@code callable} while holding {@code resultPartitionLock} (outer) and
     * {@code consumerLock} (inner), releasing them in reverse order afterwards.
     *
     * <p>Each lock is acquired before its own {@code try} block so that a failed acquisition
     * never leads to unlocking a lock that is not held.
     *
     * @param callable action to run under both locks
     * @return whatever {@code callable} returns
     * @throws E if {@code callable} throws
     */
    private <R, E extends Exception> R callWithLock(SupplierWithException<R, E> callable)
            throws E {
        resultPartitionLock.lock();
        try {
            consumerLock.lock();
            try {
                return callable.get();
            } finally {
                consumerLock.unlock();
            }
        } finally {
            resultPartitionLock.unlock();
        }
    }
}
