package com.sproutsocial.nsq;

import com.sproutsocial.nsq.LookupResponse;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Subscribes message handlers to topics and keeps each subscription's connections in sync
 * with the producers reported by a set of nsqlookupd HTTP endpoints.
 *
 * <p>On construction a recurring task is registered with the {@link Client}; each run
 * re-queries every configured lookup address for every subscribed topic and hands the
 * resulting producer addresses to the matching {@link Subscription}.
 *
 * <p>Mutable state ({@code subscriptions}, {@code failures} and the tunables) is guarded by
 * the intrinsic lock of this instance.
 */
@ThreadSafe
public class Subscriber extends BasePubSub {
    private static final int DEFAULT_LOOKUP_INTERVAL_SECS = 60;
    private static final int DEFAULT_MAX_LOOKUP_FAILURES_BEFORE_ERROR = 5;
    /** Port applied to lookup addresses that do not specify one. */
    private static final int DEFAULT_LOOKUP_PORT = 4161;
    /** Connect and read timeout, in milliseconds, for each lookup HTTP request. */
    private static final int LOOKUP_TIMEOUT_MILLIS = 30000;
    private static final Logger logger = LoggerFactory.getLogger(Subscriber.class);

    /** Lookup addresses to query; populated once in the constructor. */
    private final List<HostAndPort> lookups = new ArrayList<HostAndPort>();
    private final List<Subscription> subscriptions = new ArrayList<Subscription>();
    private final int lookupIntervalSecs;
    /** Consecutive-failure count at which lookup failures are logged at error instead of warn. */
    private int maxLookupFailuresBeforeError;
    private int defaultMaxInFlight = 200;
    private int maxFlushDelayMillis = 2000;
    private int maxAttempts = Integer.MAX_VALUE;
    private FailedMessageHandler failedMessageHandler = null;
    /** Consecutive failure count per lookup URL; an entry is removed after a successful lookup. */
    private final Map<String, Integer> failures = new HashMap<String, Integer>();

