package com.qubole.rubix.bookkeeper;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.shaded.shaded.common.annotations.VisibleForTesting;
import com.google.shaded.shaded.common.base.Preconditions;
import com.google.shaded.shaded.common.collect.BoundType;
import com.google.shaded.shaded.common.collect.Range;
import com.google.shaded.shaded.common.util.concurrent.AbstractScheduledService;
import com.qubole.rubix.common.metrics.BookKeeperMetrics;
import com.qubole.rubix.core.CachingFileSystemStatsProvider;
import com.qubole.rubix.spi.CacheConfig;
import java.io.IOException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;

/* loaded from: input_file:com/qubole/rubix/bookkeeper/RemoteFetchProcessor.class */
/**
 * Scheduled service that collects asynchronous remote-fetch requests in a queue and
 * periodically hands them, merged per remote file, to a {@link FileDownloader}.
 *
 * <p>Producers enqueue requests through {@link #addToProcessQueue(String, long, int, long, long)}
 * or {@link #addToProcessQueueSafe(String, Set, long, long)}. On every scheduler tick
 * {@link #runOneIteration()} drains the requests that have been queued for at least
 * {@code requestProcessDelay} milliseconds and submits them for download.
 */
public class RemoteFetchProcessor extends AbstractScheduledService {
    private static final Log log = LogFactory.getLog(RemoteFetchProcessor.class);

    private final Queue<FetchRequest> processQueue;
    private final FileDownloader downloader;
    private final MetricRegistry metrics;
    private final BookKeeper bookKeeper;
    // Assigned once by initializeMetrics(), which the constructor calls last.
    private Counter totalDownloadRequests;
    private Counter processedRequests;
    // Scheduler initial delay and period, both interpreted as milliseconds by scheduler().
    int processThreadInitalDelay;
    int processThreadInterval;
    // Minimum age (milliseconds) a request must reach in the queue before it is merged.
    long requestProcessDelay;

    /**
     * Creates the processor together with its {@link FileDownloader}.
     *
     * <p>The supplied configuration is copied and cache-data is disabled on the copy before it
     * is used, so the caller's {@code configuration} object is not modified.
     *
     * @param bookKeeper book keeper whose initialization state gates request processing
     * @param metricRegistry registry on which the request counters and queue-size gauge are registered
     * @param configuration source configuration; only a private copy is used
     * @param cachingFileSystemStatsProvider stats provider forwarded to the downloader
     */
    public RemoteFetchProcessor(BookKeeper bookKeeper, MetricRegistry metricRegistry, Configuration configuration, CachingFileSystemStatsProvider cachingFileSystemStatsProvider) {
        Configuration conf = new Configuration(configuration);
        CacheConfig.setCacheDataEnabled(conf, false);
        this.processQueue = new ConcurrentLinkedQueue<>();
        this.metrics = metricRegistry;
        // NOTE: 'this' is handed to the downloader before construction finishes; the assignment
        // order above is kept so the queue and registry are already set at that point.
        this.downloader = new FileDownloader(bookKeeper, metricRegistry, conf, this, cachingFileSystemStatsProvider);
        this.bookKeeper = bookKeeper;
        this.processThreadInitalDelay = CacheConfig.getProcessThreadInitialDelay(conf);
        this.processThreadInterval = CacheConfig.getProcessThreadInterval(conf);
        this.requestProcessDelay = CacheConfig.getRemoteFetchProcessInterval(conf);
        initializeMetrics();
    }

    /**
     * Exposes the live request queue (not a copy) for tests.
     *
     * @return the queue of pending fetch requests
     */
    @VisibleForTesting
    public Queue<FetchRequest> getProcessQueue() {
        return this.processQueue;
    }

    /**
     * @return the downloader created by this processor
     */
    FileDownloader getFileDownloaderInstance() {
        return this.downloader;
    }

