package com.qubole.rubix.bookkeeper;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jvm.CachedThreadStatesGaugeSet;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.google.shaded.shaded.common.annotations.VisibleForTesting;
import com.qubole.rubix.common.metrics.BookKeeperMetrics;
import com.qubole.rubix.common.utils.ClusterUtil;
import com.qubole.rubix.spi.BookKeeperFactory;
import com.qubole.rubix.spi.CacheConfig;
import com.qubole.rubix.spi.CacheUtil;
import com.qubole.rubix.spi.CommonUtilities;
import com.qubole.rubix.spi.DataTransferClientHelper;
import com.qubole.rubix.spi.DataTransferHeader;
import com.qubole.rubix.spi.RetryingPooledBookkeeperClient;
import com.qubole.rubix.spi.thrift.BlockLocation;
import com.qubole.rubix.spi.thrift.CacheStatusRequest;
import com.qubole.rubix.spi.thrift.CacheStatusResponse;
import com.qubole.rubix.spi.thrift.Location;
import com.qubole.rubix.spi.thrift.ReadResponse;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.thrift.shaded.TException;

/* loaded from: input_file:com/qubole/rubix/bookkeeper/LocalDataTransferServer.class */
public class LocalDataTransferServer extends Configured implements Tool {
    private static Log log = LogFactory.getLog(LocalDataTransferServer.class.getName());
    private static Configuration conf;
    private static LocalServer localServer;
    private static MetricRegistry metrics;
    private static BookKeeperMetrics bookKeeperMetrics;
    private static Counter cachingExceptionCounter;

    /* loaded from: input_file:com/qubole/rubix/bookkeeper/LocalDataTransferServer$ClientServiceThread.class */
    static class ClientServiceThread extends Thread {
        SocketChannel localDataTransferClient;
        Configuration conf;
        BookKeeperFactory bookKeeperFactory;

