package com.qubole.rubix.bookkeeper;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jvm.CachedThreadStatesGaugeSet;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.google.shaded.shaded.common.annotations.VisibleForTesting;
import com.google.shaded.shaded.common.base.Throwables;
import com.qubole.rubix.common.metrics.BookKeeperMetrics;
import com.qubole.rubix.common.utils.ClusterUtil;
import com.qubole.rubix.spi.CacheConfig;
import com.qubole.rubix.spi.thrift.BookKeeperService;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.thrift.shaded.server.TServer;
import org.apache.thrift.shaded.server.TThreadPoolServer;
import org.apache.thrift.shaded.transport.TServerSocket;
import org.apache.thrift.shaded.transport.TTransportException;

/* loaded from: input_file:com/qubole/rubix/bookkeeper/BookKeeperServer.class */
public class BookKeeperServer extends Configured implements Tool {
    public BookKeeperService.Processor processor;
    protected MetricRegistry metrics;
    protected BookKeeperMetrics bookKeeperMetrics;
    private BookKeeper localBookKeeper;
    public Configuration conf;
    private TServer server;
    private static Log log = LogFactory.getLog(BookKeeperServer.class.getName());

    public static void main(String[] strArr) throws Exception {
        ToolRunner.run(new Configuration(), new BookKeeperServer(), strArr);
    }

    public int run(String[] strArr) throws Exception {
        this.conf = getConf();
        new Thread(new Runnable() { // from class: com.qubole.rubix.bookkeeper.BookKeeperServer.1
            @Override // java.lang.Runnable
            public void run() {
                BookKeeperServer.this.startServer(BookKeeperServer.this.conf, new MetricRegistry());
            }
        }).run();
        return 0;
    }

    public BookKeeper startServer(Configuration configuration, MetricRegistry metricRegistry) {
        setupServer(configuration, metricRegistry);
        if (CacheConfig.isEmbeddedModeEnabled(configuration)) {
            createThriftServer(configuration, this.localBookKeeper);
            new Thread(new Runnable() { // from class: com.qubole.rubix.bookkeeper.BookKeeperServer.2
                @Override // java.lang.Runnable
                public void run() {
                    BookKeeperServer.this.startThriftServer();
                }
            }).start();
        } else {
            createThriftServer(configuration, this.localBookKeeper);
            startThriftServer();
        }
        return this.localBookKeeper;
    }

    public void setupServer(Configuration configuration, MetricRegistry metricRegistry) {
        Configuration configuration2 = new Configuration(ClusterUtil.applyRubixSiteConfig(configuration));
        CacheConfig.setCacheDataEnabled(configuration2, false);
        this.metrics = metricRegistry;
        this.bookKeeperMetrics = new BookKeeperMetrics(configuration2, this.metrics);
        registerMetrics(configuration2);
        try {
            if (CacheConfig.isOnMaster(configuration2)) {
                this.localBookKeeper = new CoordinatorBookKeeper(configuration2, this.bookKeeperMetrics);
            } else {
                this.localBookKeeper = new WorkerBookKeeper(configuration2, this.bookKeeperMetrics);
            }
        } catch (FileNotFoundException e) {
            log.error("Cache directories could not be created", e);
            throw Throwables.propagate(e);
        }
    }

    private void createThriftServer(Configuration configuration, BookKeeper bookKeeper) {
        this.processor = new BookKeeperService.Processor(bookKeeper);
        log.info("Starting BookKeeperServer on port " + CacheConfig.getBookKeeperServerPort(configuration));
        try {
            this.server = new TThreadPoolServer(new TThreadPoolServer.Args(new TServerSocket(new TServerSocket.ServerSocketTransportArgs().bindAddr(new InetSocketAddress(CacheConfig.getBookKeeperServerPort(configuration))).backlog(Integer.MAX_VALUE))).processor(this.processor).maxWorkerThreads(CacheConfig.getServerMaxThreads(configuration)));
        } catch (TTransportException e) {
            throw new RuntimeException("Error starting BookKeeperServer", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startThriftServer() {
        this.server.serve();
    }

    protected void registerMetrics(Configuration configuration) {
        this.metrics.register(BookKeeperMetrics.BookKeeperJvmMetric.BOOKKEEPER_JVM_GC_PREFIX.getMetricName(), new GarbageCollectorMetricSet());
        this.metrics.register(BookKeeperMetrics.BookKeeperJvmMetric.BOOKKEEPER_JVM_THREADS_PREFIX.getMetricName(), new CachedThreadStatesGaugeSet(CacheConfig.getMetricsReportingInterval(configuration), TimeUnit.MILLISECONDS));
        this.metrics.register(BookKeeperMetrics.BookKeeperJvmMetric.BOOKKEEPER_JVM_MEMORY_PREFIX.getMetricName(), new MemoryUsageGaugeSet());
    }

    public void stopServer() {
        removeMetrics();
        try {
            this.bookKeeperMetrics.close();
        } catch (IOException e) {
            log.error("Metrics reporters could not be closed", e);
        }
        this.server.stop();
        log.info("Bookkeeper Server Stopped");
    }

    protected void removeMetrics() {
        this.metrics.removeMatching(this.bookKeeperMetrics.getMetricsFilter());
    }

    @VisibleForTesting
    public boolean isServerUp() {
        if (this.server != null) {
            return this.server.isServing();
        }
        return false;
    }
}
