package com.sproutsocial.nsq;

import java.io.IOException;
import net.jcip.annotations.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/sproutsocial/nsq/SubConnection.class */
public class SubConnection extends Connection {
    private final MessageHandler handler;
    private final FailedMessageHandler failedMessageHandler;
    private final Subscription subscription;
    private final String topic;
    private final int maxAttempts;
    private final int maxFlushDelayMillis;
    private int inFlight;
    private int maxInFlight;
    private int maxUnflushed;
    private long finishedCount;
    private long requeuedCount;
    private static final Logger logger = LoggerFactory.getLogger(SubConnection.class);

    public SubConnection(Client client, HostAndPort hostAndPort, Subscription subscription) {
        super(client, hostAndPort);
        this.inFlight = 0;
        this.maxInFlight = 0;
        this.maxUnflushed = 0;
        this.finishedCount = 0L;
        this.requeuedCount = 0L;
        Subscriber subscriber = subscription.getSubscriber();
        this.handler = subscription.getHandler();
        this.failedMessageHandler = subscriber.getFailedMessageHandler();
        this.subscription = subscription;
        this.topic = subscription.getTopic();
        this.maxAttempts = subscriber.getMaxAttempts();
        this.maxFlushDelayMillis = subscriber.getMaxFlushDelayMillis();
        scheduleAtFixedRate(new Runnable() { // from class: com.sproutsocial.nsq.SubConnection.1
            @Override // java.lang.Runnable
            public void run() {
                SubConnection.this.delayedFlush();
            }
        }, this.maxFlushDelayMillis / 2, this.maxFlushDelayMillis / 2, false);
    }

    public synchronized void finish(String str) {
        try {
            writeCommand("FIN", str);
            this.finishedCount++;
            messageDone();
        } catch (IOException e) {
            logger.error("finish error. {}", stateDesc(), e);
            close();
        }
    }

    public synchronized void requeue(String str) {
        try {
            writeCommand("REQ", str, 0);
            this.requeuedCount++;
            messageDone();
        } catch (IOException e) {
            logger.error("requeue error. {}", stateDesc(), e);
            close();
        }
    }

    @GuardedBy("this")
    private void messageDone() throws IOException {
        this.inFlight = Math.max(this.inFlight - 1, 0);
        if (this.inFlight == 0 && this.isStopping) {
            flushAndClose();
        } else {
            checkFlush();
        }
    }

    public synchronized void touch(String str) {
        try {
            writeCommand("TOUCH", str);
            checkFlush();
        } catch (IOException e) {
            logger.error("touch error. {}", stateDesc(), e);
            close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void delayedFlush() {
        try {
            if (this.unflushedCount > 0 && Util.clock() - this.lastActionFlush > (this.maxFlushDelayMillis / 2) + 10) {
                flush();
            }
        } catch (Exception e) {
            logger.error("delayedFlush error. {}", stateDesc(), e);
            close();
        }
    }

    @GuardedBy("this")
    private void checkFlush() throws IOException {
        if (this.unflushedCount >= this.maxUnflushed) {
            flush();
        } else {
            this.unflushedCount++;
        }
    }

    public synchronized void setMaxInFlight(int i) {
        setMaxInFlight(i, true);
    }

    public synchronized void setMaxInFlight(int i, boolean z) {
        try {
            if (this.maxInFlight == i) {
                return;
            }
            this.maxInFlight = i;
            this.maxUnflushed = Math.min(i / 3, 150);
            logger.debug("RDY:{} {}", Integer.valueOf(i), toString());
            writeCommand("RDY", Integer.valueOf(i));
            if (z) {
                flush();
            } else {
                this.out.flush();
            }
        } catch (IOException e) {
            logger.error("setMaxInFlight failed. con:{}", stateDesc(), e);
            close();
        }
    }

    public synchronized int getMaxInFlight() {
        return this.maxInFlight;
    }

    @Override // com.sproutsocial.nsq.Connection
    public synchronized void connect(Config config) throws IOException {
        this.client.addSubConnection(this);
        super.connect(config);
        writeCommand("SUB", this.subscription.getTopic(), this.subscription.getChannel());
        flushAndReadOK();
    }

    private void failMessage(final NSQMessage nSQMessage) {
        if (this.failedMessageHandler != null) {
            this.handlerExecutor.execute(new Runnable() { // from class: com.sproutsocial.nsq.SubConnection.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        SubConnection.this.failedMessageHandler.failed(SubConnection.this.subscription.getTopic(), SubConnection.this.subscription.getChannel(), nSQMessage);
                    } catch (Throwable th) {
                        SubConnection.logger.error("failed message error", th);
                    }
                }
            });
        }
        finish(nSQMessage.getId());
    }

    @Override // com.sproutsocial.nsq.Connection
    protected void onMessage(long j, int i, String str, byte[] bArr) {
        final NSQMessage nSQMessage = new NSQMessage(j, i, str, bArr, this.topic, this);
        synchronized (this) {
            this.inFlight++;
        }
        if (nSQMessage.getAttempts() >= this.maxAttempts) {
            failMessage(nSQMessage);
        } else {
            this.handlerExecutor.execute(new Runnable() { // from class: com.sproutsocial.nsq.SubConnection.3
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        SubConnection.this.handler.accept(nSQMessage);
                    } catch (Throwable th) {
                        SubConnection.logger.error("message error", th);
                    }
                }
            });
        }
    }

    @Override // com.sproutsocial.nsq.Connection, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        super.close();
        this.client.getSchedExecutor().execute(new Runnable() { // from class: com.sproutsocial.nsq.SubConnection.4
            @Override // java.lang.Runnable
            public void run() {
                SubConnection.this.subscription.connectionClosed(SubConnection.this);
                SubConnection.this.client.connectionClosed(SubConnection.this);
            }
        });
    }

    @Override // com.sproutsocial.nsq.BasePubSub
    public synchronized void stop() {
        super.stop();
        if (this.inFlight == 0) {
            flushAndClose();
        } else {
            setMaxInFlight(0);
        }
    }

    public String toString() {
        return String.format("SubCon:%s %s.%s", this.host.getHost(), this.subscription.getTopic(), this.subscription.getChannel());
    }

    @Override // com.sproutsocial.nsq.Connection
    public synchronized String stateDesc() {
        return String.format("%s inFlight:%d maxInFlight:%d fin:%d req:%d", super.stateDesc(), Integer.valueOf(this.inFlight), Integer.valueOf(this.maxInFlight), Long.valueOf(this.finishedCount), Long.valueOf(this.requeuedCount));
    }
}
