package org.apache.flink.state.forst.fs;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.core.fs.ByteBufferReadable;
import org.apache.flink.core.fs.FSDataInputStream;

@Experimental
/* loaded from: input_file:org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream.class */
public class ByteBufferReadableFSDataInputStream extends FSDataInputStream {
    private final FSDataInputStream originalInputStream;
    private final Queue<FSDataInputStream> readInputStreamPool;
    private final Callable<FSDataInputStream> inputStreamBuilder;
    private final long totalFileSize;

    public ByteBufferReadableFSDataInputStream(Callable<FSDataInputStream> callable, int i, long j) throws IOException {
        try {
            this.originalInputStream = callable.call();
            this.inputStreamBuilder = callable;
            this.readInputStreamPool = new LinkedBlockingQueue(i);
            this.totalFileSize = j;
        } catch (Exception e) {
            throw new IOException("Exception when build original input stream", e);
        }
    }

    public int readFully(ByteBuffer byteBuffer) throws IOException {
        if (byteBuffer == null) {
            throw new NullPointerException();
        }
        if (byteBuffer.remaining() == 0) {
            return 0;
        }
        return this.originalInputStream instanceof ByteBufferReadable ? this.originalInputStream.read(byteBuffer) : readFullyFromFSDataInputStream(this.originalInputStream, byteBuffer);
    }

    public int readFully(long j, ByteBuffer byteBuffer) throws Exception {
        int readFullyFromFSDataInputStream;
        if (byteBuffer == null) {
            throw new NullPointerException();
        }
        if (j >= this.totalFileSize) {
            throw new IllegalArgumentException(String.format("position [%s] is larger than or equals to totalFileSize [%s]", Long.valueOf(j), Long.valueOf(this.totalFileSize)));
        }
        byteBuffer.limit(Math.min(byteBuffer.limit(), (int) Math.min((this.totalFileSize - j) + byteBuffer.position(), 2147483647L)));
        if (byteBuffer.remaining() == 0) {
            return 0;
        }
        FSDataInputStream poll = this.readInputStreamPool.poll();
        if (poll == null) {
            poll = this.inputStreamBuilder.call();
        }
        if (poll instanceof ByteBufferReadable) {
            readFullyFromFSDataInputStream = ((ByteBufferReadable) poll).read(j, byteBuffer);
        } else {
            poll.seek(j);
            readFullyFromFSDataInputStream = readFullyFromFSDataInputStream(poll, byteBuffer);
        }
        try {
            if (!this.readInputStreamPool.offer(poll)) {
                poll.close();
            }
            return readFullyFromFSDataInputStream;
        } catch (Exception e) {
            poll.close();
            throw e;
        }
    }

    private int readFullyFromFSDataInputStream(FSDataInputStream fSDataInputStream, ByteBuffer byteBuffer) throws IOException {
        int read;
        int read2 = fSDataInputStream.read();
        if (read2 == -1) {
            return -1;
        }
        byteBuffer.put((byte) read2);
        int i = 1;
        int remaining = byteBuffer.remaining() + 1;
        while (i < remaining && (read = fSDataInputStream.read()) != -1) {
            byteBuffer.put((byte) read);
            i++;
        }
        return i;
    }

    public void seek(long j) throws IOException {
        this.originalInputStream.seek(j);
    }

    public long getPos() throws IOException {
        return this.originalInputStream.getPos();
    }

    public int read() throws IOException {
        return this.originalInputStream.read();
    }

    public int read(byte[] bArr) throws IOException {
        return this.originalInputStream.read(bArr);
    }

    public int read(byte[] bArr, int i, int i2) throws IOException {
        return this.originalInputStream.read(bArr, i, i2);
    }

    public long skip(long j) throws IOException {
        long pos = getPos();
        seek(pos + j);
        return getPos() - pos;
    }

    public int available() throws IOException {
        return this.originalInputStream.available();
    }

    public void close() throws IOException {
        this.originalInputStream.close();
        Iterator<FSDataInputStream> it = this.readInputStreamPool.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    public synchronized void mark(int i) {
        this.originalInputStream.mark(i);
    }

    public synchronized void reset() throws IOException {
        this.originalInputStream.reset();
    }

    public boolean markSupported() {
        return this.originalInputStream.markSupported();
    }
}
