package alluxio.client.block.stream;

import alluxio.Seekable;
import alluxio.client.BoundedStream;
import alluxio.client.CanUnbuffer;
import alluxio.client.PositionedReadable;
import alluxio.client.ReadType;
import alluxio.client.block.stream.BlockWorkerDataReader;
import alluxio.client.block.stream.DataReader;
import alluxio.client.block.stream.GrpcDataReader;
import alluxio.client.block.stream.LocalFileDataReader;
import alluxio.client.block.stream.SharedGrpcDataReader;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.PreconditionMessage;
import alluxio.exception.status.NotFoundException;
import alluxio.exception.status.OutOfRangeException;
import alluxio.grpc.ReadRequest;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.proto.dataserver.Protocol;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.util.LogUtils;
import alluxio.util.io.BufferUtils;
import alluxio.util.network.NettyUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.BlockInfo;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link InputStream} over the bytes of a single block.
 *
 * <p>Sequential reads pull chunks from a {@link DataReader} that is created lazily from the
 * configured {@link DataReader.Factory} at the current stream position. Positioned reads create
 * their own short-lived reader and do not touch the sequential position.
 *
 * <p>Instances are obtained through the static factory methods, which select the reader
 * implementation (process-local, short-circuit local file, or gRPC).
 */
@NotThreadSafe
/* loaded from: input_file:alluxio/client/block/stream/BlockInStream.class */
public class BlockInStream extends InputStream implements BoundedStream, Seekable, PositionedReadable, CanUnbuffer {
    private static final Logger LOG = LoggerFactory.getLogger(BlockInStream.class);
    /** Worker address this stream was created with; exposed through {@code getAddress()}. */
    private final WorkerNetAddress mAddress;
    /** Source tag this stream was created with; exposed through {@code getSource()}. */
    private final BlockInStreamSource mInStreamSource;
    /** Id of the block being read. */
    private final long mId;
    /** Total length of the block in bytes; the stream position never exceeds it. */
    private final long mLength;
    /** Chunk most recently fetched from {@link #mDataReader}, or null when nothing is buffered. */
    protected DataBuffer mCurrentChunk;
    /** Reader used for sequential reads, or null when no reader is currently open. */
    protected DataReader mDataReader;
    /** Factory that creates readers for a given offset and length; closed together with the stream. */
    private final DataReader.Factory mDataReaderFactory;
    /** Scratch buffer reused by the single-byte {@code read()}. */
    private final byte[] mSingleByte = new byte[1];
    /** Current position within the block for sequential reads. */
    private long mPos = 0;
    /** Whether {@code close()} has been called on this stream. */
    private boolean mClosed = false;
    /** Set once the data reader returns no chunk; cleared again by a backward seek. */
    private boolean mEOF = false;

    /* loaded from: input_file:alluxio/client/block/stream/BlockInStream$BlockInStreamSource.class */
    /**
     * Tag describing where the data of a {@link BlockInStream} is read from. It is supplied by
     * the caller of the factory methods and steers which reader implementation
     * {@code create} picks.
     */
    public enum BlockInStreamSource {
        /** {@code create} serves this source through the process-local worker's reader. */
        PROCESS_LOCAL,
        /** {@code create} may serve this source with a short-circuit local file reader. */
        NODE_LOCAL,
        /** Not special-cased in this file; {@code create} uses a gRPC reader. Presumably a remote worker. */
        REMOTE,
        /** Not special-cased in this file; {@code create} uses a gRPC reader. Presumably data from the UFS. */
        UFS
    }

