package com.qubole.rubix.bookkeeper;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.google.shaded.shaded.common.collect.Range;
import com.google.shaded.shaded.common.util.concurrent.MoreExecutors;
import com.google.shaded.shaded.common.util.concurrent.ThreadFactoryBuilder;
import com.qubole.rubix.common.metrics.BookKeeperMetrics;
import com.qubole.rubix.core.CachingFileSystemStatsProvider;
import com.qubole.rubix.core.ReadRequest;
import com.qubole.rubix.spi.CacheConfig;
import com.qubole.rubix.spi.CacheUtil;
import com.qubole.rubix.spi.CommonUtilities;
import com.qubole.rubix.spi.thrift.BlockLocation;
import com.qubole.rubix.spi.thrift.CacheStatusRequest;
import com.qubole.rubix.spi.thrift.CacheStatusResponse;
import com.qubole.rubix.spi.thrift.Location;
import com.qubole.rubix.spi.utils.DataSizeUnits;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.DirectBufferPool;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/qubole/rubix/bookkeeper/FileDownloader.class */
public class FileDownloader {
    Configuration conf;
    private ExecutorService processService;
    int diskReadBufferSize;
    private MetricRegistry metrics;
    private Counter totalMBDownloaded;
    private Counter totalTimeToDownload;
    private final RemoteFetchProcessor remoteFetchProcessor;
    private final BookKeeper bookKeeper;
    private final CachingFileSystemStatsProvider stats;
    private static final Log log = LogFactory.getLog(FileDownloader.class);
    private static DirectBufferPool bufferPool = new DirectBufferPool();

