package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReader;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.class */
public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
    private final ByteBuffer headerBuf;
    private final int subpartitionId;
    private final HsConsumerId consumerId;
    private final FileChannel dataFileChannel;
    private final HsSubpartitionConsumerInternalOperations operations;
    private final CachedRegionManager cachedRegionManager;
    private final BufferIndexManager bufferIndexManager;
    private final Consumer<HsSubpartitionFileReader> fileReaderReleaser;

    @GuardedBy("lock")
    private boolean isFailed;

    @GuardedBy("lock")
    private boolean isReleased;
    private final Deque<BufferIndexOrError> loadedBuffers = new LinkedBlockingDeque();
    private final AtomicInteger backlog = new AtomicInteger(0);
    private final Object lock = new Object();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl$BufferIndexManager.class */
    public static class BufferIndexManager {
        private final int maxBuffersReadAhead;
        private int lastLoaded = -1;
        private int lastConsumed = -1;

        BufferIndexManager(int i) {
            this.maxBuffersReadAhead = i;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void updateLastLoaded(int i) {
            Preconditions.checkState(this.lastLoaded <= i);
            this.lastLoaded = i;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void updateLastConsumed(int i) {
            this.lastConsumed = i;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getNextToLoad() {
            int max = Math.max(this.lastLoaded, this.lastConsumed) + 1;
            if (max <= this.lastConsumed + this.maxBuffersReadAhead) {
                return max;
            }
            return -1;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl$BufferIndexOrError.class */
    public static class BufferIndexOrError {

        @Nullable
        private final Buffer buffer;
        private final int index;

        @Nullable
        private final Throwable throwable;

        private BufferIndexOrError(@Nullable Buffer buffer, int i, @Nullable Throwable th) {
            this.buffer = buffer;
            this.index = i;
            this.throwable = th;
        }

        public Buffer.DataType getDataType() {
            return this.buffer == null ? Buffer.DataType.NONE : this.buffer.getDataType();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static BufferIndexOrError newError(Throwable th) {
            return new BufferIndexOrError(null, -1, (Throwable) Preconditions.checkNotNull(th));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static BufferIndexOrError newBuffer(Buffer buffer, int i) {
            return new BufferIndexOrError((Buffer) Preconditions.checkNotNull(buffer), i, null);
        }

        public Optional<Buffer> getBuffer() {
            return Optional.ofNullable(this.buffer);
        }

        public Optional<Throwable> getThrowable() {
            return Optional.ofNullable(this.throwable);
        }

        public int getIndex() {
            Preconditions.checkNotNull(this.buffer, "Is error, cannot get index.");
            return this.index;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl$CachedRegionManager.class */
    public static class CachedRegionManager {
        private final int subpartitionId;
        private final HsFileDataIndex dataIndex;
        private int consumingOffset;
        private int currentBufferIndex;
        private int numSkip;
        private int numReadable;
        private long offset;

        private CachedRegionManager(int i, HsFileDataIndex hsFileDataIndex) {
            this.consumingOffset = -1;
            this.subpartitionId = i;
            this.dataIndex = hsFileDataIndex;
        }

        public void updateConsumingOffset(int i) {
            this.consumingOffset = i;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long getFileOffset(int i) {
            updateCachedRegionIfNeeded(i);
            if (this.currentBufferIndex == -1) {
                return Long.MAX_VALUE;
            }
            return this.offset;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getRemainingBuffersInRegion(int i) {
            updateCachedRegionIfNeeded(i);
            return this.numReadable;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void skipAll(long j) {
            this.offset = j;
            this.numSkip = 0;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Tuple2<Integer, Long> getNumSkipAndFileOffset(int i) {
            updateCachedRegionIfNeeded(i);
            Preconditions.checkState(this.numSkip >= 0, "num skip must be greater than or equal to 0");
            Preconditions.checkState(this.currentBufferIndex <= i);
            return new Tuple2<>(Integer.valueOf(this.numSkip), Long.valueOf(this.offset));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void advance(long j) {
            if (isInCachedRegion(this.currentBufferIndex + 1)) {
                this.currentBufferIndex++;
                this.numReadable--;
                this.offset += j;
            }
        }

        private void updateCachedRegionIfNeeded(int i) {
            if (isInCachedRegion(i)) {
                int i2 = i - this.currentBufferIndex;
                this.numSkip += i2;
                this.numReadable -= i2;
                this.currentBufferIndex = i;
                return;
            }
            Optional<HsFileDataIndex.ReadableRegion> readableRegion = this.dataIndex.getReadableRegion(this.subpartitionId, i, this.consumingOffset);
            if (!readableRegion.isPresent()) {
                this.currentBufferIndex = -1;
                this.numReadable = 0;
                this.numSkip = 0;
                this.offset = -1L;
                return;
            }
            HsFileDataIndex.ReadableRegion readableRegion2 = readableRegion.get();
            this.currentBufferIndex = i;
            this.numSkip = readableRegion2.numSkip;
            this.numReadable = readableRegion2.numReadable;
            this.offset = readableRegion2.offset;
        }

        private boolean isInCachedRegion(int i) {
            return i < this.currentBufferIndex + this.numReadable && i >= this.currentBufferIndex;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl$Factory.class */
    public static class Factory implements HsSubpartitionFileReader.Factory {
        public static final Factory INSTANCE = new Factory();

        private Factory() {
        }

        @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReader.Factory
        public HsSubpartitionFileReader createFileReader(int i, HsConsumerId hsConsumerId, FileChannel fileChannel, HsSubpartitionConsumerInternalOperations hsSubpartitionConsumerInternalOperations, HsFileDataIndex hsFileDataIndex, int i2, Consumer<HsSubpartitionFileReader> consumer, ByteBuffer byteBuffer) {
            return new HsSubpartitionFileReaderImpl(i, hsConsumerId, fileChannel, hsSubpartitionConsumerInternalOperations, hsFileDataIndex, i2, consumer, byteBuffer);
        }
    }

    public HsSubpartitionFileReaderImpl(int i, HsConsumerId hsConsumerId, FileChannel fileChannel, HsSubpartitionConsumerInternalOperations hsSubpartitionConsumerInternalOperations, HsFileDataIndex hsFileDataIndex, int i2, Consumer<HsSubpartitionFileReader> consumer, ByteBuffer byteBuffer) {
        this.subpartitionId = i;
        this.consumerId = hsConsumerId;
        this.dataFileChannel = fileChannel;
        this.operations = hsSubpartitionConsumerInternalOperations;
        this.headerBuf = byteBuffer;
        this.bufferIndexManager = new BufferIndexManager(i2);
        this.cachedRegionManager = new CachedRegionManager(i, hsFileDataIndex);
        this.fileReaderReleaser = consumer;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        HsSubpartitionFileReaderImpl hsSubpartitionFileReaderImpl = (HsSubpartitionFileReaderImpl) obj;
        return this.subpartitionId == hsSubpartitionFileReaderImpl.subpartitionId && Objects.equals(this.consumerId, hsSubpartitionFileReaderImpl.consumerId);
    }

    public int hashCode() {
        return Objects.hash(Integer.valueOf(this.subpartitionId), this.consumerId);
    }

    /* JADX WARN: Code restructure failed: missing block: B:38:0x008c, code lost:
    
        r6.add(r0);
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReader
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void readBuffers(java.util.Queue<org.apache.flink.core.memory.MemorySegment> r6, org.apache.flink.runtime.io.network.buffer.BufferRecycler r7) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 261
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReaderImpl.readBuffers(java.util.Queue, org.apache.flink.runtime.io.network.buffer.BufferRecycler):void");
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReader
    public void fail(Throwable th) {
        synchronized (this.lock) {
            if (this.isFailed) {
                return;
            }
            this.isFailed = true;
            while (true) {
                BufferIndexOrError pollLast = this.loadedBuffers.pollLast();
                if (pollLast == null) {
                    this.loadedBuffers.add(BufferIndexOrError.newError(th));
                    this.operations.notifyDataAvailable();
                    return;
                } else if (pollLast.getBuffer().isPresent()) {
                    pollLast.getBuffer().get().recycleBuffer();
                    tryDecreaseBacklog(pollLast.getBuffer().get());
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReader
    public void prepareForScheduling() {
        int consumingOffset = this.operations.getConsumingOffset(true);
        this.bufferIndexManager.updateLastConsumed(consumingOffset);
        this.cachedRegionManager.updateConsumingOffset(consumingOffset);
    }

    @Override // java.lang.Comparable
    public int compareTo(HsSubpartitionFileReader hsSubpartitionFileReader) {
        Preconditions.checkArgument(hsSubpartitionFileReader instanceof HsSubpartitionFileReaderImpl);
        return Long.compare(getNextOffsetToLoad(), ((HsSubpartitionFileReaderImpl) hsSubpartitionFileReader).getNextOffsetToLoad());
    }

    public Deque<BufferIndexOrError> getLoadedBuffers() {
        return this.loadedBuffers;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsDataView
    public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int i, Collection<Buffer> collection) throws Throwable {
        if (!checkAndGetFirstBufferIndexOrError(i, collection).isPresent()) {
            return Optional.empty();
        }
        BufferIndexOrError bufferIndexOrError = (BufferIndexOrError) Preconditions.checkNotNull(this.loadedBuffers.poll());
        BufferIndexOrError peek = this.loadedBuffers.peek();
        Buffer.DataType dataType = peek == null ? Buffer.DataType.NONE : peek.getDataType();
        int index = bufferIndexOrError.getIndex();
        Buffer orElseThrow = bufferIndexOrError.getBuffer().orElseThrow(() -> {
            return new NullPointerException("Get a non-throwable and non-buffer bufferIndexOrError, which is not allowed");
        });
        tryDecreaseBacklog(orElseThrow);
        return Optional.of(ResultSubpartition.BufferAndBacklog.fromBufferAndLookahead(orElseThrow, dataType, this.backlog.get(), index));
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsDataView
    public Buffer.DataType peekNextToConsumeDataType(int i, Collection<Buffer> collection) {
        Buffer.DataType dataType = Buffer.DataType.NONE;
        try {
            dataType = (Buffer.DataType) checkAndGetFirstBufferIndexOrError(i, collection).map((v0) -> {
                return v0.getDataType();
            }).orElse(Buffer.DataType.NONE);
        } catch (Throwable th) {
            ExceptionUtils.rethrow(th);
        }
        return dataType;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsDataView
    public void releaseDataView() {
        ArrayDeque arrayDeque = new ArrayDeque();
        synchronized (this.lock) {
            this.isReleased = true;
            while (!this.loadedBuffers.isEmpty()) {
                BufferIndexOrError poll = this.loadedBuffers.poll();
                if (poll.getBuffer().isPresent()) {
                    tryDecreaseBacklog(poll.getBuffer().get());
                    arrayDeque.add(poll.getBuffer().get());
                }
            }
        }
        arrayDeque.forEach((v0) -> {
            v0.recycleBuffer();
        });
        this.fileReaderReleaser.accept(this);
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsDataView
    public int getBacklog() {
        return this.backlog.get();
    }

    private Optional<BufferIndexOrError> checkAndGetFirstBufferIndexOrError(int i, Collection<Buffer> collection) throws Throwable {
        BufferIndexOrError peek = this.loadedBuffers.peek();
        while (peek != null) {
            if (peek.getThrowable().isPresent()) {
                throw peek.getThrowable().get();
            }
            if (peek.getIndex() == i) {
                break;
            }
            if (peek.getIndex() > i) {
                return Optional.empty();
            }
            if (peek.getIndex() < i) {
                Buffer buffer = ((BufferIndexOrError) Preconditions.checkNotNull(this.loadedBuffers.poll())).buffer;
                tryDecreaseBacklog((Buffer) Preconditions.checkNotNull(buffer));
                collection.add(buffer);
                peek = this.loadedBuffers.peek();
            }
        }
        return Optional.ofNullable(peek);
    }

    private void tryIncreaseBacklog(Buffer buffer) {
        if (buffer.isBuffer()) {
            this.backlog.getAndIncrement();
        }
    }

    private void tryDecreaseBacklog(Buffer buffer) {
        if (buffer.isBuffer()) {
            this.backlog.getAndDecrement();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void moveFileOffsetToBuffer(int i) throws IOException {
        Tuple2 numSkipAndFileOffset = this.cachedRegionManager.getNumSkipAndFileOffset(i);
        this.dataFileChannel.position(((Long) numSkipAndFileOffset.f1).longValue());
        for (int i2 = 0; i2 < ((Integer) numSkipAndFileOffset.f0).intValue(); i2++) {
            BufferReaderWriterUtil.positionToNextBuffer(this.dataFileChannel, this.headerBuf);
        }
        this.cachedRegionManager.skipAll(this.dataFileChannel.position());
    }

    private long getNextOffsetToLoad() {
        int nextToLoad = this.bufferIndexManager.getNextToLoad();
        if (nextToLoad < 0) {
            return Long.MAX_VALUE;
        }
        return this.cachedRegionManager.getFileOffset(nextToLoad);
    }
}
