/*
 * Decompiled with CFR 0.152.
 */
package proj.zoie.dataprovider.jms;

import java.util.Comparator;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.log4j.Logger;
import proj.zoie.api.DataConsumer;
import proj.zoie.dataprovider.jms.DataEventBuilder;
import proj.zoie.dataprovider.jms.TopicFactory;
import proj.zoie.impl.indexing.StreamDataProvider;

public class JMSStreamDataProvider<T>
extends StreamDataProvider<T> {
    private static final Logger logger = Logger.getLogger(JMSStreamDataProvider.class);
    private static final String name = "JMSStreamDataProvider";
    private final String topicName;
    private final String clientID;
    private final TopicConnectionFactory connectionFactory;
    private final TopicFactory topicFactory;
    private final DataEventBuilder<T> dataEventBuilder;
    private TopicSubscriber subscriber;
    private TopicConnection connection;
    private volatile int JMSErrorBackOffTime = 3000;
    private volatile boolean stopped = true;

    public JMSStreamDataProvider(String topicName, String clientID, TopicConnectionFactory connectionFactory, TopicFactory topicFactory, DataEventBuilder<T> dataEventBuilder, Comparator<String> versionComparator) {
        super(versionComparator);
        this.topicName = topicName;
        this.clientID = clientID;
        this.connectionFactory = connectionFactory;
        this.topicFactory = topicFactory;
        this.dataEventBuilder = dataEventBuilder;
    }

    public void start() {
        logger.info((Object)("starting " + this.toString()));
        this.stopped = false;
        super.start();
    }

    private void reconnect() {
        while (!this.stopped) {
            try {
                if (this.subscriber != null) {
                    this.subscriber.close();
                }
                if (this.connection != null) {
                    this.connection.close();
                }
                this.connection = this.connectionFactory.createTopicConnection();
                if (this.clientID != null) {
                    this.connection.setClientID(this.clientID);
                }
                TopicSession session = this.connection.createTopicSession(false, 1);
                Topic topic = this.topicFactory.createTopic(this.topicName);
                this.subscriber = session.createDurableSubscriber(topic, name);
                this.connection.start();
                return;
            }
            catch (JMSException e) {
                logger.error((Object)("could not connect to durable topic, topic: " + this.topicName), (Throwable)e);
                this.backOffAfterJMSException(e);
                continue;
            }
            break;
        }
        return;
    }

    public DataConsumer.DataEvent<T> next() {
        while (true) {
            if (this.subscriber == null) {
                this.reconnect();
            }
            if (this.stopped) {
                return null;
            }
            try {
                Message m = this.subscriber.receive();
                if (m == null) continue;
                return this.dataEventBuilder.buildDataEvent(m);
            }
            catch (JMSException e) {
                logger.error((Object)"error receiving message", (Throwable)e);
                this.backOffAfterJMSException(e);
                this.reconnect();
                continue;
            }
            break;
        }
    }

    void backOffAfterJMSException(JMSException e) {
        try {
            Thread.sleep(this.JMSErrorBackOffTime);
        }
        catch (InterruptedException e1) {
            logger.error((Object)e1);
        }
    }

    public int getJMSErrorBackOffTime() {
        return this.JMSErrorBackOffTime;
    }

    public void setJMSErrorBackOffTime(int jMSErrorBackOffTime) {
        this.JMSErrorBackOffTime = jMSErrorBackOffTime;
    }

    public void reset() {
        logger.error((Object)"reset called, not implemented by JMS data provider...");
    }

    public void setStartingOffset(String version) {
        logger.error((Object)"starting offset called, not implemented by JMS data provider...");
    }

    public void stop() {
        logger.info((Object)("stopping " + this.toString()));
        this.stopped = true;
        try {
            this.connection.stop();
        }
        catch (JMSException e) {
            logger.error((Object)"could not stop connection", (Throwable)e);
        }
        try {
            this.connection.close();
        }
        catch (JMSException e) {
            logger.error((Object)"could not close connection", (Throwable)e);
        }
        super.stop();
    }

    public String toString() {
        return "JMSStreamDataProvider [clientID=" + this.clientID + ", topicName=" + this.topicName + "]";
    }
}

