package com.qubole.rubix.core;

import com.google.shaded.shaded.common.annotations.VisibleForTesting;
import com.google.shaded.shaded.common.base.Preconditions;
import com.qubole.rubix.common.metrics.CachingFileSystemMetrics;
import com.qubole.rubix.common.metrics.CustomMetricsReporterProvider;
import com.qubole.rubix.core.ReadRequestChain;
import com.qubole.rubix.spi.BookKeeperFactory;
import com.qubole.rubix.spi.CacheUtil;
import com.qubole.rubix.spi.RetryingPooledBookkeeperClient;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.DirectBufferPool;

/* loaded from: input_file:com/qubole/rubix/core/CachedReadRequestChain.class */
public class CachedReadRequestChain extends ReadRequestChain {
    private String remotePath;
    private long readFromCache;
    private FileSystem.Statistics statistics;
    private FileSystem remoteFileSystem;
    private DirectReadRequestChain directReadChain;
    private Configuration conf;
    private long directDataRead;
    private BookKeeperFactory factory;
    private DirectBufferPool bufferPool;
    private int directBufferSize;
    private int corruptedFileCount;
    private static final Log log = LogFactory.getLog(CachedReadRequestChain.class);

    public CachedReadRequestChain(FileSystem fileSystem, String str, DirectBufferPool directBufferPool, int i, FileSystem.Statistics statistics, Configuration configuration, BookKeeperFactory bookKeeperFactory, int i2) {
        super(i2);
        this.conf = configuration;
        this.remotePath = str;
        this.remoteFileSystem = fileSystem;
        this.bufferPool = directBufferPool;
        this.directBufferSize = i;
        this.statistics = statistics;
        this.factory = bookKeeperFactory;
    }

    @VisibleForTesting
    public CachedReadRequestChain(FileSystem fileSystem, String str, Configuration configuration, BookKeeperFactory bookKeeperFactory, int i) {
        this(fileSystem, str, new DirectBufferPool(), 100, null, configuration, bookKeeperFactory, i);
    }