    /**
     * Registers the total/processed request counters and a gauge reporting the current queue size.
     */
    private void initializeMetrics() {
        this.totalDownloadRequests = this.metrics.counter(BookKeeperMetrics.CacheMetric.TOTAL_ASYNC_REQUEST_COUNT.getMetricName());
        this.processedRequests = this.metrics.counter(BookKeeperMetrics.CacheMetric.PROCESSED_ASYNC_REQUEST_COUNT.getMetricName());
        this.metrics.register(BookKeeperMetrics.CacheMetric.ASYNC_QUEUE_SIZE_GAUGE.getMetricName(), new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return processQueue.size();
            }
        });
    }

    /**
     * Enqueues a single fetch request stamped with the current wall-clock time and bumps the
     * total-request counter.
     *
     * @param remotePath path of the remote file
     * @param offset start offset of the requested range
     * @param length number of bytes requested from {@code offset}
     * @param fileSize size of the remote file (presumably in bytes; forwarded to {@code FetchRequest} as-is)
     * @param lastModified last-modified stamp of the remote file (forwarded to {@code FetchRequest} as-is)
     */
    public void addToProcessQueue(String remotePath, long offset, int length, long fileSize, long lastModified) {
        this.processQueue.add(new FetchRequest(remotePath, offset, length, fileSize, lastModified, System.currentTimeMillis()));
        this.totalDownloadRequests.inc();
    }

    /**
     * Best-effort variant of range enqueueing: any failure is logged (with its cause) and
     * swallowed so the caller is never disrupted.
     *
     * <p>Ranges are processed in iteration order; if one range fails validation, requests
     * already enqueued for earlier ranges stay in the queue.
     *
     * @param remotePath path of the remote file
     * @param ranges closed-open byte ranges to fetch
     * @param fileSize size of the remote file
     * @param lastModified last-modified stamp of the remote file
     */
    public void addToProcessQueueSafe(String remotePath, Set<Range<Long>> ranges, long fileSize, long lastModified) {
        try {
            addToProcessQueue(remotePath, ranges, fileSize, lastModified);
        } catch (Exception e) {
            // Keep the cause in the log; the bare message alone hid why queueing failed.
            log.warn("Unable to queue ranges for file: " + remotePath, e);
        }
    }

    /**
     * Enqueues one or more requests per range, splitting any range longer than
     * {@link Integer#MAX_VALUE} bytes because a single request carries an {@code int} length.
     *
     * @throws IllegalStateException if a range is not of the form {@code [lower, upper)}
     */
    private void addToProcessQueue(String remotePath, Set<Range<Long>> ranges, long fileSize, long lastModified) {
        for (Range<Long> range : ranges) {
            Preconditions.checkState(
                    range.lowerBoundType() == BoundType.CLOSED && range.upperBoundType() == BoundType.OPEN,
                    "Unexpected range type encountered lower=%s and upper=%s",
                    range.lowerBoundType(), range.upperBoundType());
            long offset = range.lowerEndpoint();
            long remaining = range.upperEndpoint() - offset;
            while (remaining > 0) {
                // Bounded by Integer.MAX_VALUE, so the narrowing cast cannot overflow.
                int length = (int) Math.min(remaining, Integer.MAX_VALUE);
                addToProcessQueue(remotePath, offset, length, fileSize, lastModified);
                remaining -= length;
                offset += length;
            }
        }
    }

    /**
     * Scheduler tick: processes pending requests if there are any and the book keeper reports
     * itself initialized. Failures are logged rather than propagated, so an exception raised
     * while processing never escapes this method.
     */
    @Override
    protected void runOneIteration() throws Exception {
        try {
            long now = System.currentTimeMillis();
            if (!this.processQueue.isEmpty() && this.bookKeeper.isInitialized()) {
                processRequest(now);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning executor can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Could not process download requests", e);
        } catch (Exception e) {
            log.error("Could not process download requests", e);
        }
    }

    /**
     * Merges the eligible queued requests and submits them to the downloader.
     *
     * @param currentTime reference time in epoch milliseconds used to decide which requests are old enough
     * @throws IOException propagated from the downloader
     * @throws InterruptedException propagated from the downloader
     * @throws ExecutionException propagated from the downloader
     */
    protected void processRequest(long currentTime) throws IOException, InterruptedException, ExecutionException {
        ConcurrentMap<String, DownloadRequestContext> contexts = mergeRequests(currentTime);
        this.downloader.processDownloadRequests(this.downloader.getFileDownloadRequestChains(contexts));
        contexts.clear();
    }

    /**
     * Drains requests from the head of the queue into one {@link DownloadRequestContext} per
     * remote path, stopping at the first request younger than {@code requestProcessDelay}.
     *
     * <p>When requests for the same path carry different last-modified stamps, the newest stamp
     * wins: a newer request replaces the existing context (discarding ranges gathered for the
     * older version), and an older request is dropped from the queue without contributing a
     * range and without being counted as processed.
     *
     * @param currentTime reference time in epoch milliseconds
     * @return map from remote path to its merged download context; empty if nothing was eligible
     */
    protected ConcurrentMap<String, DownloadRequestContext> mergeRequests(long currentTime) {
        ConcurrentMap<String, DownloadRequestContext> merged = new ConcurrentHashMap<>();
        FetchRequest request;
        while ((request = this.processQueue.peek()) != null) {
            if (currentTime - request.getRequestedTime() < this.requestProcessDelay) {
                // Queue is in arrival order, so everything behind this request is younger still.
                break;
            }
            String path = request.getRemotePath();
            DownloadRequestContext context = merged.get(path);
            if (context == null || context.getLastModifiedTime() < request.getLastModified()) {
                // First request for this path, or the file changed since the context was created.
                context = new DownloadRequestContext(path, request.getFileSize(), request.getLastModified());
                merged.put(path, context);
            } else if (context.getLastModifiedTime() > request.getLastModified()) {
                // Stale request for an older version of the file: discard it and move on.
                // Without skipping here the stale range would be merged into the newer context
                // and a second remove() would drop the next request (or throw on an empty queue).
                this.processQueue.remove();
                continue;
            }
            context.addDownloadRange(request.getOffset(), request.getOffset() + request.getLength());
            this.processQueue.remove();
            this.processedRequests.inc();
        }
        return merged;
    }

    /**
     * @return a fixed-delay schedule using the configured initial delay and interval, in milliseconds
     */
    @Override
    protected AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedDelaySchedule(this.processThreadInitalDelay, this.processThreadInterval, TimeUnit.MILLISECONDS);
    }
}
