package com.senseidb.search.node;

import com.senseidb.metrics.MetricsConstants;
import com.senseidb.search.req.AbstractSenseiRequest;
import com.senseidb.search.req.AbstractSenseiResult;
import com.senseidb.search.req.ErrorType;
import com.senseidb.search.req.SenseiError;
import com.senseidb.search.req.SenseiRequest;
import com.senseidb.servlet.SenseiSearchServletParams;
import com.senseidb.svc.api.SenseiException;
import com.twitter.finagle.Service;
import com.twitter.util.Duration;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.log4j.Logger;
import zu.core.cluster.ZuCluster;
import zu.core.cluster.routing.ConsistentHashRoutingAlgorithm;
import zu.core.cluster.routing.RoutingAlgorithm;
import zu.finagle.client.ZuFinagleServiceDecorator;
import zu.finagle.client.ZuTransportClientProxy;
import zu.finagle.serialize.ZuSerializer;

/* loaded from: input_file:com/senseidb/search/node/AbstractConsistentHashBroker.class */
public abstract class AbstractConsistentHashBroker<REQUEST extends AbstractSenseiRequest, RESULT extends AbstractSenseiResult> extends AbstractSenseiBroker<REQUEST, RESULT> {
    private static final Logger logger = Logger.getLogger(AbstractConsistentHashBroker.class);
    private static final Logger queryLogger = Logger.getLogger("com.sensei.querylog");
    protected long _timeout = 8000;
    protected final ZuSerializer<REQUEST, RESULT> _serializer;
    private static Timer ScatterTimer;
    private static Timer GatherTimer;
    private static Timer TotalTimer;
    private static Meter SearchCounter;
    private static Meter ErrorMeter;
    private static Meter EmptyMeter;
    protected ZuFinagleServiceDecorator<REQUEST, RESULT> serviceDecorator;
    private final RoutingAlgorithm<Service<REQUEST, RESULT>> router;

    public AbstractConsistentHashBroker(ZuCluster zuCluster, ZuSerializer<REQUEST, RESULT> zuSerializer) {
        this._serializer = zuSerializer;
        this.serviceDecorator = new ZuFinagleServiceDecorator<>(new ZuTransportClientProxy(getMessageType(), this._serializer));
        this.router = new ConsistentHashRoutingAlgorithm(this.serviceDecorator);
        zuCluster.addClusterEventListener(this.router);
    }

    public REQUEST customizeRequest(REQUEST request) {
        return request;
    }

    @Override // com.senseidb.search.node.AbstractSenseiBroker
    public abstract RESULT getEmptyResultInstance();

    @Override // com.senseidb.search.node.AbstractSenseiBroker, com.senseidb.search.node.Broker
    public RESULT browse(final REQUEST request) throws SenseiException {
        SearchCounter.mark();
        try {
            return (RESULT) TotalTimer.time(new Callable<RESULT>() { // from class: com.senseidb.search.node.AbstractConsistentHashBroker.1
                /* JADX WARN: Multi-variable type inference failed */
                @Override // java.util.concurrent.Callable
                public RESULT call() throws Exception {
                    return (RESULT) AbstractConsistentHashBroker.this.doBrowse(request);
                }
            });
        } catch (Exception e) {
            ErrorMeter.mark();
            throw new SenseiException(e.getMessage(), e);
        }
    }

    public abstract RESULT mergeResults(REQUEST request, List<RESULT> list);

