package com.sproutsocial.nsq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:META-INF/bundled-dependencies/nsq-j-1.0.jar:com/sproutsocial/nsq/Publisher.class */
public class Publisher extends BasePubSub {
    private final HostAndPort nsqd;
    private final HostAndPort failoverNsqd;
    private PubConnection con;
    private boolean isFailover;
    private long failoverStart;
    private int failoverDurationSecs;
    private final Map<String, Batcher> batchers;
    private ScheduledExecutorService batchExecutor;
    private static final int DEFAULT_MAX_BATCH_SIZE = 16384;
    private static final int DEFUALT_MAX_BATCH_DELAY = 300;
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) Publisher.class);

    public Publisher(Client client, String str, String str2) {
        super(client);
        this.isFailover = false;
        this.failoverDurationSecs = DEFUALT_MAX_BATCH_DELAY;
        this.batchers = new HashMap();
        this.nsqd = HostAndPort.fromString(str).withDefaultPort(4150);
        this.failoverNsqd = str2 != null ? HostAndPort.fromString(str2).withDefaultPort(4150) : null;
        client.addPublisher(this);
    }

    public Publisher(String str, String str2) {
        this(Client.getDefaultClient(), str, str2);
    }

    public Publisher(String str) {
        this(str, null);
    }

    @GuardedBy("this")
    private void checkConnection() throws IOException {
        if (this.con == null) {
            if (this.isStopping) {
                throw new NSQException("publisher stopped");
            }
            connect(this.nsqd);
        } else {
            if (!this.isFailover || Util.clock() - this.failoverStart <= this.failoverDurationSecs * 1000) {
                return;
            }
            this.isFailover = false;
            connect(this.nsqd);
            logger.info("using primary nsqd");
        }
    }

    @GuardedBy("this")
    private void connect(HostAndPort hostAndPort) throws IOException {
        if (this.con != null) {
            this.con.close();
        }
        this.con = new PubConnection(this.client, hostAndPort, this);
        try {
            this.con.connect(this.config);
            logger.info("publisher connected:{}", hostAndPort);
        } catch (IOException e) {
            this.con.close();
            this.con = null;
            throw e;
        }
    }

    public synchronized void connectionClosed(PubConnection pubConnection) {
        if (this.con == pubConnection) {
            this.con = null;
            logger.debug("removed closed publisher connection:{}", pubConnection.getHost());
        }
    }

    public synchronized void publish(String str, byte[] bArr) {
        Util.checkNotNull(str);
        Util.checkNotNull(bArr);
        Util.checkArgument(bArr.length > 0);
        try {
            checkConnection();
            this.con.publish(str, bArr);
        } catch (Exception e) {
            logger.error("publish error with:{}", this.isFailover ? this.failoverNsqd : this.nsqd, e);
            publishFailover(str, bArr);
        }
    }

    public synchronized void publishDeferred(String str, byte[] bArr, long j, TimeUnit timeUnit) {
        Util.checkNotNull(str);
        Util.checkNotNull(bArr);
        Util.checkArgument(bArr.length > 0);
        Util.checkArgument(j > 0);
        Util.checkNotNull(timeUnit);
        try {
            checkConnection();
            this.con.publishDeferred(str, bArr, timeUnit.toMillis(j));
        } catch (Exception e) {
            throw new NSQException("deferred publish failed", e);
        }
    }

    public synchronized void publish(String str, List<byte[]> list) {
        Util.checkNotNull(str);
        Util.checkNotNull(list);
        Util.checkArgument(list.size() > 0);
        try {
            checkConnection();
            this.con.publish(str, list);
        } catch (Exception e) {
            logger.error("publish error with:{}", this.isFailover ? this.failoverNsqd : this.nsqd, e);
            Iterator<byte[]> it = list.iterator();
            while (it.hasNext()) {
                publishFailover(str, it.next());
            }
        }
    }

    @GuardedBy("this")
    private void publishFailover(String str, byte[] bArr) {
        try {
            if (this.failoverNsqd == null) {
                logger.warn("publish failed but no failoverNsqd configured. Will wait and retry once.");
                Util.sleepQuietly(10000);
                connect(this.nsqd);
            } else if (!this.isFailover) {
                this.failoverStart = Util.clock();
                this.isFailover = true;
                connect(this.failoverNsqd);
                logger.info("using failover nsqd:{}", this.failoverNsqd);
            }
            this.con.publish(str, bArr);
        } catch (Exception e) {
            Util.closeQuietly(this.con);
            this.con = null;
            this.isFailover = false;
            throw new NSQException("publish failed", e);
        }
    }

    public synchronized void publishBuffered(String str, byte[] bArr) {
        Util.checkNotNull(str);
        Util.checkNotNull(bArr);
        Util.checkArgument(bArr.length > 0);
        Batcher batcher = this.batchers.get(str);
        if (batcher == null) {
            batcher = new Batcher(this, str, DEFAULT_MAX_BATCH_SIZE, DEFUALT_MAX_BATCH_DELAY);
            this.batchers.put(str, batcher);
        }
        batcher.publish(bArr);
    }

    public synchronized void setBatchConfig(String str, int i, int i2) {
        Batcher batcher = this.batchers.get(str);
        if (batcher != null) {
            batcher.sendBatch();
        }
        this.batchers.put(str, new Batcher(this, str, i, i2));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized ScheduledExecutorService getBatchExecutor() {
        if (this.batchExecutor == null) {
            this.batchExecutor = Executors.newScheduledThreadPool(1, Util.threadFactory("nsq-batch"));
        }
        return this.batchExecutor;
    }

    @Override // com.sproutsocial.nsq.BasePubSub
    public synchronized void stop() {
        Iterator<Batcher> it = this.batchers.values().iterator();
        while (it.hasNext()) {
            it.next().sendBatch();
        }
        super.stop();
        Util.closeQuietly(this.con);
        this.con = null;
        if (this.batchExecutor != null) {
            Util.shutdownAndAwaitTermination(this.batchExecutor, 40L, TimeUnit.MILLISECONDS);
        }
        if (this.client.isLonePublisher(this)) {
            Util.shutdownAndAwaitTermination(this.client.getSchedExecutor(), 40L, TimeUnit.MILLISECONDS);
        }
    }

    public synchronized int getFailoverDurationSecs() {
        return this.failoverDurationSecs;
    }

    public synchronized void setFailoverDurationSecs(int i) {
        this.failoverDurationSecs = i;
    }

    @Override // com.sproutsocial.nsq.BasePubSub
    public /* bridge */ /* synthetic */ void setConfig(Config config) {
        super.setConfig(config);
    }

    @Override // com.sproutsocial.nsq.BasePubSub
    public /* bridge */ /* synthetic */ Config getConfig() {
        return super.getConfig();
    }
}