    @VisibleForTesting
    public CachedReadRequestChain() {
        super(0);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public Long call() throws IOException {
        log.debug(String.format("Read Request threadName: %s, Cached read Executor threadName: %s", this.threadName, Thread.currentThread().getName()));
        Thread.currentThread().setName(this.threadName);
        if (this.readRequests.size() == 0) {
            return 0L;
        }
        Preconditions.checkState(this.isLocked, "Trying to execute Chain without locking");
        RandomAccessFile randomAccessFile = null;
        FileInputStream fileInputStream = null;
        FileChannel fileChannel = null;
        String localPath = CacheUtil.getLocalPath(this.remotePath, this.conf, this.generationNumber);
        ByteBuffer buffer = this.bufferPool.getBuffer(this.directBufferSize);
        try {
            try {
                RandomAccessFile randomAccessFile2 = new RandomAccessFile(localPath, "r");
                FileInputStream fileInputStream2 = new FileInputStream(randomAccessFile2.getFD());
                FileChannel channel = fileInputStream2.getChannel();
                for (ReadRequest readRequest : this.readRequests) {
                    if (this.cancelled) {
                        propagateCancel(getClass().getName());
                    }
                    int i = 0;
                    int actualReadLengthIntUnsafe = readRequest.getActualReadLengthIntUnsafe();
                    log.debug(String.format("Processing readrequest %d-%d, length %d", Long.valueOf(readRequest.actualReadStart), Long.valueOf(readRequest.actualReadEnd), Integer.valueOf(actualReadLengthIntUnsafe)));
                    while (i < readRequest.getActualReadLengthIntUnsafe()) {
                        int min = Math.min(actualReadLengthIntUnsafe, buffer.capacity());
                        buffer.clear();
                        int read = channel.read(buffer, readRequest.getActualReadStart() + i);
                        if (read <= 0) {
                            break;
                        }
                        buffer.flip();
                        int min2 = Math.min(min, read);
                        buffer.get(readRequest.getDestBuffer(), readRequest.getDestBufferOffset() + i, min2);
                        actualReadLengthIntUnsafe -= min2;
                        i += min2;
                    }
                    log.debug(String.format("CachedFileRead copied data [%d - %d] at buffer offset %d", Long.valueOf(readRequest.getActualReadStart()), Long.valueOf(readRequest.getActualReadStart() + i), Integer.valueOf(readRequest.getDestBufferOffset())));
                    if (i != readRequest.getActualReadLengthIntUnsafe()) {
                        throw new ReadRequestChain.InvalidationRequiredException("Cached read length didn't match with requested read length for file");
                    }
                    this.readFromCache += i;
                }
                log.debug(String.format("Read %d bytes from cached file", Long.valueOf(this.readFromCache)));
                this.bufferPool.returnBuffer(buffer);
                if (fileInputStream2 != null) {
                    fileInputStream2.close();
                }
                if (channel != null) {
                    channel.close();
                }
                if (randomAccessFile2 != null) {
                    randomAccessFile2.close();
                }
                if (0 != 0) {
                    this.corruptedFileCount++;
                    invalidateMetadata();
                }
                if (this.statistics != null) {
                    this.statistics.incrementBytesRead(this.readFromCache);
                }
                return Long.valueOf(this.readFromCache);
            } catch (Exception e) {
                if (e instanceof ReadRequestChain.CancelledException) {
                    throw e;
                }
                log.error(String.format("Fall back to read from object store for %s .Could not read data from cached file : ", localPath), e);
                CustomMetricsReporterProvider.getCustomMetricsReporter().addMetric(CachingFileSystemMetrics.LOCAL_FALLBACK_TO_DIRECT_READ);
                this.directDataRead = readFromRemoteFileSystem();
                Long valueOf = Long.valueOf(this.directDataRead);
                this.bufferPool.returnBuffer(buffer);
                if (0 != 0) {
                    fileInputStream.close();
                }
                if (0 != 0) {
                    fileChannel.close();
                }
                if (0 != 0) {
                    randomAccessFile.close();
                }
                if (1 != 0) {
                    this.corruptedFileCount++;
                    invalidateMetadata();
                }
                if (this.statistics != null) {
                    this.statistics.incrementBytesRead(this.readFromCache);
                }
                return valueOf;
            }
        } catch (Throwable th) {
            this.bufferPool.returnBuffer(buffer);
            if (0 != 0) {
                fileInputStream.close();
            }
            if (0 != 0) {
                fileChannel.close();
            }
            if (0 != 0) {
                randomAccessFile.close();
            }
            if (0 != 0) {
                this.corruptedFileCount++;
                invalidateMetadata();
            }
            if (this.statistics != null) {
                this.statistics.incrementBytesRead(this.readFromCache);
            }
            throw th;
        }
    }

    @Override // com.qubole.rubix.core.ReadRequestChain
    public void cancel() {
        super.cancel();
        if (this.directReadChain != null) {
            this.directReadChain.cancel();
        }
    }

    private void invalidateMetadata() {
        try {
            RetryingPooledBookkeeperClient createBookKeeperClient = this.factory.createBookKeeperClient(this.conf);
            Throwable th = null;
            try {
                createBookKeeperClient.invalidateFileMetadata(this.remotePath);
                if (createBookKeeperClient != null) {
                    if (0 != 0) {
                        try {
                            createBookKeeperClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createBookKeeperClient.close();
                    }
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("Could not Invalidate Corrupted File " + this.remotePath + " Error : ", e);
        }
    }

    private long readFromRemoteFileSystem() throws IOException {
        this.readFromCache = 0L;
        if (this.cancelled) {
            return 0L;
        }
        FSDataInputStream open = this.remoteFileSystem.open(new Path(this.remotePath));
        Throwable th = null;
        try {
            this.directReadChain = new DirectReadRequestChain(open);
            Iterator<ReadRequest> it = this.readRequests.iterator();
            while (it.hasNext()) {
                this.directReadChain.addReadRequest(it.next());
            }
            this.directReadChain.lock();
            long longValue = this.directReadChain.call().longValue();
            this.directReadChain = null;
            if (open != null) {
                if (0 != 0) {
                    try {
                        open.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    open.close();
                }
            }
            return longValue;
        } catch (Throwable th3) {
            if (open != null) {
                if (0 != 0) {
                    try {
                        open.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    open.close();
                }
            }
            throw th3;
        }
    }

    @Override // com.qubole.rubix.core.ReadRequestChain
    public ReadRequestChainStats getStats() {
        return new ReadRequestChainStats().setCachedRRCDataRead(this.directDataRead == 0 ? this.readFromCache : 0L).setCachedRRCRequests(this.directDataRead == 0 ? this.requests : 0L).setDirectRRCDataRead(this.directDataRead).setDirectRRCRequests(this.directDataRead == 0 ? 0L : this.requests).setCorruptedFileCount(this.corruptedFileCount);
    }
}
