package com.qubole.rubix.core;

import com.google.shaded.shaded.common.base.Preconditions;
import com.google.shaded.shaded.common.base.Throwables;
import com.qubole.rubix.common.metrics.CachingFileSystemMetrics;
import com.qubole.rubix.common.metrics.CustomMetricsReporterProvider;
import com.qubole.rubix.spi.BookKeeperFactory;
import com.qubole.rubix.spi.CacheConfig;
import com.qubole.rubix.spi.DataTransferClientFactory;
import com.qubole.rubix.spi.DataTransferClientHelper;
import com.qubole.rubix.spi.DataTransferHeader;
import com.qubole.rubix.spi.RetryingPooledBookkeeperClient;
import com.qubole.rubix.spi.thrift.CacheStatusRequest;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/* loaded from: input_file:com/qubole/rubix/core/NonLocalReadRequestChain.class */
public class NonLocalReadRequestChain extends ReadRequestChain {
    long fileSize;
    String filePath;
    long lastModified;
    String remoteNodeName;
    Configuration conf;
    long readFromNonLocalCache;
    long directRead;
    long directReadRequests;
    FileSystem remoteFileSystem;
    int clusterType;
    public boolean strictMode;
    FileSystem.Statistics statistics;
    DirectReadRequestChain directReadChain;
    private static final Log log = LogFactory.getLog(NonLocalReadRequestChain.class);
    private BookKeeperFactory bookKeeperFactory;

    public NonLocalReadRequestChain(String str, long j, long j2, Configuration configuration, FileSystem fileSystem, String str2, int i, boolean z, FileSystem.Statistics statistics) {
        super(0);
        this.bookKeeperFactory = new BookKeeperFactory();
        this.remoteNodeName = str;
        this.remoteFileSystem = fileSystem;
        this.lastModified = j2;
        this.filePath = str2;
        this.fileSize = j;
        this.conf = configuration;
        this.clusterType = i;
        this.strictMode = z;
        this.statistics = statistics;
    }

