package com.qubole.rubix.core;

import com.google.shaded.shaded.common.annotations.VisibleForTesting;
import com.google.shaded.shaded.common.base.Preconditions;
import com.qubole.rubix.spi.BookKeeperFactory;
import com.qubole.rubix.spi.CacheConfig;
import com.qubole.rubix.spi.CacheUtil;
import com.qubole.rubix.spi.RetryingPooledBookkeeperClient;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.util.DirectBufferPool;

/* loaded from: input_file:com/qubole/rubix/core/RemoteReadRequestChain.class */
public class RemoteReadRequestChain extends ReadRequestChain {
    final FSDataInputStream inputStream;
    private DirectBufferPool bufferPool;
    private int directBufferSize;
    private byte[] affixBuffer;
    private long extraRead;
    private long totalRequestedRead;
    private long warmupPenalty;
    private int blockSize;
    private BookKeeperFactory bookKeeperFactory;
    private static final Log log = LogFactory.getLog(RemoteReadRequestChain.class);
    private String localFile;

    public RemoteReadRequestChain(FSDataInputStream fSDataInputStream, String str, int i, DirectBufferPool directBufferPool, Configuration configuration, byte[] bArr, BookKeeperFactory bookKeeperFactory) {
        super(i);
        this.inputStream = fSDataInputStream;
        this.bufferPool = directBufferPool;
        this.directBufferSize = CacheConfig.getDiskReadBufferSize(configuration);
        this.affixBuffer = bArr;
        this.blockSize = bArr.length;
        this.localFile = CacheUtil.getLocalPath(str, configuration, i);
        this.bookKeeperFactory = bookKeeperFactory;
    }