    protected String getRouteParam(REQUEST request) {
        String routeParam = request.getRouteParam();
        return routeParam == null ? RandomStringUtils.random(4) : routeParam;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v23, types: [com.senseidb.search.req.AbstractSenseiResult] */
    protected RESULT doBrowse(final REQUEST request) {
        RESULT emptyResultInstance;
        long currentTimeMillis = System.currentTimeMillis();
        final ArrayList arrayList = new ArrayList();
        try {
            arrayList.addAll((Collection) ScatterTimer.time(new Callable<List<RESULT>>() { // from class: com.senseidb.search.node.AbstractConsistentHashBroker.2
                /* JADX WARN: Multi-variable type inference failed */
                @Override // java.util.concurrent.Callable
                public List<RESULT> call() throws Exception {
                    return AbstractConsistentHashBroker.this.doCall(request);
                }
            }));
            if (arrayList.size() == 0) {
                logger.error("no result received at all return empty result");
                RESULT emptyResultInstance2 = getEmptyResultInstance();
                emptyResultInstance2.addError(new SenseiError("Error gathering the results. no result received at all return empty result", ErrorType.BrokerGatherError));
                EmptyMeter.mark();
                return emptyResultInstance2;
            }
            try {
                emptyResultInstance = (AbstractSenseiResult) GatherTimer.time(new Callable<RESULT>() { // from class: com.senseidb.search.node.AbstractConsistentHashBroker.3
                    /* JADX WARN: Multi-variable type inference failed */
                    @Override // java.util.concurrent.Callable
                    public RESULT call() throws Exception {
                        return (RESULT) AbstractConsistentHashBroker.this.mergeResults(request, arrayList);
                    }
                });
            } catch (Exception e) {
                emptyResultInstance = getEmptyResultInstance();
                logger.error("Error gathering the results: ", e);
                emptyResultInstance.addError(new SenseiError("Error gathering the results, " + e.getMessage(), ErrorType.BrokerGatherError));
                ErrorMeter.mark();
            }
            emptyResultInstance.setTime(System.currentTimeMillis() - currentTimeMillis);
            queryLogger.info("doBrowse took " + emptyResultInstance.getTime() + "ms");
            return emptyResultInstance;
        } catch (Exception e2) {
            ErrorMeter.mark();
            RESULT emptyResultInstance3 = getEmptyResultInstance();
            logger.error("Error running scatter/gather", e2);
            emptyResultInstance3.addError(new SenseiError("Error gathering the results, " + e2.getMessage(), ErrorType.BrokerGatherError));
            return emptyResultInstance3;
        }
    }

    protected List<RESULT> doCall(REQUEST request) throws ExecutionException {
        Set<Integer> shards = this.router.getShards();
        HashMap hashMap = new HashMap();
        byte[] bytes = getRouteParam(request).getBytes();
        for (Integer num : shards) {
            Service<REQUEST, RESULT> service = (Service) this.router.route(bytes, num.intValue());
            if (service == null) {
                logger.warn("router returned null as a destination service");
            } else {
                REQUEST request2 = hashMap.get(service);
                if (request2 == null) {
                    request2 = customizeRequest(((SenseiRequest) request).m256clone());
                    request2.setPartitions(new HashSet());
                    hashMap.put(service, request2);
                }
                request2.getPartitions().add(num);
            }
        }
        return executeRequestsInParallel(hashMap, this._timeout);
    }

    protected abstract String getMessageType();

    @Override // com.senseidb.search.node.AbstractSenseiBroker
    public void shutdown() {
        logger.info("shutting down broker...");
    }

    @Override // com.senseidb.search.node.AbstractSenseiBroker
    public long getTimeout() {
        return this._timeout;
    }

    @Override // com.senseidb.search.node.AbstractSenseiBroker
    public void setTimeout(long j) {
        this._timeout = j;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<RESULT> executeRequestsInParallel(Map<Service<REQUEST, RESULT>, REQUEST> map, long j) {
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<Service<REQUEST, RESULT>, REQUEST> entry : map.entrySet()) {
            arrayList.add(entry.getKey().apply(entry.getValue()).addEventListener(new FutureEventListener<RESULT>() { // from class: com.senseidb.search.node.AbstractConsistentHashBroker.4
                public void onFailure(Throwable th) {
                    AbstractConsistentHashBroker.logger.error("Failed to get response", th);
                }

                public void onSuccess(RESULT result) {
                }
            }));
        }
        List<RESULT> list = (List) Future.collect(arrayList).apply(Duration.apply(j, TimeUnit.MILLISECONDS));
        logger.info(String.format("Getting responses from %d nodes took %dms.", Integer.valueOf(list.size()), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)));
        return list;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static Set<InetSocketAddress> getNodesAddresses(Map<Integer, List<InetSocketAddress>> map) {
        HashSet hashSet = new HashSet();
        Iterator<List<InetSocketAddress>> it = map.values().iterator();
        while (it.hasNext()) {
            hashSet.addAll(it.next());
        }
        return hashSet;
    }

    static {
        ScatterTimer = null;
        GatherTimer = null;
        TotalTimer = null;
        SearchCounter = null;
        ErrorMeter = null;
        EmptyMeter = null;
        try {
            ScatterTimer = Metrics.newTimer(new MetricName(MetricsConstants.Domain, "timer", "scatter-time", "broker"), TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
            GatherTimer = Metrics.newTimer(new MetricName(MetricsConstants.Domain, "timer", "gather-time", "broker"), TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
            TotalTimer = Metrics.newTimer(new MetricName(MetricsConstants.Domain, "timer", "total-time", "broker"), TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
            SearchCounter = Metrics.newMeter(new MetricName(MetricsConstants.Domain, "meter", "search-count", "broker"), "requets", TimeUnit.SECONDS);
            ErrorMeter = Metrics.newMeter(new MetricName(MetricsConstants.Domain, "meter", "error-meter", "broker"), SenseiSearchServletParams.PARAM_RESULT_ERRORS, TimeUnit.SECONDS);
            EmptyMeter = Metrics.newMeter(new MetricName(MetricsConstants.Domain, "meter", "empty-meter", "broker"), "null-hits", TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}