    public FileDownloader(BookKeeper bookKeeper, MetricRegistry metricRegistry, Configuration configuration, RemoteFetchProcessor remoteFetchProcessor, CachingFileSystemStatsProvider cachingFileSystemStatsProvider) {
        this.bookKeeper = bookKeeper;
        this.remoteFetchProcessor = remoteFetchProcessor;
        this.conf = configuration;
        this.metrics = metricRegistry;
        int remoteFetchThreads = CacheConfig.getRemoteFetchThreads(configuration);
        this.diskReadBufferSize = CacheConfig.getDiskReadBufferSize(configuration);
        this.processService = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(remoteFetchThreads, new ThreadFactoryBuilder().setNameFormat("parallel-warmup-%s").setDaemon(true).build()));
        this.stats = cachingFileSystemStatsProvider;
        initializeMetrics();
    }

    private void initializeMetrics() {
        this.totalMBDownloaded = this.metrics.counter(BookKeeperMetrics.CacheMetric.ASYNC_DOWNLOADED_MB_COUNT.getMetricName());
        this.totalTimeToDownload = this.metrics.counter(BookKeeperMetrics.CacheMetric.ASYNC_DOWNLOAD_TIME_COUNT.getMetricName());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<FileDownloadRequestChain> getFileDownloadRequestChains(ConcurrentMap<String, DownloadRequestContext> concurrentMap) throws IOException {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, DownloadRequestContext> entry : concurrentMap.entrySet()) {
            Path path = new Path(entry.getKey());
            DownloadRequestContext value = entry.getValue();
            int i = 0;
            FileSystem fileSystem = FileSystem.get(path.toUri(), this.conf);
            fileSystem.initialize(path.toUri(), this.conf);
            ByteBuffer buffer = bufferPool.getBuffer(this.diskReadBufferSize);
            FileDownloadRequestChain fileDownloadRequestChain = null;
            String key = entry.getKey();
            Range range = null;
            Iterator<Range<Long>> it = value.getRanges().asRanges().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Range<Long> next = it.next();
                long startBlock = CommonUtilities.toStartBlock(next.lowerEndpoint().longValue(), this.conf);
                long endBlock = CommonUtilities.toEndBlock(next.upperEndpoint().longValue(), this.conf);
                Range closedOpen = Range.closedOpen(Long.valueOf(startBlock), Long.valueOf(endBlock));
                if (range == null || !range.encloses(closedOpen)) {
                    range = closedOpen;
                    try {
                        CacheStatusResponse cacheStatus = this.bookKeeper.getCacheStatus(new CacheStatusRequest(value.getRemoteFilePath(), value.getFileSize(), value.getLastModifiedTime(), startBlock, endBlock));
                        List<BlockLocation> blocks = cacheStatus.getBlocks();
                        if (i != 0 && cacheStatus.getGenerationNumber() != i) {
                            log.debug(String.format("Mismatch in generation-number in download requests for file %s, expected=%d but found=%d, skipping the file", key, Integer.valueOf(i), Integer.valueOf(cacheStatus.getGenerationNumber())));
                            fileDownloadRequestChain = null;
                            break;
                        }
                        i = cacheStatus.getGenerationNumber();
                        if (fileDownloadRequestChain == null) {
                            String localPath = CacheUtil.getLocalPath(key, this.conf, i);
                            log.debug("Processing Request for File : " + path.toString() + " LocalFile : " + localPath);
                            fileDownloadRequestChain = new FileDownloadRequestChain(this.bookKeeper, fileSystem, localPath, buffer, this.conf, value.getRemoteFilePath(), value.getFileSize(), value.getLastModifiedTime(), i);
                        }
                        for (int i2 = 0; i2 < blocks.size(); i2++) {
                            if (blocks.get(i2).getLocation().equals(Location.LOCAL)) {
                                long j = startBlock + i2;
                                long blockStartPosition = CommonUtilities.toBlockStartPosition(j, this.conf);
                                long min = Math.min(CommonUtilities.toBlockStartPosition(j + 1, this.conf), value.getFileSize());
                                fileDownloadRequestChain.addReadRequest(new ReadRequest(blockStartPosition, min, blockStartPosition, min, null, 0, value.getFileSize()));
                            }
                        }
                    } catch (Exception e) {
                        log.warn("Error communicating with bookKeeper", e);
                        this.remoteFetchProcessor.addToProcessQueueSafe(value.getRemoteFilePath(), value.getRanges().asRanges(), value.getFileSize(), value.getLastModifiedTime());
                        fileDownloadRequestChain = null;
                    }
                }
            }
            if (fileDownloadRequestChain != null) {
                log.debug("Request added for file: " + fileDownloadRequestChain.getRemotePath() + " Number of Requests : " + fileDownloadRequestChain.getReadRequests().size());
                arrayList.add(fileDownloadRequestChain);
            }
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long processDownloadRequests(List<FileDownloadRequestChain> list) {
        if (list.size() == 0) {
            return 0L;
        }
        long j = 0;
        ArrayList<Future> arrayList = new ArrayList();
        for (FileDownloadRequestChain fileDownloadRequestChain : list) {
            fileDownloadRequestChain.lock();
            arrayList.add(this.processService.submit(fileDownloadRequestChain));
        }
        for (Future future : arrayList) {
            FileDownloadRequestChain fileDownloadRequestChain2 = list.get(arrayList.indexOf(future));
            long j2 = 0;
            Iterator<ReadRequest> it = fileDownloadRequestChain2.getReadRequests().iterator();
            while (it.hasNext()) {
                j2 += it.next().getBackendReadLength();
            }
            try {
                long longValue = ((Long) future.get()).longValue();
                if (longValue == j2) {
                    this.stats.addReadRequestChainStats(fileDownloadRequestChain2.getStats());
                    j += longValue;
                    this.totalTimeToDownload.inc(fileDownloadRequestChain2.getTimeSpentOnDownload());
                } else {
                    log.error("ReadData didn't match with requested value. RequestedData: " + j2 + " ReadData: " + longValue);
                }
            } catch (InterruptedException | ExecutionException e) {
                if (e.getCause() instanceof FileNotFoundException) {
                    log.debug("Could not process download request: " + e.getCause().getMessage());
                } else {
                    log.error("Could not process download request", e);
                }
                fileDownloadRequestChain2.cancel();
            }
        }
        this.totalMBDownloaded.inc(DataSizeUnits.BYTES.toMB(j));
        return j;
    }
}
