package com.qubole.rubix.bookkeeper;

import com.google.shaded.shaded.common.base.Preconditions;
import com.qubole.rubix.core.ReadRequest;
import com.qubole.rubix.core.ReadRequestChain;
import com.qubole.rubix.core.ReadRequestChainStats;
import com.qubole.rubix.core.RemoteReadRequestChain;
import com.qubole.rubix.spi.CacheConfig;
import com.qubole.rubix.spi.CommonUtilities;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/* loaded from: input_file:com/qubole/rubix/bookkeeper/FileDownloadRequestChain.class */
public class FileDownloadRequestChain extends ReadRequestChain {
    private BookKeeper bookKeeper;
    private FileSystem remoteFileSystem;
    private String localFile;
    private String remotePath;
    private long fileSize;
    private long lastModified;
    private long totalRequestedRead;
    private final int maxRemoteReadBufferSize;
    Configuration conf;
    ByteBuffer directBuffer;
    private long timeSpentOnDownload;
    private int blockSize;
    private static final Log log = LogFactory.getLog(FileDownloadRequestChain.class);

    public FileDownloadRequestChain(BookKeeper bookKeeper, FileSystem fileSystem, String str, ByteBuffer byteBuffer, Configuration configuration, String str2, long j, long j2, int i) {
        super(i, getBlockAlignedMaxChunkSize(configuration));
        this.bookKeeper = bookKeeper;
        this.remoteFileSystem = fileSystem;
        this.localFile = str;
        this.conf = configuration;
        this.remotePath = str2;
        this.fileSize = j;
        this.lastModified = j2;
        this.directBuffer = byteBuffer;
        this.maxRemoteReadBufferSize = CacheConfig.getDataTransferBufferSize(configuration);
        this.blockSize = CacheConfig.getBlockSize(configuration);
    }

    private static long getBlockAlignedMaxChunkSize(Configuration configuration) {
        long parallelWarmupMaxChunkSize = CacheConfig.getParallelWarmupMaxChunkSize(configuration);
        long blockSize = CacheConfig.getBlockSize(configuration);
        return (parallelWarmupMaxChunkSize / blockSize) * blockSize;
    }

    public String getRemotePath() {
        return this.remotePath;
    }

    public long getFileSize() {
        return this.fileSize;
    }

    public long getLastModified() {
        return this.lastModified;
    }

    public long getTimeSpentOnDownload() {
        return this.timeSpentOnDownload;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public Long call() throws IOException {
        Preconditions.checkState(isLocked(), "Trying to execute Chain without locking");
        List<ReadRequest> readRequests = getReadRequests();
        if (readRequests.size() == 0) {
            return 0L;
        }
        long currentTimeMillis = System.currentTimeMillis();
        File file = new File(this.localFile);
        if (!file.exists()) {
            throw new FileNotFoundException(String.format("File does not exists %s", this.localFile));
        }
        byte[] bArr = new byte[Math.min(this.maxRemoteReadBufferSize, Math.toIntExact(Math.min(2147483647L, ((Long) readRequests.stream().map(readRequest -> {
            return Long.valueOf(readRequest.getActualReadLength());
        }).max((v0, v1) -> {
            return v0.compareTo(v1);
        }).get()).longValue())))];
        FSDataInputStream fSDataInputStream = null;
        FileChannel fileChannel = null;
        try {
            fSDataInputStream = this.remoteFileSystem.open(new Path(this.remotePath));
            fileChannel = new FileOutputStream(new RandomAccessFile(file, "rw").getFD()).getChannel();
            for (ReadRequest readRequest2 : readRequests) {
                if (isCancelled()) {
                    log.debug("Request Cancelled for " + readRequest2.getBackendReadStart());
                    propagateCancel(getClass().getName());
                }
                this.totalRequestedRead += copyIntoCache(fSDataInputStream, fileChannel, readRequest2.getBackendReadStart(), readRequest2.getBackendReadLength(), bArr);
                updateCacheStatus(readRequest2);
            }
            this.timeSpentOnDownload = (System.currentTimeMillis() - currentTimeMillis) / 1000;
            log.debug("Downloaded " + this.totalRequestedRead + " bytes of file " + this.remotePath);
            log.debug("RemoteFetchRequest took : " + this.timeSpentOnDownload + " secs ");
            Long valueOf = Long.valueOf(this.totalRequestedRead);
            if (fileChannel != null) {
                fileChannel.close();
            }
            if (fSDataInputStream != null) {
                fSDataInputStream.close();
            }
            return valueOf;
        } catch (Throwable th) {
            if (fileChannel != null) {
                fileChannel.close();
            }
            if (fSDataInputStream != null) {
                fSDataInputStream.close();
            }
            throw th;
        }
    }

    private long copyIntoCache(FSDataInputStream fSDataInputStream, FileChannel fileChannel, long j, long j2, byte[] bArr) throws IOException {
        log.debug(String.format("Copying data of file %s of length %d from position %d", this.remotePath, Long.valueOf(j2), Long.valueOf(j)));
        if (j2 > bArr.length) {
            long j3 = j2;
            while (true) {
                long j4 = j3;
                if (j4 <= 0) {
                    break;
                }
                int intExact = Math.toIntExact(Math.min(bArr.length, j4));
                fSDataInputStream.seek(j);
                RemoteReadRequestChain.readIntoBuffer(bArr, 0, intExact, fSDataInputStream);
                writeToFile(bArr, intExact, fileChannel, j);
                j += intExact;
                j3 = j4 - intExact;
            }
        } else {
            fSDataInputStream.readFully(j, bArr, 0, Math.toIntExact(j2));
            writeToFile(bArr, Math.toIntExact(j2), fileChannel, j);
        }
        log.debug(String.format("Copied %d to file %s from position %d", Long.valueOf(j2), this.remotePath, Long.valueOf(j)));
        return j2;
    }

    private void writeToFile(byte[] bArr, int i, FileChannel fileChannel, long j) throws IOException {
        int i2 = i;
        int i3 = 0;
        while (i2 > 0) {
            int min = Math.min(i2, this.directBuffer.capacity());
            this.directBuffer.clear();
            this.directBuffer.put(bArr, i3, min);
            this.directBuffer.flip();
            int write = fileChannel.write(this.directBuffer, j + i3);
            this.directBuffer.compact();
            i3 += write;
            i2 -= write;
        }
    }

    private void updateCacheStatus(ReadRequest readRequest) {
        long startBlock = CommonUtilities.toStartBlock(readRequest.getBackendReadStart(), this.blockSize);
        long endBlock = CommonUtilities.toEndBlock(readRequest.getBackendReadEnd(), this.blockSize);
        try {
            this.bookKeeper.setAllCached(this.remotePath, this.fileSize, this.lastModified, startBlock, endBlock, this.generationNumber);
        } catch (Exception e) {
            log.warn(String.format("Unable to update cache status for %s:%d:%d, this can cause wrong accounting of disk utilization", this.remotePath, Long.valueOf(startBlock), Long.valueOf(endBlock)), e);
        }
    }

    @Override // com.qubole.rubix.core.ReadRequestChain
    public ReadRequestChainStats getStats() {
        return new ReadRequestChainStats().setRemoteRRCDataRead(this.totalRequestedRead).setRemoteRRCWarmupTime(this.timeSpentOnDownload);
    }
}
