package proj.zoie.dataprovider.jms;

import java.util.Comparator;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSubscriber;
import org.apache.log4j.Logger;
import proj.zoie.api.DataConsumer;
import proj.zoie.impl.indexing.StreamDataProvider;

/* loaded from: input_file:proj/zoie/dataprovider/jms/JMSStreamDataProvider.class */
public class JMSStreamDataProvider<T> extends StreamDataProvider<T> {
    private static final Logger logger = Logger.getLogger(JMSStreamDataProvider.class);
    private static final String name = "JMSStreamDataProvider";
    private final String topicName;
    private final String clientID;
    private final TopicConnectionFactory connectionFactory;
    private final TopicFactory topicFactory;
    private final DataEventBuilder<T> dataEventBuilder;
    private TopicSubscriber subscriber;
    private TopicConnection connection;
    private volatile int JMSErrorBackOffTime;
    private volatile boolean stopped;

    public JMSStreamDataProvider(String str, String str2, TopicConnectionFactory topicConnectionFactory, TopicFactory topicFactory, DataEventBuilder<T> dataEventBuilder, Comparator<String> comparator) {
        super(comparator);
        this.JMSErrorBackOffTime = 3000;
        this.stopped = true;
        this.topicName = str;
        this.clientID = str2;
        this.connectionFactory = topicConnectionFactory;
        this.topicFactory = topicFactory;
        this.dataEventBuilder = dataEventBuilder;
    }

    public void start() {
        logger.info("starting " + toString());
        this.stopped = false;
        super.start();
    }

    private void reconnect() {
        while (!this.stopped) {
            try {
                if (this.subscriber != null) {
                    this.subscriber.close();
                }
                if (this.connection != null) {
                    this.connection.close();
                }
                this.connection = this.connectionFactory.createTopicConnection();
                if (this.clientID != null) {
                    this.connection.setClientID(this.clientID);
                }
                this.subscriber = this.connection.createTopicSession(false, 1).createDurableSubscriber(this.topicFactory.createTopic(this.topicName), name);
                this.connection.start();
                return;
            } catch (JMSException e) {
                logger.error("could not connect to durable topic, topic: " + this.topicName, e);
                backOffAfterJMSException(e);
            }
        }
    }

    public DataConsumer.DataEvent<T> next() {
        Message receive;
        while (true) {
            if (this.subscriber == null) {
                reconnect();
            }
            if (this.stopped) {
                return null;
            }
            try {
                receive = this.subscriber.receive();
            } catch (JMSException e) {
                logger.error("error receiving message", e);
                backOffAfterJMSException(e);
                reconnect();
            }
            if (receive != null) {
                return this.dataEventBuilder.buildDataEvent(receive);
            }
            continue;
        }
    }

    void backOffAfterJMSException(JMSException jMSException) {
        try {
            Thread.sleep(this.JMSErrorBackOffTime);
        } catch (InterruptedException e) {
            logger.error(e);
        }
    }

    public int getJMSErrorBackOffTime() {
        return this.JMSErrorBackOffTime;
    }

    public void setJMSErrorBackOffTime(int i) {
        this.JMSErrorBackOffTime = i;
    }

    public void reset() {
        logger.error("reset called, not implemented by JMS data provider...");
    }

    public void setStartingOffset(String str) {
        logger.error("starting offset called, not implemented by JMS data provider...");
    }

    public void stop() {
        logger.info("stopping " + toString());
        this.stopped = true;
        try {
            this.connection.stop();
        } catch (JMSException e) {
            logger.error("could not stop connection", e);
        }
        try {
            this.connection.close();
        } catch (JMSException e2) {
            logger.error("could not close connection", e2);
        }
        super.stop();
    }

    public String toString() {
        return "JMSStreamDataProvider [clientID=" + this.clientID + ", topicName=" + this.topicName + "]";
    }
}