    /**
     * Creates a subscriber that re-runs lookups every {@code lookupIntervalSecs} seconds.
     *
     * @param client the client used for scheduling, JSON parsing and connections
     * @param lookupIntervalSecs seconds between periodic lookups; must be positive
     * @param maxLookupFailuresBeforeError number of consecutive failures of one lookup URL
     *     after which failures are logged at error level rather than warn
     * @param lookupHosts lookup addresses; port 4161 is used where none is given
     */
    public Subscriber(Client client, int lookupIntervalSecs, int maxLookupFailuresBeforeError, String... lookupHosts) {
        super(client);
        Util.checkArgument(lookupIntervalSecs > 0);
        this.lookupIntervalSecs = lookupIntervalSecs;
        this.maxLookupFailuresBeforeError = maxLookupFailuresBeforeError;
        for (String lookupHost : lookupHosts) {
            this.lookups.add(HostAndPort.fromString(lookupHost).withDefaultPort(DEFAULT_LOOKUP_PORT));
        }
        // Register the periodic task last so it is never scheduled for an instance whose
        // lookup list is incomplete (or whose construction failed on a bad address).
        int intervalMillis = lookupIntervalSecs * 1000;
        client.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                Subscriber.this.lookup();
            }
        }, intervalMillis, intervalMillis, true);
    }

    /**
     * Creates a subscriber on the default client with the default failure threshold.
     *
     * @param lookupIntervalSecs seconds between periodic lookups; must be positive
     * @param lookupHosts lookup addresses; port 4161 is used where none is given
     */
    public Subscriber(int lookupIntervalSecs, String... lookupHosts) {
        this(Client.getDefaultClient(), lookupIntervalSecs, DEFAULT_MAX_LOOKUP_FAILURES_BEFORE_ERROR, lookupHosts);
    }

    /**
     * Creates a subscriber on the default client with the default lookup interval (60 seconds)
     * and the default failure threshold.
     *
     * @param lookupHosts lookup addresses; port 4161 is used where none is given
     */
    public Subscriber(String... lookupHosts) {
        this(Client.getDefaultClient(), DEFAULT_LOOKUP_INTERVAL_SECS, DEFAULT_MAX_LOOKUP_FAILURES_BEFORE_ERROR, lookupHosts);
    }

    /**
     * Subscribes {@code handler} using the current default max-in-flight value.
     *
     * @param topic the topic to look up and subscribe to
     * @param channel the channel of the subscription
     * @param handler receives each message
     */
    public synchronized void subscribe(String topic, String channel, MessageHandler handler) {
        subscribe(topic, channel, this.defaultMaxInFlight, handler);
    }

    /**
     * Subscribes a handler that only receives the message data. The handler is adapted to a
     * {@link MessageHandler} and wrapped in a {@link BackoffHandler}.
     *
     * @param topic the topic to look up and subscribe to
     * @param channel the channel of the subscription
     * @param messageDataHandler receives {@code message.getData()} for each message
     */
    public synchronized void subscribe(String topic, String channel, final MessageDataHandler messageDataHandler) {
        MessageHandler dataOnly = new MessageHandler() {
            @Override
            public void accept(Message message) {
                messageDataHandler.accept(message.getData());
            }
        };
        subscribe(topic, channel, this.defaultMaxInFlight, new BackoffHandler(dataOnly));
    }

    /**
     * Creates a subscription, registers this subscriber with the client, and immediately
     * runs a lookup for {@code topic} so the subscription can establish its connections.
     *
     * @param topic the topic to look up and subscribe to; must not be null
     * @param channel the channel of the subscription; must not be null
     * @param maxInFlight max-in-flight value passed to the new subscription
     * @param handler receives each message; must not be null
     */
    public synchronized void subscribe(String topic, String channel, int maxInFlight, MessageHandler handler) {
        Util.checkNotNull(topic);
        Util.checkNotNull(channel);
        Util.checkNotNull(handler);
        this.client.addSubscriber(this);
        Subscription subscription = new Subscription(this.client, topic, channel, handler, this, maxInFlight);
        if (handler instanceof BackoffHandler) {
            ((BackoffHandler) handler).setSubscription(subscription);
        }
        this.subscriptions.add(subscription);
        subscription.checkConnections(lookupTopic(topic));
    }

    /**
     * Sets max-in-flight on every existing subscription whose topic and channel both match.
     *
     * @param topic topic to match
     * @param channel channel to match
     * @param maxInFlight the new max-in-flight value
     */
    public synchronized void setMaxInFlight(String topic, String channel, int maxInFlight) {
        for (Subscription subscription : this.subscriptions) {
            boolean matches = subscription.getTopic().equals(topic) && subscription.getChannel().equals(channel);
            if (matches) {
                subscription.setMaxInFlight(maxInFlight);
            }
        }
    }

    /**
     * Periodic task body: re-runs the lookup for every subscription's topic and passes the
     * result to that subscription. Does nothing once {@code isStopping} is set.
     *
     * <p>Kept public for compatibility with existing callers; it is invoked by the task
     * scheduled in the constructor.
     */
    public synchronized void lookup() {
        if (this.isStopping) {
            return;
        }
        for (Subscription subscription : this.subscriptions) {
            subscription.checkConnections(lookupTopic(subscription.getTopic()));
        }
    }

    /**
     * Queries every configured lookup address for producers of {@code topic}.
     *
     * <p>A non-200 response is ignored (logged at debug). Any other failure for a lookup
     * address is counted and logged, and the remaining addresses are still queried.
     *
     * @param topic the topic to look up
     * @return the union of producer addresses reported by the lookups that succeeded;
     *     possibly empty, never null
     */
    @GuardedBy("this")
    protected Set<HostAndPort> lookupTopic(String topic) {
        Set<HostAndPort> producers = new HashSet<HostAndPort>();
        for (HostAndPort lookup : this.lookups) {
            String url = null;
            // Declared outside the try so the finally block can close it on every path.
            BufferedReader reader = null;
            try {
                url = String.format("http://%s/lookup?topic=%s", lookup, URLEncoder.encode(topic, "UTF-8"));
                HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
                connection.setConnectTimeout(LOOKUP_TIMEOUT_MILLIS);
                connection.setReadTimeout(LOOKUP_TIMEOUT_MILLIS);
                int status = connection.getResponseCode();
                if (status != 200) {
                    // Neither a success nor a counted failure: the failure tally is left untouched.
                    logger.debug("ignoring lookup resp:{} nsqlookupd:{} topic:{}", new Object[]{Integer.valueOf(status), lookup, topic});
                    continue;
                }
                // Decode explicitly as UTF-8 rather than with the platform default charset.
                reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
                LookupResponse response = (LookupResponse) this.client.getGson().fromJson(reader, LookupResponse.class);
                // Some responses nest the payload under a "data" member; unwrap it when present.
                if (response.getData() != null) {
                    response = response.getData();
                }
                for (LookupResponse.Producer producer : response.getProducers()) {
                    producers.add(HostAndPort.fromParts(producer.getBroadcastAddress(), producer.getTcpPort()));
                }
                this.failures.remove(url);
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
            } catch (Exception e) {
                recordLookupFailure(url, lookup, topic, e);
            } finally {
                Util.closeQuietly(reader);
            }
        }
        return producers;
    }

    /**
     * Increments the consecutive-failure count for {@code url} and logs the failure, at error
     * level once the count reaches {@code maxLookupFailuresBeforeError} and at warn otherwise.
     */
    private void recordLookupFailure(String url, HostAndPort lookup, String topic, Exception cause) {
        Integer previous = this.failures.get(url);
        Integer count = Integer.valueOf((previous == null ? 0 : previous.intValue()) + 1);
        this.failures.put(url, count);
        // The exception is the last array element so the logger can treat it as the throwable.
        Object[] args = new Object[]{count, lookup, topic, cause};
        if (count.intValue() >= this.maxLookupFailuresBeforeError) {
            logger.error("lookup failure. lookup failed for {} consecutive tries. nsqlookupd:{} topic:{}", args);
        } else {
            logger.warn("lookup failure. lookup failed for {} consecutive tries. nsqlookupd:{} topic:{}", args);
        }
    }

    /**
     * Stops this subscriber via {@code super.stop()} and then stops every subscription.
     *
     * <p>NOTE(review): this method iterates {@code subscriptions} without holding this
     * instance's monitor, unlike the other accessors of that list. Confirm whether a
     * concurrent {@code subscribe} during shutdown is possible for callers.
     */
    @Override
    public void stop() {
        super.stop();
        for (Subscription subscription : this.subscriptions) {
            subscription.stop();
        }
        logger.info("subscriber stopped");
    }

    /** Returns the max-in-flight value used by {@code subscribe} overloads that do not take one. */
    public synchronized int getDefaultMaxInFlight() {
        return this.defaultMaxInFlight;
    }

    /** Sets the max-in-flight value used by {@code subscribe} overloads that do not take one. */
    public synchronized void setDefaultMaxInFlight(int defaultMaxInFlight) {
        this.defaultMaxInFlight = defaultMaxInFlight;
    }

    /** Returns the configured max flush delay in milliseconds (initially 2000). */
    public synchronized int getMaxFlushDelayMillis() {
        return this.maxFlushDelayMillis;
    }

    /** Sets the max flush delay in milliseconds. */
    public synchronized void setMaxFlushDelayMillis(int maxFlushDelayMillis) {
        this.maxFlushDelayMillis = maxFlushDelayMillis;
    }

    /** Returns the configured max attempts (initially {@code Integer.MAX_VALUE}). */
    public synchronized int getMaxAttempts() {
        return this.maxAttempts;
    }

    /** Sets the max attempts value. */
    public synchronized void setMaxAttempts(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Returns the failed-message handler, or null if none has been set. */
    public synchronized FailedMessageHandler getFailedMessageHandler() {
        return this.failedMessageHandler;
    }

    /** Sets the failed-message handler; may be null. */
    public synchronized void setFailedMessageHandler(FailedMessageHandler failedMessageHandler) {
        this.failedMessageHandler = failedMessageHandler;
    }

    /** Returns the interval, in seconds, between periodic lookups. */
    public synchronized int getLookupIntervalSecs() {
        return this.lookupIntervalSecs;
    }

    /**
     * Returns the number of tasks queued in the client's executor.
     *
     * @return the queue size, or null when the executor is not a {@link ThreadPoolExecutor}
     */
    public Integer getExecutorQueueSize() {
        ExecutorService executor = this.client.getExecutor();
        if (!(executor instanceof ThreadPoolExecutor)) {
            return null;
        }
        return Integer.valueOf(((ThreadPoolExecutor) executor).getQueue().size());
    }

    /** Returns the sum of the connection counts of all subscriptions. */
    public synchronized int getConnectionCount() {
        int total = 0;
        for (Subscription subscription : this.subscriptions) {
            total += subscription.getConnectionCount();
        }
        return total;
    }

    /** Delegates unchanged to {@code BasePubSub.setConfig}. */
    @Override
    public void setConfig(Config config) {
        super.setConfig(config);
    }

    /** Delegates unchanged to {@code BasePubSub.getConfig}. */
    @Override
    public Config getConfig() {
        return super.getConfig();
    }
}