    /**
     * Creates a {@link BlockInStream} for the given block, choosing the reader implementation
     * from the data source and the cluster configuration.
     *
     * <ul>
     *   <li>{@code PROCESS_LOCAL} sources use the process-local worker reader.</li>
     *   <li>{@code NODE_LOCAL} sources use a short-circuit reader when short circuit is enabled
     *       and either preferred or the worker does not support domain sockets. If creating it
     *       fails with {@link NotFoundException}, the method falls back to gRPC.</li>
     *   <li>Everything else is read through gRPC.</li>
     * </ul>
     *
     * @param fileSystemContext the file system context
     * @param blockInfo supplies the block id and block length
     * @param workerNetAddress the worker to read from
     * @param blockInStreamSource where the data comes from
     * @param inStreamOptions the in-stream options
     * @return the new stream
     * @throws IOException if the short-circuit stream cannot be created for a reason other than
     *         {@link NotFoundException}
     */
    public static BlockInStream create(FileSystemContext fileSystemContext, BlockInfo blockInfo, WorkerNetAddress workerNetAddress, BlockInStreamSource blockInStreamSource, InStreamOptions inStreamOptions) throws IOException {
        final long blockId = blockInfo.getBlockId();
        final long blockSize = blockInfo.getLength();

        if (blockInStreamSource == BlockInStreamSource.PROCESS_LOCAL) {
            LOG.debug("Creating worker process local input stream for block {} @ {}", blockId, workerNetAddress);
            return createProcessLocalBlockInStream(fileSystemContext, workerNetAddress, blockId, blockSize, inStreamOptions);
        }

        final AlluxioConfiguration conf = fileSystemContext.getClusterConf();
        final boolean shortCircuitEnabled = conf.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED);
        final boolean shortCircuitPreferred = conf.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_PREFERRED);
        final boolean domainSocketSupported = NettyUtils.isDomainSocketSupported(workerNetAddress);
        final boolean sourceIsLocal = blockInStreamSource == BlockInStreamSource.NODE_LOCAL;

        final boolean tryShortCircuit =
            sourceIsLocal && shortCircuitEnabled && (shortCircuitPreferred || !domainSocketSupported);
        if (tryShortCircuit) {
            LOG.debug("Creating short circuit input stream for block {} @ {}", blockId, workerNetAddress);
            try {
                return createLocalBlockInStream(fileSystemContext, workerNetAddress, blockId, blockSize, inStreamOptions);
            } catch (NotFoundException ignored) {
                // Deliberate best effort: the block is not available locally, so read it over the network.
                LOG.warn("Failed to create short circuit input stream for block {} @ {}. Falling back to network transfer", blockId, workerNetAddress);
            }
        }

        LOG.debug("Creating gRPC input stream for block {} @ {} from client {} reading through {} (data locates in the local worker {}, shortCircuitEnabled {}, shortCircuitPreferred {}, sourceSupportDomainSocket {})",
            blockId,
            workerNetAddress,
            NetworkAddressUtils.getClientHostName(conf),
            workerNetAddress,
            sourceIsLocal,
            shortCircuitEnabled,
            shortCircuitPreferred,
            domainSocketSupported);
        return createGrpcBlockInStream(fileSystemContext, workerNetAddress, blockInStreamSource, blockId, blockSize, inStreamOptions);
    }

    /**
     * Creates a stream that reads the block through the worker running in this process.
     *
     * @param fileSystemContext the file system context; must have a process-local worker
     * @param workerNetAddress the worker address recorded on the stream
     * @param j the block id
     * @param j2 the block length
     * @param inStreamOptions the in-stream options
     * @return the new stream, tagged {@link BlockInStreamSource#PROCESS_LOCAL}
     * @throws NullPointerException if the context has no process-local worker
     */
    private static BlockInStream createProcessLocalBlockInStream(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, long j, long j2, InStreamOptions inStreamOptions) {
        DataReader.Factory readerFactory = new BlockWorkerDataReader.Factory(
            fileSystemContext.getProcessLocalWorker().orElseThrow(NullPointerException::new),
            j,
            fileSystemContext.getClusterConf().getBytes(PropertyKey.USER_LOCAL_READER_CHUNK_SIZE_BYTES),
            inStreamOptions);
        return new BlockInStream(readerFactory, workerNetAddress, BlockInStreamSource.PROCESS_LOCAL, j, j2);
    }

    /**
     * Creates a stream that reads the block through a {@link LocalFileDataReader}.
     *
     * @param fileSystemContext the file system context
     * @param workerNetAddress the worker holding the block
     * @param j the block id
     * @param j2 the block length
     * @param inStreamOptions the in-stream options
     * @return the new stream, tagged {@link BlockInStreamSource#NODE_LOCAL}
     * @throws IOException if the local reader factory cannot be created
     */
    private static BlockInStream createLocalBlockInStream(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, long j, long j2, InStreamOptions inStreamOptions) throws IOException {
        DataReader.Factory readerFactory = new LocalFileDataReader.Factory(
            fileSystemContext,
            workerNetAddress,
            j,
            fileSystemContext.getClusterConf().getBytes(PropertyKey.USER_LOCAL_READER_CHUNK_SIZE_BYTES),
            inStreamOptions);
        return new BlockInStream(readerFactory, workerNetAddress, BlockInStreamSource.NODE_LOCAL, j, j2);
    }

    /**
     * Creates a stream that reads the block from a worker over gRPC.
     *
     * <p>A {@link SharedGrpcDataReader} is used when
     * {@code FUSE_SHARED_CACHING_READER_ENABLED} is set and the block is longer than four
     * streaming chunks; otherwise a plain {@link GrpcDataReader} is used.
     *
     * @param fileSystemContext the file system context
     * @param workerNetAddress the worker to read from
     * @param blockInStreamSource the source tag recorded on the stream
     * @param j the block id
     * @param j2 the block length
     * @param inStreamOptions the in-stream options
     * @return the new stream
     */
    private static BlockInStream createGrpcBlockInStream(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, BlockInStreamSource blockInStreamSource, long j, long j2, InStreamOptions inStreamOptions) {
        final long chunkSize = fileSystemContext.getClusterConf().getBytes(PropertyKey.USER_STREAMING_READER_CHUNK_SIZE_BYTES);
        final ReadRequest.Builder request = ReadRequest.newBuilder()
            .setBlockId(j)
            .setPromote(ReadType.fromProto(inStreamOptions.getOptions().getReadType()).isPromote())
            .setOpenUfsBlockOptions(inStreamOptions.getOpenUfsBlockOptions(j))
            .setPositionShort(inStreamOptions.getPositionShort())
            .setChunkSize(chunkSize);

        final boolean sharedReaderEnabled = fileSystemContext.getClusterConf().getBoolean(PropertyKey.FUSE_SHARED_CACHING_READER_ENABLED);
        final DataReader.Factory readerFactory;
        if (sharedReaderEnabled && j2 > chunkSize * 4) {
            readerFactory = new SharedGrpcDataReader.Factory(fileSystemContext, workerNetAddress, request, j2);
        } else {
            readerFactory = new GrpcDataReader.Factory(fileSystemContext, workerNetAddress, request);
        }
        return new BlockInStream(readerFactory, workerNetAddress, blockInStreamSource, j, j2);
    }

    /**
     * Creates a stream that reads a block from the given worker over gRPC, using explicitly
     * supplied UFS block options instead of {@link InStreamOptions}.
     *
     * @param fileSystemContext the file system context
     * @param j the block id
     * @param workerNetAddress the worker to read from
     * @param blockInStreamSource the source tag recorded on the stream
     * @param j2 the block length
     * @param openUfsBlockOptions the options placed in the read request
     * @return the new stream
     */
    public static BlockInStream createRemoteBlockInStream(FileSystemContext fileSystemContext, long j, WorkerNetAddress workerNetAddress, BlockInStreamSource blockInStreamSource, long j2, Protocol.OpenUfsBlockOptions openUfsBlockOptions) {
        ReadRequest.Builder request = ReadRequest.newBuilder()
            .setBlockId(j)
            .setOpenUfsBlockOptions(openUfsBlockOptions);
        request.setChunkSize(fileSystemContext.getClusterConf().getBytes(PropertyKey.USER_STREAMING_READER_CHUNK_SIZE_BYTES));

        DataReader.Factory readerFactory =
            new GrpcDataReader.Factory(fileSystemContext, workerNetAddress, request.buildPartial().toBuilder());
        return new BlockInStream(readerFactory, workerNetAddress, blockInStreamSource, j, j2);
    }

    /**
     * Creates a stream over one block. No data reader is opened here.
     *
     * @param factory creates the data readers used by this stream
     * @param workerNetAddress the worker address recorded on the stream
     * @param blockInStreamSource the source tag recorded on the stream
     * @param j the block id
     * @param j2 the block length in bytes
     */
    @VisibleForTesting
    protected BlockInStream(DataReader.Factory factory, WorkerNetAddress workerNetAddress, BlockInStreamSource blockInStreamSource, long j, long j2) {
        mId = j;
        mLength = j2;
        mAddress = workerNetAddress;
        mInStreamSource = blockInStreamSource;
        mDataReaderFactory = factory;
    }

    /**
     * @return the current position of sequential reads within the block
     */
    @Override // alluxio.Positioned
    public long getPos() {
        return mPos;
    }

    /**
     * Reads a single byte through the reusable one-byte scratch buffer.
     *
     * @return the byte as an int, or -1 at the end of the block
     * @throws IOException if the underlying read fails
     */
    @Override // java.io.InputStream
    public int read() throws IOException {
        final int bytesRead = read(mSingleByte);
        if (bytesRead != -1) {
            // A one-byte buffer can only yield exactly one byte.
            Preconditions.checkState(bytesRead == 1);
            return BufferUtils.byteToInt(mSingleByte[0]);
        }
        return -1;
    }

    /**
     * Reads into the whole of the given array.
     *
     * @param bArr the destination array
     * @return the number of bytes read, or -1 at the end of the block
     * @throws IOException if the underlying read fails
     */
    @Override // java.io.InputStream
    public int read(byte[] bArr) throws IOException {
        final int length = bArr.length;
        return read(bArr, 0, length);
    }

    /**
     * Reads up to {@code i2} bytes into {@code bArr} starting at offset {@code i}.
     *
     * @param bArr the destination array; must not be null
     * @param i the offset in {@code bArr} at which to start writing
     * @param i2 the maximum number of bytes to read
     * @return the number of bytes read, or -1 at the end of the block
     * @throws IOException if the underlying read fails
     */
    @Override // java.io.InputStream
    public int read(byte[] bArr, int i, int i2) throws IOException {
        ByteBuffer target = ByteBuffer.wrap(Objects.requireNonNull(bArr, "Read buffer cannot be null"));
        return read(target, i, i2);
    }

    /**
     * Reads up to {@code i2} bytes from the current position into {@code byteBuffer}, starting at
     * buffer index {@code i}. At most one chunk is consumed per call, so fewer bytes than
     * requested may be returned.
     *
     * <p>On return the buffer's limit and position have been changed by this method.
     *
     * @param byteBuffer the destination buffer
     * @param i the index in the buffer at which to start writing
     * @param i2 the maximum number of bytes to read
     * @return the number of bytes read, 0 if {@code i2} is 0, or -1 at the end of the block
     * @throws IllegalArgumentException if the requested range does not fit in the buffer capacity
     * @throws IllegalStateException if the stream is closed
     * @throws OutOfRangeException if the reader runs out of data before the block length is reached
     * @throws IOException if reading a chunk fails
     */
    public int read(ByteBuffer byteBuffer, int i, int i2) throws IOException {
        // Compare against "capacity - offset" so that a huge offset + length cannot overflow int
        // and slip past the bounds check.
        Preconditions.checkArgument(i >= 0 && i2 >= 0 && i2 <= byteBuffer.capacity() - i, PreconditionMessage.ERR_BUFFER_STATE.toString(), byteBuffer.capacity(), i, i2);
        checkIfClosed();
        if (i2 == 0) {
            return 0;
        }
        if (mPos == mLength) {
            return -1;
        }
        readChunk();
        if (mCurrentChunk == null) {
            mEOF = true;
        }
        if (mEOF) {
            closeDataReader();
            if (mPos < mLength) {
                // The reader ended before the advertised block length was reached.
                throw new OutOfRangeException(String.format("Block %s is expected to be %s bytes, but only %s bytes are available in the UFS. Please retry the read and on the next access, Alluxio will sync with the UFS and fetch the updated file content.", mId, mLength, mPos));
            }
            return -1;
        }
        final int toRead = Math.min(i2, mCurrentChunk.readableBytes());
        // Set the limit before the position: position(i) is rejected when i exceeds the buffer's
        // current limit, even though the range was validated against the capacity above.
        byteBuffer.limit(i + toRead);
        byteBuffer.position(i);
        mCurrentChunk.readBytes(byteBuffer);
        mPos += toRead;
        if (mPos == mLength) {
            // Nothing is left to read, so release the reader right away.
            closeDataReader();
        }
        return toRead;
    }

    /**
     * Reads up to {@code i2} bytes starting at block offset {@code j} into {@code bArr} without
     * changing the position of sequential reads. A dedicated reader is created for the call and
     * closed before returning.
     *
     * @param j the offset within the block to read from
     * @param bArr the destination array
     * @param i the offset in {@code bArr} at which to start writing
     * @param i2 the maximum number of bytes to read
     * @return the number of bytes read; 0 if {@code i2} is 0; -1 if {@code j} is outside the block
     *         or no bytes could be read
     * @throws IOException if creating, reading from, or closing the reader fails
     */
    @Override // alluxio.client.PositionedReadable
    public int positionedRead(long j, byte[] bArr, int i, int i2) throws IOException {
        if (i2 == 0) {
            return 0;
        }
        if (j < 0 || j >= mLength) {
            return -1;
        }

        final int requested = i2;
        int offset = i;
        int remaining = i2;
        // Keep one reader open for the whole request and close it exactly once.
        try (DataReader reader = mDataReaderFactory.create(j, requested)) {
            while (remaining > 0) {
                DataBuffer chunk = null;
                try {
                    chunk = reader.readChunk();
                    if (chunk == null) {
                        // The reader has no more data; stop instead of polling it again.
                        break;
                    }
                    final int chunkBytes = chunk.readableBytes();
                    Preconditions.checkState(chunkBytes <= remaining);
                    chunk.readBytes(bArr, offset, chunkBytes);
                    remaining -= chunkBytes;
                    offset += chunkBytes;
                } finally {
                    // Release each chunk exactly once, whether or not the copy succeeded.
                    if (chunk != null) {
                        chunk.release();
                    }
                }
            }
        }

        final int bytesRead = requested - remaining;
        return bytesRead == 0 ? -1 : bytesRead;
    }

    /**
     * @return the number of bytes between the current position and the block length, or 0 once
     *         the end-of-data flag has been set
     */
    @Override // alluxio.client.BoundedStream
    public long remaining() {
        return mEOF ? 0L : mLength - mPos;
    }

    /**
     * Moves the sequential read position to {@code j}.
     *
     * <p>When the open reader is a {@link SharedGrpcDataReader} the seek is delegated to
     * {@code seekForSharedGrpcDataReader}. Otherwise the open reader, if any, is closed and a
     * new one is created on the next read. A backward seek clears the end-of-data flag.
     *
     * @param j the target position, between 0 and the block length inclusive
     * @throws IllegalStateException if the stream is closed
     * @throws IllegalArgumentException if {@code j} is negative or past the block length
     * @throws IOException if closing or repositioning the reader fails
     */
    @Override // alluxio.Seekable
    public void seek(long j) throws IOException {
        checkIfClosed();
        Preconditions.checkArgument(j >= 0, PreconditionMessage.ERR_SEEK_NEGATIVE.toString(), j);
        Preconditions.checkArgument(j <= mLength, "Seek position past the end of the read region (block or file).", mId);
        if (j == mPos) {
            return;
        }
        if (mDataReader instanceof SharedGrpcDataReader) {
            seekForSharedGrpcDataReader(j);
        } else {
            if (j < mPos) {
                mEOF = false;
            }
            closeDataReader();
            mPos = j;
        }
    }

    /**
     * Seeks to {@code j} while keeping the open {@link SharedGrpcDataReader} alive where possible.
     *
     * <p>A backward seek repositions the reader and drops the buffered chunk. A forward seek
     * consumes chunks until the target is reached. If the chunks run out first, the reader is
     * closed so that the next read creates a new one at the target position.
     *
     * @param j the target position; callers have already validated it
     * @throws IOException if repositioning, reading, or closing the reader fails
     */
    private void seekForSharedGrpcDataReader(long j) throws IOException {
        if (j < mPos) {
            mEOF = false;
            ((SharedGrpcDataReader) mDataReader).seek(j);
            if (mCurrentChunk != null) {
                mCurrentChunk.release();
                mCurrentChunk = null;
            }
        } else {
            long cursor = mPos;
            while (mCurrentChunk != null && cursor < j) {
                final long chunkEnd = cursor + mCurrentChunk.readableBytes();
                if (chunkEnd <= j) {
                    // The whole buffered chunk lies before the target: drop it and fetch the next.
                    cursor = chunkEnd;
                    mCurrentChunk.release();
                    // Clear the field before reading, as readChunk() does, so that a failing
                    // readChunk() cannot leave a released chunk behind to be released again.
                    mCurrentChunk = null;
                    mCurrentChunk = mDataReader.readChunk();
                } else {
                    // The target lies inside the buffered chunk: discard only the leading bytes.
                    final int toDiscard = (int) (j - cursor);
                    mCurrentChunk.readBytes(new byte[toDiscard], 0, toDiscard);
                    cursor = j;
                }
            }
            if (cursor < j) {
                // Ran out of chunks before reaching the target.
                closeDataReader();
            }
        }
        mPos = j;
    }

    /**
     * Advances the position by up to {@code j} bytes, bounded by {@code remaining()}, and closes
     * the open data reader so the next read starts at the new position.
     *
     * @param j the number of bytes to skip
     * @return the number of bytes actually skipped; 0 if {@code j} is not positive
     * @throws IllegalStateException if the stream is closed
     * @throws IOException if closing the data reader fails
     */
    @Override // java.io.InputStream
    public long skip(long j) throws IOException {
        checkIfClosed();
        if (j > 0) {
            final long skipped = Math.min(remaining(), j);
            mPos += skipped;
            closeDataReader();
            return skipped;
        }
        return 0L;
    }

    /**
     * Closes the open data reader, if any, and then the reader factory. Calling this on an
     * already closed stream does nothing.
     *
     * <p>The stream is marked closed even when closing the reader or the factory fails. A
     * repeated call therefore cannot close the factory twice, and later operations fail on the
     * closed-stream check instead of using released resources.
     *
     * @throws IOException if closing the data reader or the factory fails
     */
    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (mClosed) {
            return;
        }
        try {
            closeDataReader();
        } finally {
            // Mark closed before touching the factory so the flag is set on every exit path.
            mClosed = true;
            mDataReaderFactory.close();
        }
    }

    /**
     * @return the factory this stream uses to create its data readers
     */
    @VisibleForTesting
    public DataReader.Factory getDataReaderFactory() {
        return mDataReaderFactory;
    }

    /**
     * Makes sure {@code mCurrentChunk} holds unread data when any is available. The method:
     * <ol>
     *   <li>opens a reader at the current position if none is open;</li>
     *   <li>releases a fully consumed chunk;</li>
     *   <li>fetches the next chunk when nothing is buffered.</li>
     * </ol>
     * {@code mCurrentChunk} is left null when the reader returns no chunk.
     *
     * @throws IOException if creating the reader or reading a chunk fails
     */
    private void readChunk() throws IOException {
        if (mDataReader == null) {
            final long bytesLeft = mLength - mPos;
            mDataReader = mDataReaderFactory.create(mPos, bytesLeft);
        }

        final DataBuffer buffered = mCurrentChunk;
        if (buffered != null && buffered.readableBytes() == 0) {
            buffered.release();
            mCurrentChunk = null;
        }

        if (mCurrentChunk == null) {
            mCurrentChunk = mDataReader.readChunk();
        }
    }

    /**
     * Releases the buffered chunk, if any, and closes and forgets the open data reader, if any.
     *
     * @throws IOException if closing the data reader fails
     */
    private void closeDataReader() throws IOException {
        final DataBuffer buffered = mCurrentChunk;
        if (buffered != null) {
            buffered.release();
            mCurrentChunk = null;
        }

        final DataReader reader = mDataReader;
        if (reader != null) {
            reader.close();
        }
        mDataReader = null;
    }

    /**
     * Drops the buffered chunk and the open data reader. A failure to close the reader is logged
     * as a warning and not propagated.
     */
    @Override // alluxio.client.CanUnbuffer
    public void unbuffer() {
        try {
            closeDataReader();
        } catch (IOException closeFailure) {
            LogUtils.warnWithException(LOG, "failed to unbuffer the block stream", closeFailure);
        }
    }

    /**
     * Guards operations that require an open stream.
     *
     * @throws IllegalStateException if the stream has been closed
     */
    private void checkIfClosed() {
        final boolean open = !mClosed;
        Preconditions.checkState(open, "Cannot do operations on a closed BlockInStream");
    }

    /**
     * @return the worker address this stream was created with
     */
    public WorkerNetAddress getAddress() {
        return mAddress;
    }

    /**
     * @return the source tag this stream was created with
     */
    public BlockInStreamSource getSource() {
        return mInStreamSource;
    }

    /**
     * @return the id of the block this stream reads
     */
    public long getId() {
        return mId;
    }
}
