package org.apache.flink.state.forst.fs.cache;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;
import org.apache.flink.core.fs.ByteBufferReadable;
import org.apache.flink.core.fs.FSDataInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/state/forst/fs/cache/CachedDataInputStream.class */
public class CachedDataInputStream extends FSDataInputStream implements ByteBufferReadable {
    private static final Logger LOG = LoggerFactory.getLogger(CachedDataInputStream.class);
    private final FileCacheEntry cacheEntry;
    private volatile FSDataInputStream fsdis;
    private volatile long position;
    private final FSDataInputStream originalStream;
    private volatile StreamStatus streamStatus = StreamStatus.CACHED_OPEN;
    private Semaphore semaphore = new Semaphore(0);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/state/forst/fs/cache/CachedDataInputStream$StreamStatus.class */
    public enum StreamStatus {
        CACHED_OPEN,
        CACHED_CLOSED,
        ORIGINAL
    }

    public CachedDataInputStream(FileCacheEntry fileCacheEntry, FSDataInputStream fSDataInputStream, FSDataInputStream fSDataInputStream2) {
        this.cacheEntry = fileCacheEntry;
        this.fsdis = fSDataInputStream;
        this.originalStream = fSDataInputStream2;
    }

    private FSDataInputStream getStream() throws IOException {
        if (this.streamStatus == StreamStatus.CACHED_OPEN && this.cacheEntry.tryRetain() > 0) {
            return this.fsdis;
        }
        if (this.streamStatus == StreamStatus.ORIGINAL) {
            return this.originalStream;
        }
        try {
            this.semaphore.acquire(1);
            this.originalStream.seek(this.position);
            this.position = -1L;
            this.streamStatus = StreamStatus.ORIGINAL;
            return this.originalStream;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void closeStream() throws IOException {
        if (this.streamStatus == StreamStatus.CACHED_OPEN) {
            this.streamStatus = StreamStatus.CACHED_CLOSED;
            this.position = this.fsdis.getPos();
            this.fsdis.close();
            this.fsdis = null;
            this.semaphore.release(1);
        }
    }

    private void finish() {
        if (this.streamStatus == StreamStatus.CACHED_OPEN) {
            this.cacheEntry.release();
        }
    }

    public void seek(long j) throws IOException {
        try {
            getStream().seek(j);
        } finally {
            finish();
        }
    }

    public long getPos() throws IOException {
        try {
            return getStream().getPos();
        } finally {
            finish();
        }
    }

    public int read() throws IOException {
        try {
            return getStream().read();
        } finally {
            finish();
        }
    }

    public int read(byte[] bArr) throws IOException {
        try {
            return getStream().read(bArr);
        } finally {
            finish();
        }
    }

    public int read(byte[] bArr, int i, int i2) throws IOException {
        try {
            int read = getStream().read(bArr, i, i2);
            finish();
            return read;
        } catch (Throwable th) {
            finish();
            throw th;
        }
    }

    public long skip(long j) throws IOException {
        try {
            long skip = getStream().skip(j);
            finish();
            return skip;
        } catch (Throwable th) {
            finish();
            throw th;
        }
    }

    public int available() throws IOException {
        try {
            return getStream().available();
        } finally {
            finish();
        }
    }

    public void close() throws IOException {
        closeStream();
    }

    public void mark(int i) {
        try {
            getStream().mark(i);
        } catch (Exception e) {
            LOG.warn("Mark error.", e);
        } finally {
            finish();
        }
    }

    public void reset() throws IOException {
        try {
            getStream().reset();
        } finally {
            finish();
        }
    }

    public boolean markSupported() {
        try {
            return getStream().markSupported();
        } catch (IOException e) {
            LOG.warn("MarkSupported error.", e);
            return false;
        } finally {
            finish();
        }
    }

    public int read(ByteBuffer byteBuffer) throws IOException {
        if (byteBuffer == null) {
            throw new NullPointerException();
        }
        if (byteBuffer.remaining() == 0) {
            return 0;
        }
        try {
            ByteBufferReadable stream = getStream();
            return stream instanceof ByteBufferReadable ? stream.read(byteBuffer) : readFullyFromFSDataInputStream(stream, byteBuffer);
        } finally {
            finish();
        }
    }

    public int read(long j, ByteBuffer byteBuffer) throws IOException {
        try {
            ByteBufferReadable stream = getStream();
            if (stream instanceof ByteBufferReadable) {
                int read = stream.read(j, byteBuffer);
                finish();
                return read;
            }
            stream.seek(j);
            int readFullyFromFSDataInputStream = readFullyFromFSDataInputStream(stream, byteBuffer);
            finish();
            return readFullyFromFSDataInputStream;
        } catch (Throwable th) {
            finish();
            throw th;
        }
    }

    private static int readFullyFromFSDataInputStream(FSDataInputStream fSDataInputStream, ByteBuffer byteBuffer) throws IOException {
        int read;
        int read2 = fSDataInputStream.read();
        if (read2 == -1) {
            return -1;
        }
        byteBuffer.put((byte) read2);
        int i = 1;
        int remaining = byteBuffer.remaining() + 1;
        while (i < remaining && (read = fSDataInputStream.read()) != -1) {
            byteBuffer.put((byte) read);
            i++;
        }
        return i;
    }
}
