package com.qubole.rubix.bookkeeper;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.shaded.shaded.common.util.concurrent.AbstractScheduledService;
import com.google.shaded.shaded.common.util.concurrent.MoreExecutors;
import com.google.shaded.shaded.common.util.concurrent.Service;
import com.qubole.rubix.bookkeeper.validation.CachingValidator;
import com.qubole.rubix.bookkeeper.validation.FileValidator;
import com.qubole.rubix.common.metrics.BookKeeperMetrics;
import com.qubole.rubix.common.utils.ClusterUtil;
import com.qubole.rubix.spi.BookKeeperFactory;
import com.qubole.rubix.spi.CacheConfig;
import com.qubole.rubix.spi.RetryingPooledBookkeeperClient;
import com.qubole.rubix.spi.thrift.HeartbeatStatus;
import java.io.IOException;
import java.net.InetAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.shaded.TException;

/* loaded from: input_file:com/qubole/rubix/bookkeeper/HeartbeatService.class */
public class HeartbeatService extends AbstractScheduledService {
    private Log log = LogFactory.getLog(HeartbeatService.class.getName());
    private final ScheduledExecutorService validatorExecutor = Executors.newSingleThreadScheduledExecutor();
    private final RetryingPooledBookkeeperClient bookkeeperClient;
    private final int heartbeatInitialDelay;
    private final int heartbeatInterval;
    private final Configuration conf;
    private final BookKeeper bookKeeper;
    private final String masterHostname;
    private FileValidator fileValidator;
    private CachingValidator cachingValidator;

    /* loaded from: input_file:com/qubole/rubix/bookkeeper/HeartbeatService$FailureListener.class */
    private class FailureListener extends Service.Listener {
        private FailureListener() {
        }

        @Override // com.google.shaded.shaded.common.util.concurrent.Service.Listener
        public void failed(Service.State state, Throwable th) {
            super.failed(state, th);
            HeartbeatService.this.log.error("Encountered a problem", th);
        }
    }

    public HeartbeatService(Configuration configuration, MetricRegistry metricRegistry, BookKeeperFactory bookKeeperFactory, BookKeeper bookKeeper) {
        this.conf = configuration;
        this.bookKeeper = bookKeeper;
        this.heartbeatInitialDelay = CacheConfig.getHeartbeatInitialDelay(configuration);
        this.heartbeatInterval = CacheConfig.getHeartbeatInterval(configuration);
        this.masterHostname = ClusterUtil.getMasterHostname(configuration);
        this.bookkeeperClient = initializeClientWithRetry(bookKeeperFactory);
        if (CacheConfig.isValidationEnabled(configuration)) {
            this.cachingValidator = new CachingValidator(configuration, bookKeeper, this.validatorExecutor);
            this.cachingValidator.startAsync();
            metricRegistry.register(BookKeeperMetrics.ValidationMetric.CACHING_VALIDATION_SUCCESS_GAUGE.getMetricName(), new Gauge<Integer>() { // from class: com.qubole.rubix.bookkeeper.HeartbeatService.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // com.codahale.metrics.Gauge
                public Integer getValue() {
                    return Integer.valueOf(HeartbeatService.this.cachingValidator.didValidationSucceed() ? 1 : 0);
                }
            });
            this.fileValidator = new FileValidator(configuration, this.validatorExecutor);
            this.fileValidator.startAsync();
            metricRegistry.register(BookKeeperMetrics.ValidationMetric.FILE_VALIDATION_SUCCESS_GAUGE.getMetricName(), new Gauge<Integer>() { // from class: com.qubole.rubix.bookkeeper.HeartbeatService.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // com.codahale.metrics.Gauge
                public Integer getValue() {
                    return Integer.valueOf(HeartbeatService.this.fileValidator.didValidationSucceed() ? 1 : 0);
                }
            });
        }
    }

    private RetryingPooledBookkeeperClient initializeClientWithRetry(BookKeeperFactory bookKeeperFactory) {
        int serviceRetryInterval = CacheConfig.getServiceRetryInterval(this.conf);
        int serviceMaxRetries = CacheConfig.getServiceMaxRetries(this.conf);
        int i = 0;
        while (i < serviceMaxRetries) {
            try {
                return bookKeeperFactory.createBookKeeperClient(this.masterHostname, this.conf);
            } catch (Exception e) {
                i++;
                this.log.warn(String.format("Could not start client for heartbeat service [%d/%d attempts]", Integer.valueOf(i), Integer.valueOf(serviceMaxRetries)));
                if (i == serviceMaxRetries) {
                    break;
                }
                try {
                    Thread.sleep(serviceRetryInterval);
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        this.log.fatal("Heartbeat service ran out of retries to connect to the master BookKeeper");
        throw new RuntimeException("Could not start heartbeat service");
    }

    @Override // com.google.shaded.shaded.common.util.concurrent.AbstractScheduledService
    protected void startUp() {
        this.log.info(String.format("Starting service %s in thread %d", serviceName(), Long.valueOf(Thread.currentThread().getId())));
        addListener(new FailureListener(), MoreExecutors.directExecutor());
    }

    @Override // com.google.shaded.shaded.common.util.concurrent.AbstractScheduledService
    protected void runOneIteration() {
        try {
            HeartbeatStatus heartbeatStatus = CacheConfig.isValidationEnabled(this.conf) ? new HeartbeatStatus(this.fileValidator.didValidationSucceed(), this.cachingValidator.didValidationSucceed()) : new HeartbeatStatus();
            this.log.debug(String.format("Sending heartbeat to %s", this.masterHostname));
            this.bookkeeperClient.handleHeartbeat(InetAddress.getLocalHost().getCanonicalHostName(), heartbeatStatus);
        } catch (IOException e) {
            this.log.error("Could not send heartbeat", e);
        } catch (TException e2) {
            this.log.error(String.format("Could not connect to master node [%s]; will reattempt on next heartbeat", this.masterHostname));
        }
    }

    @Override // com.google.shaded.shaded.common.util.concurrent.AbstractScheduledService
    protected AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedDelaySchedule(this.heartbeatInitialDelay, this.heartbeatInterval, TimeUnit.MILLISECONDS);
    }
}