    @Override // com.qubole.rubix.core.ReadRequestChain
    public ReadRequestChainStats getStats() {
        return new ReadRequestChainStats().setNonLocalRRCDataRead(this.readFromNonLocalCache).setNonLocalRRCRequests(this.directReadRequests == 0 ? this.requests : this.requests - this.directReadRequests).setDirectRRCDataRead(this.directRead).setDirectRRCRequests(this.directReadRequests);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /* JADX WARN: Finally extract failed */
    @Override // java.util.concurrent.Callable
    public Long call() throws Exception {
        log.debug(String.format("Read Request threadName: %s, Non Local read Executor threadName: %s", this.threadName, Thread.currentThread().getName()));
        Thread.currentThread().setName(this.threadName);
        if (this.readRequests.size() == 0) {
            return 0L;
        }
        Preconditions.checkState(this.isLocked, "Trying to execute Chain without locking");
        int i = 0;
        for (ReadRequest readRequest : this.readRequests) {
            this.readFromNonLocalCache += i;
            if (this.cancelled) {
                propagateCancel(getClass().getName());
            }
            try {
                try {
                    DataTransferClientFactory.DataTransferClient client = DataTransferClientFactory.getClient(this.remoteNodeName, this.conf);
                    Throwable th = null;
                    try {
                        ReadableByteChannel newChannel = Channels.newChannel(client.getSocketChannel().socket().getInputStream());
                        try {
                            client.getSocketChannel().write(DataTransferClientHelper.writeHeaders(this.conf, new DataTransferHeader(readRequest.getActualReadStart(), readRequest.getActualReadLengthIntUnsafe(), this.fileSize, this.lastModified, this.clusterType, this.filePath)));
                            int i2 = 0;
                            ByteBuffer wrap = ByteBuffer.wrap(readRequest.destBuffer, readRequest.getDestBufferOffset(), readRequest.destBuffer.length - readRequest.getDestBufferOffset());
                            while (i2 != readRequest.getActualReadLengthIntUnsafe()) {
                                try {
                                    int read = newChannel.read(wrap);
                                    if (read == -1) {
                                        log.warn("Error reading from Local Transfer Server");
                                        client.getSocketChannel().close();
                                        throw new IOException("Error reading from Local Transfer Server");
                                    }
                                    i2 += read;
                                    wrap.position(i2 + readRequest.getDestBufferOffset());
                                } catch (IOException e) {
                                    log.warn("Error in reading..closing socket channel: " + client.getSocketChannel(), e);
                                    client.getSocketChannel().close();
                                    throw e;
                                }
                            }
                            i = i2;
                            if (client != null) {
                                if (0 != 0) {
                                    try {
                                        client.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    client.close();
                                }
                            }
                            if (this.statistics != null) {
                                this.statistics.incrementBytesRead(this.readFromNonLocalCache);
                            }
                        } catch (IOException e2) {
                            client.getSocketChannel().close();
                            throw e2;
                        }
                    } catch (Throwable th3) {
                        if (client != null) {
                            if (0 != 0) {
                                try {
                                    client.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                client.close();
                            }
                        }
                        throw th3;
                    }
                } catch (Exception e3) {
                    if (this.strictMode) {
                        log.warn("Error reading data from node : " + this.remoteNodeName, e3);
                        throw Throwables.propagate(e3);
                    }
                    log.warn("Error in reading from node: " + this.remoteNodeName + " Using direct reads", e3);
                    CustomMetricsReporterProvider.getCustomMetricsReporter().addMetric(CachingFileSystemMetrics.NON_LOCAL_FALLBACK_TO_DIRECT_READ);
                    Long valueOf = Long.valueOf(directReadRequest(this.readRequests.indexOf(readRequest)));
                    if (this.statistics != null) {
                        this.statistics.incrementBytesRead(this.readFromNonLocalCache);
                    }
                    return valueOf;
                }
            } catch (Throwable th5) {
                if (this.statistics != null) {
                    this.statistics.incrementBytesRead(this.readFromNonLocalCache);
                }
                throw th5;
            }
        }
        this.readFromNonLocalCache += i;
        log.debug(String.format("Read %d bytes internally from node %s", Long.valueOf(this.readFromNonLocalCache), this.remoteNodeName));
        return Long.valueOf(this.readFromNonLocalCache);
    }

    @Override // com.qubole.rubix.core.ReadRequestChain
    public void cancel() {
        super.cancel();
        if (this.directReadChain != null) {
            this.directReadChain.cancel();
        }
    }

    private long directReadRequest(int i) throws Exception {
        FSDataInputStream open = this.remoteFileSystem.open(new Path(this.filePath));
        Throwable th = null;
        try {
            try {
                this.directReadChain = new DirectReadRequestChain(open);
                Iterator<ReadRequest> it = this.readRequests.subList(i, this.readRequests.size()).iterator();
                while (it.hasNext()) {
                    this.directReadChain.addReadRequest(it.next());
                    this.directReadRequests++;
                }
                this.directReadChain.lock();
                this.directRead = this.directReadChain.call().longValue();
                this.directReadChain = null;
                if (open != null) {
                    if (0 != 0) {
                        try {
                            open.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        open.close();
                    }
                }
                return this.readFromNonLocalCache + this.directRead;
            } finally {
            }
        } catch (Throwable th3) {
            if (open != null) {
                if (th != null) {
                    try {
                        open.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    open.close();
                }
            }
            throw th3;
        }
    }

    @Override // com.qubole.rubix.core.ReadRequestChain
    public void updateCacheStatus(String str, long j, long j2, int i, Configuration configuration) {
        if (CacheConfig.isDummyModeEnabled(configuration)) {
            try {
                RetryingPooledBookkeeperClient createBookKeeperClient = this.bookKeeperFactory.createBookKeeperClient(this.remoteNodeName, configuration);
                Throwable th = null;
                try {
                    try {
                        for (ReadRequest readRequest : this.readRequests) {
                            long block = toBlock(readRequest.getBackendReadStart());
                            long block2 = toBlock(readRequest.getBackendReadEnd() - 1) + 1;
                            createBookKeeperClient.getCacheStatus(new CacheStatusRequest(str, j, j2, block, block2).setClusterType(this.clusterType));
                            createBookKeeperClient.setAllCached(str, j, j2, block, block2, 1);
                        }
                        if (createBookKeeperClient != null) {
                            if (0 != 0) {
                                try {
                                    createBookKeeperClient.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                createBookKeeperClient.close();
                            }
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } finally {
                }
            } catch (Exception e) {
                if (this.strictMode) {
                    throw Throwables.propagate(e);
                }
                log.error("Dummy Mode: Could not update Cache Status for Non-Local Read Request ", e);
            }
        }
    }

    private long toBlock(long j) {
        return j / CacheConfig.getBlockSize(this.conf);
    }
}