    @VisibleForTesting
    public RemoteReadRequestChain(FSDataInputStream fSDataInputStream, String str, int i, Configuration configuration) {
        this(fSDataInputStream, str, i, new DirectBufferPool(), configuration, new byte[100], new BookKeeperFactory());
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public Long call() throws IOException {
        log.debug(String.format("Read Request threadName: %s, Remote read Executor threadName: %s", this.threadName, Thread.currentThread().getName()));
        Thread.currentThread().setName(this.threadName);
        Preconditions.checkState(this.isLocked, "Trying to execute Chain without locking");
        if (this.readRequests.size() == 0) {
            return 0L;
        }
        File file = new File(this.localFile);
        if (!file.exists()) {
            throw new IOException(String.format("File does not exists %s", this.localFile));
        }
        FileChannel channel = new FileOutputStream(new RandomAccessFile(file, "rw").getFD()).getChannel();
        ByteBuffer buffer = this.bufferPool.getBuffer(this.directBufferSize);
        Preconditions.checkState(buffer != null, "directBuffer could not be allocated");
        try {
            for (ReadRequest readRequest : this.readRequests) {
                if (this.cancelled) {
                    propagateCancel(getClass().getName());
                }
                log.debug(String.format("Executing ReadRequest: [%d, %d, %d, %d, %d]", Long.valueOf(readRequest.getBackendReadStart()), Long.valueOf(readRequest.getBackendReadEnd()), Long.valueOf(readRequest.getActualReadStart()), Long.valueOf(readRequest.getActualReadEnd()), Integer.valueOf(readRequest.getDestBufferOffset())));
                int actualReadStart = (int) (readRequest.getActualReadStart() - readRequest.getBackendReadStart());
                int backendReadEnd = (int) (readRequest.getBackendReadEnd() - readRequest.getActualReadEnd());
                log.debug(String.format("PrefixLength: %d SuffixLength: %d", Integer.valueOf(actualReadStart), Integer.valueOf(backendReadEnd)));
                if (actualReadStart > 0) {
                    this.inputStream.seek(readRequest.backendReadStart);
                    log.debug(String.format("Trying to Read %d bytes into prefix buffer", Integer.valueOf(actualReadStart)));
                    this.extraRead += readIntoBuffer(this.affixBuffer, 0, actualReadStart, this.inputStream);
                    log.debug(String.format("Copied %d prefix bytes into cache", Integer.valueOf(copyIntoCache(channel, buffer, this.affixBuffer, 0, actualReadStart, readRequest.backendReadStart))));
                }
                log.debug(String.format("Trying to Read %d bytes into destination buffer", Integer.valueOf(readRequest.getActualReadLengthIntUnsafe())));
                if (actualReadStart > 0 || backendReadEnd > 0) {
                    this.inputStream.seek(readRequest.actualReadStart);
                    readIntoBuffer(readRequest.getDestBuffer(), readRequest.destBufferOffset, readRequest.getActualReadLengthIntUnsafe(), this.inputStream);
                } else {
                    this.inputStream.readFully(readRequest.actualReadStart, readRequest.getDestBuffer(), readRequest.destBufferOffset, readRequest.getActualReadLengthIntUnsafe());
                    readRequest.getActualReadLengthIntUnsafe();
                }
                log.debug(String.format("Copied %d requested bytes into cache", Integer.valueOf(copyIntoCache(channel, buffer, readRequest.destBuffer, readRequest.destBufferOffset, readRequest.getActualReadLengthIntUnsafe(), readRequest.actualReadStart))));
                this.totalRequestedRead += readRequest.getActualReadLengthIntUnsafe();
                if (backendReadEnd > 0) {
                    log.debug(String.format("Trying to Read %d bytes into suffix buffer", Integer.valueOf(backendReadEnd)));
                    this.inputStream.seek(readRequest.actualReadEnd);
                    this.extraRead += readIntoBuffer(this.affixBuffer, 0, backendReadEnd, this.inputStream);
                    log.debug(String.format("Copied %d suffix bytes into cache", Integer.valueOf(copyIntoCache(channel, buffer, this.affixBuffer, 0, backendReadEnd, readRequest.actualReadEnd))));
                }
            }
            log.debug(String.format("Read %d bytes from remote localFile, added %d to destination buffer", Long.valueOf(this.extraRead + this.totalRequestedRead), Long.valueOf(this.totalRequestedRead)));
            Long valueOf = Long.valueOf(this.totalRequestedRead);
            this.bufferPool.returnBuffer(buffer);
            channel.close();
            return valueOf;
        } catch (Throwable th) {
            this.bufferPool.returnBuffer(buffer);
            channel.close();
            throw th;
        }
    }

    public static int readIntoBuffer(byte[] bArr, int i, int i2, FSDataInputStream fSDataInputStream) throws IOException {
        int i3 = 0;
        while (true) {
            int i4 = i3;
            if (i4 >= i2) {
                return i4;
            }
            int read = fSDataInputStream.read(bArr, i + i4, i2 - i4);
            if (read < 0) {
                throw new EOFException("End of file reached before reading fully.");
            }
            i3 = i4 + read;
        }
    }

    private int copyIntoCache(FileChannel fileChannel, ByteBuffer byteBuffer, byte[] bArr, int i, int i2, long j) throws IOException {
        log.debug(String.format("Trying to copy [%d - %d] bytes into cache with offset %d into localFile %s", Long.valueOf(j), Long.valueOf(j + i2), Integer.valueOf(i), this.localFile));
        long nanoTime = System.nanoTime();
        int i3 = i2;
        int i4 = 0;
        while (i3 > 0) {
            int min = Math.min(i3, byteBuffer.capacity());
            byteBuffer.clear();
            byteBuffer.put(bArr, i + i4, min);
            byteBuffer.flip();
            int write = fileChannel.write(byteBuffer, j + i4);
            i4 += write;
            i3 -= write;
        }
        this.warmupPenalty += System.nanoTime() - nanoTime;
        return i4;
    }

    @Override // com.qubole.rubix.core.ReadRequestChain
    public ReadRequestChainStats getStats() {
        return new ReadRequestChainStats().setRemoteRRCDataRead(this.extraRead + this.totalRequestedRead).setRemoteRRCExtraDataRead(this.extraRead).setRemoteRRCWarmupTime(this.warmupPenalty).setRemoteRRCRequests(this.requests);
    }

    @Override // com.qubole.rubix.core.ReadRequestChain
    public void updateCacheStatus(String str, long j, long j2, int i, Configuration configuration) {
        try {
            RetryingPooledBookkeeperClient createBookKeeperClient = this.bookKeeperFactory.createBookKeeperClient(configuration);
            Throwable th = null;
            try {
                try {
                    for (ReadRequest readRequest : this.readRequests) {
                        log.debug("Setting cached from : " + toBlock(readRequest.getBackendReadStart()) + " block to : " + (toBlock(readRequest.getBackendReadEnd() - 1) + 1));
                        createBookKeeperClient.setAllCached(str, j, j2, toBlock(readRequest.getBackendReadStart()), toBlock(readRequest.getBackendReadEnd() - 1) + 1, this.generationNumber);
                    }
                    if (createBookKeeperClient != null) {
                        if (0 != 0) {
                            try {
                                createBookKeeperClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createBookKeeperClient.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Exception e) {
            log.warn("Could not update BookKeeper about newly cached blocks", e);
        }
    }

    private long toBlock(long j) {
        return j / this.blockSize;
    }
}