        ClientServiceThread(SocketChannel socketChannel, Configuration configuration, BookKeeperFactory bookKeeperFactory) {
            this.localDataTransferClient = socketChannel;
            this.conf = configuration;
            this.bookKeeperFactory = bookKeeperFactory;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            int generationNumber;
            try {
                try {
                    LocalDataTransferServer.log.debug("Connected to node - Local Address: " + this.localDataTransferClient.getLocalAddress() + " Remote Address: " + this.localDataTransferClient.getRemoteAddress());
                    while (this.localDataTransferClient.isConnected()) {
                        ByteBuffer allocate = ByteBuffer.allocate(CacheConfig.getMaxHeaderSize(this.conf));
                        try {
                            if (this.localDataTransferClient.read(allocate) == -1) {
                                throw new IOException("Could not read data from Non-local node");
                            }
                            allocate.flip();
                            DataTransferHeader readHeaders = DataTransferClientHelper.readHeaders(allocate);
                            long offset = readHeaders.getOffset();
                            int readLength = readHeaders.getReadLength();
                            String filePath = readHeaders.getFilePath();
                            LocalDataTransferServer.log.debug(String.format("Trying to read from %s at offset %d and length %d for client %s", filePath, Long.valueOf(offset), Integer.valueOf(readLength), this.localDataTransferClient.getRemoteAddress()));
                            RetryingPooledBookkeeperClient createBookKeeperClient = this.bookKeeperFactory.createBookKeeperClient(this.conf);
                            Throwable th = null;
                            try {
                                if (CacheConfig.isParallelWarmupEnabled(this.conf)) {
                                    long blockSize = CacheConfig.getBlockSize(this.conf);
                                    long j = offset / blockSize;
                                    CacheStatusResponse cacheStatus = createBookKeeperClient.getCacheStatus(new CacheStatusRequest(filePath, readHeaders.getFileSize(), readHeaders.getLastModified(), j, ((offset + (readLength - 1)) / blockSize) + 1).setClusterType(readHeaders.getClusterType()));
                                    List<BlockLocation> blocks = cacheStatus.getBlocks();
                                    generationNumber = cacheStatus.getGenerationNumber();
                                    long j2 = j;
                                    for (BlockLocation blockLocation : blocks) {
                                        if (blockLocation.getLocation() != Location.CACHED) {
                                            LocalDataTransferServer.log.warn(String.format("The requested data for block %d of file %s is not in cache.  The data will be read from object store. Status: %s %s", Long.valueOf(j2), filePath, blockLocation.getLocation(), blockLocation.getRemoteLocation()));
                                            throw new Exception("The requested data in not in cache. The data will be read from object store");
                                        }
                                        j2++;
                                    }
                                } else {
                                    ReadResponse readData = createBookKeeperClient.readData(filePath, offset, readLength, readHeaders.getFileSize(), readHeaders.getLastModified(), readHeaders.getClusterType());
                                    generationNumber = readData.getGenerationNumber();
                                    if (!readData.isStatus()) {
                                        throw new Exception("Could not cache data required by non-local node");
                                    }
                                }
                                LocalDataTransferServer.log.debug(String.format("Done reading %d from %s at offset %d and length %d for client %s", Integer.valueOf(readDataFromCachedFile(createBookKeeperClient, filePath, generationNumber, offset, readLength)), filePath, Long.valueOf(offset), Integer.valueOf(readLength), this.localDataTransferClient.getRemoteAddress()));
                                if (createBookKeeperClient != null) {
                                    if (0 != 0) {
                                        try {
                                            createBookKeeperClient.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        createBookKeeperClient.close();
                                    }
                                }
                            } catch (Throwable th3) {
                                if (createBookKeeperClient != null) {
                                    if (0 != 0) {
                                        try {
                                            createBookKeeperClient.close();
                                        } catch (Throwable th4) {
                                            th.addSuppressed(th4);
                                        }
                                    } else {
                                        createBookKeeperClient.close();
                                    }
                                }
                                throw th3;
                            }
                        } catch (IOException e) {
                        }
                    }
                } catch (Exception e2) {
                    try {
                        LocalDataTransferServer.log.warn("Error in Local Data Transfer Server for client: " + this.localDataTransferClient.getRemoteAddress(), e2);
                        LocalDataTransferServer.cachingExceptionCounter.inc();
                    } catch (IOException e3) {
                        LocalDataTransferServer.log.warn("Error in Local Data Transfer Server for client: ", e2);
                    }
                    try {
                        this.localDataTransferClient.close();
                    } catch (IOException e4) {
                        LocalDataTransferServer.log.warn("Error in Local Data Transfer Server: ", e4);
                    }
                }
            } finally {
                try {
                    this.localDataTransferClient.close();
                } catch (IOException e5) {
                    LocalDataTransferServer.log.warn("Error in Local Data Transfer Server: ", e5);
                }
            }
        }

        private int readDataFromCachedFile(RetryingPooledBookkeeperClient retryingPooledBookkeeperClient, String str, int i, long j, int i2) throws IOException, TException {
            FileChannel fileChannel = null;
            int i3 = 0;
            String localPath = CacheUtil.getLocalPath(str, this.conf, i);
            try {
                try {
                    FileChannel channel = new FileInputStream(localPath).getChannel();
                    int dataTransferBufferSize = CacheConfig.getDataTransferBufferSize(this.conf);
                    int i4 = i2;
                    if (channel.size() < i2) {
                        LocalDataTransferServer.log.error(String.format("File size is smaller than requested read. Invalidating corrupted cached file %s", str));
                        retryingPooledBookkeeperClient.invalidateFileMetadata(str);
                        throw new IOException("File size is smaller than requested read");
                    }
                    while (i3 < i2) {
                        if (dataTransferBufferSize > i4) {
                            dataTransferBufferSize = i4;
                        }
                        i3 = (int) (i3 + channel.transferTo(j + i3, dataTransferBufferSize, this.localDataTransferClient));
                        i4 = i2 - i3;
                    }
                    if (channel != null) {
                        channel.close();
                    }
                    return i3;
                } catch (FileNotFoundException e) {
                    LocalDataTransferServer.log.error(String.format("Could not create file channel for %s. Invalidating missing remote file %s", localPath, str));
                    retryingPooledBookkeeperClient.invalidateFileMetadata(str);
                    throw new IOException(String.format("File not found %s ", localPath), e);
                } catch (Exception e2) {
                    if (e2 instanceof IOException) {
                        throw ((IOException) e2);
                    }
                    throw new IOException(e2);
                }
            } catch (Throwable th) {
                if (0 != 0) {
                    fileChannel.close();
                }
                throw th;
            }
        }
    }

    /* loaded from: input_file:com/qubole/rubix/bookkeeper/LocalDataTransferServer$LocalServer.class */
    public static class LocalServer implements Runnable {
        static ServerSocketChannel listener;
        Configuration conf;
        BookKeeperFactory bookKeeperFactory;

        public LocalServer(Configuration configuration, BookKeeper bookKeeper) {
            this(configuration, new BookKeeperFactory(bookKeeper));
        }

        public LocalServer(Configuration configuration, BookKeeperFactory bookKeeperFactory) {
            this.conf = configuration;
            this.bookKeeperFactory = bookKeeperFactory;
            int dataTransferServerPort = CacheConfig.getDataTransferServerPort(configuration);
            LocalDataTransferServer.log.debug("Starting LocalDataTransferServer on port " + dataTransferServerPort);
            try {
                listener = ServerSocketChannel.open();
                listener.bind(new InetSocketAddress(dataTransferServerPort), Integer.MAX_VALUE);
            } catch (IOException e) {
                throw new RuntimeException("Error starting Local Transfer server", e);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    new ThreadPoolExecutor(0, CacheConfig.getLocalTransferServerMaxThreads(this.conf), 60L, TimeUnit.SECONDS, new SynchronousQueue(), CommonUtilities.threadsNamed("lds-worker-%s")).execute(new ClientServiceThread(listener.accept(), this.conf, this.bookKeeperFactory));
                } catch (AsynchronousCloseException e) {
                    LocalDataTransferServer.log.warn("Stopping Local Transfer server", e);
                    return;
                } catch (IOException e2) {
                    LocalDataTransferServer.log.error("Error accepting Local Transfer connection", e2);
                    return;
                }
            }
        }

        public boolean isAlive() {
            return listener.isOpen();
        }

        public void stop() {
            try {
                listener.close();
            } catch (IOException e) {
                LocalDataTransferServer.log.error("Error stopping Local Transfer server", e);
            }
        }
    }

    /** Private constructor: within this class an instance is only created by {@code main}. */
    private LocalDataTransferServer() {
    }

    /**
     * Command-line entry point: runs this tool through {@link ToolRunner} with a fresh
     * {@link Configuration}.
     *
     * @param strArr command-line arguments forwarded to the tool runner
     * @throws Exception if the tool run fails
     */
    public static void main(String[] strArr) throws Exception {
        Configuration baseConf = new Configuration();
        LocalDataTransferServer tool = new LocalDataTransferServer();
        ToolRunner.run(baseConf, tool, strArr);
    }

    /**
     * Tool entry point: records the tool's configuration and starts the server with a new
     * metric registry.
     *
     * @param strArr command-line arguments (not used here)
     * @return always {@code 0}
     */
    public int run(String[] strArr) throws Exception {
        Configuration toolConf = getConf();
        conf = toolConf;
        MetricRegistry registry = new MetricRegistry();
        startServer(toolConf, registry);
        return 0;
    }

    /**
     * Starts the server without supplying a {@code BookKeeper} instance; delegates to the
     * three-argument overload with {@code null}.
     */
    public static void startServer(Configuration configuration, MetricRegistry metricRegistry) {
        BookKeeper noBookKeeper = null;
        startServer(configuration, metricRegistry, noBookKeeper);
    }

    /**
     * Starts the local data transfer server on a new thread.
     *
     * <p>Works on a copy of the given configuration (after the RubiX site config is applied) with
     * cache-data disabled, registers the server's metrics on {@code metricRegistry}, and then
     * creates and launches the {@link LocalServer}.
     *
     * @param configuration  base configuration; it is copied, not modified
     * @param metricRegistry registry that receives the server's metrics
     * @param bookKeeper     BookKeeper instance handed to the {@link LocalServer}; may be {@code null}
     */
    public static void startServer(Configuration configuration, MetricRegistry metricRegistry, BookKeeper bookKeeper) {
        Configuration serverConf = new Configuration(ClusterUtil.applyRubixSiteConfig(configuration));
        CacheConfig.setCacheDataEnabled(serverConf, false);

        metrics = metricRegistry;
        registerMetrics(serverConf);

        LocalServer server = new LocalServer(serverConf, bookKeeper);
        localServer = server;
        Thread acceptThread = new Thread(server);
        acceptThread.start();
    }

    /**
     * Creates the {@link BookKeeperMetrics} holder and registers the JVM garbage-collector,
     * thread-state and memory metric sets plus the caching-exception counter on the registry.
     *
     * @param configuration configuration used for the metrics holder and the thread-state
     *                      reporting interval
     */
    private static void registerMetrics(Configuration configuration) {
        bookKeeperMetrics = new BookKeeperMetrics(configuration, metrics);

        String gcMetricName = BookKeeperMetrics.LDTSJvmMetric.LDTS_JVM_GC_PREFIX.getMetricName();
        metrics.register(gcMetricName, new GarbageCollectorMetricSet());

        String threadsMetricName = BookKeeperMetrics.LDTSJvmMetric.LDTS_JVM_THREADS_PREFIX.getMetricName();
        metrics.register(threadsMetricName, new CachedThreadStatesGaugeSet(CacheConfig.getMetricsReportingInterval(configuration), TimeUnit.MILLISECONDS));

        String memoryMetricName = BookKeeperMetrics.LDTSJvmMetric.LDTS_JVM_MEMORY_PREFIX.getMetricName();
        metrics.register(memoryMetricName, new MemoryUsageGaugeSet());

        String exceptionCounterName = BookKeeperMetrics.CacheMetric.LDTS_CACHING_EXCEPTION.getMetricName();
        cachingExceptionCounter = metrics.counter(exceptionCounterName);
    }

    /**
     * Stops the server if it is running: removes its metrics, closes the metrics holder
     * (a failure there is logged, not thrown) and stops the listener. Does nothing when the
     * server is not up.
     */
    public static void stopServer() {
        if (!isServerUp()) {
            return;
        }
        removeMetrics();
        if (localServer == null) {
            return;
        }
        try {
            bookKeeperMetrics.close();
        } catch (IOException closeFailure) {
            log.error("Metrics reporters could not be closed", closeFailure);
        }
        localServer.stop();
    }

    /** Removes from the registry every metric matched by the filter of {@code bookKeeperMetrics}. */
    protected static void removeMetrics() {
        metrics.removeMatching(bookKeeperMetrics.getMetricsFilter());
    }

    /**
     * Reports whether a server has been created and its listener is still open.
     *
     * @return {@code false} when no server exists, otherwise the server's alive state
     */
    @VisibleForTesting
    public static boolean isServerUp() {
        return localServer != null && localServer.isAlive();
    }
}
