/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.jms;

import com.google.common.base.Stopwatch;
import io.confluent.kafka.jms.JMSClientConfig;
import io.confluent.kafka.jms.JMSConsumerRebalanceListener;
import io.confluent.kafka.jms.KafkaDestination;
import io.confluent.kafka.jms.KafkaMessageQueue;
import io.confluent.kafka.jms.KafkaSession;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaMessageConsumer
implements MessageConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaMessageConsumer.class);
    final Consumer<byte[], byte[]> consumer;
    final KafkaMessageQueue messageQueue;
    final JMSClientConfig jmsClientConfig;
    final KafkaSession session;
    protected final KafkaDestination destination;
    MessageListener messageListener;
    static final String REGEX_PREFIX = "regex(";
    static final String REGEX_SUFFIX = ")";

    KafkaMessageConsumer(JMSClientConfig jmsClientConfig, Consumer<byte[], byte[]> consumer, KafkaDestination destination, KafkaSession session) {
        this.consumer = consumer;
        this.jmsClientConfig = jmsClientConfig;
        this.messageQueue = new KafkaMessageQueue(this.jmsClientConfig, consumer, session.connection);
        this.session = session;
        this.destination = destination;
        if (null == destination) {
            return;
        }
        if (destination.topic.startsWith(REGEX_PREFIX) && destination.topic.endsWith(REGEX_SUFFIX)) {
            String regexStr = destination.topic.substring(REGEX_PREFIX.length(), destination.topic.length() - REGEX_SUFFIX.length());
            this.consumer.subscribe(Pattern.compile(regexStr), (ConsumerRebalanceListener)new JMSConsumerRebalanceListener(this.messageQueue));
            return;
        }
        this.consumer.subscribe(Collections.singletonList(destination.topic), (ConsumerRebalanceListener)new JMSConsumerRebalanceListener(this.messageQueue));
    }

    int getAcknowledgeMode() throws JMSException {
        return this.session.getAcknowledgeMode();
    }

    public String getMessageSelector() throws JMSException {
        return null;
    }

    public MessageListener getMessageListener() throws JMSException {
        return this.messageListener;
    }

    public void setMessageListener(MessageListener messageListener) throws JMSException {
        if (null != this.messageListener) {
            this.session.connection.removeMessageListener(this.messageListener, this);
        }
        this.messageListener = messageListener;
        this.session.connection.addMessageListener(this.messageListener, this);
    }

    public Message receive() throws JMSException {
        Message result;
        while (null == (result = this.receive(100L))) {
        }
        return result;
    }

    public Message receive(long millisecondsTimeout) throws JMSException {
        if (millisecondsTimeout <= 0L) {
            throw new JMSException("timeout must be greater than 0.");
        }
        Stopwatch stopwatch = Stopwatch.createStarted();
        while (millisecondsTimeout >= stopwatch.elapsed(TimeUnit.MILLISECONDS)) {
            Message message = this.receiveNoWait();
            if (null != message) {
                return message;
            }
            try {
                Thread.sleep(25L);
            }
            catch (InterruptedException e) {
                break;
            }
        }
        return null;
    }

    Message peekHeadOfQueue() throws JMSException {
        return this.messageQueue.peek();
    }

    void ackHeadOfQueueMaybe(Message message) throws JMSException {
        Message polled = this.messageQueue.poll();
        if (message != polled) {
            throw new JMSException("Attempted to acknowledge message that is not at head of queue.");
        }
        if (this.getAcknowledgeMode() == 1 || this.getAcknowledgeMode() == 3) {
            message.acknowledge();
        }
    }

    public Message receiveNoWait() throws JMSException {
        if (null == this.messageQueue.peek()) {
            return null;
        }
        Message result = this.messageQueue.poll();
        if (this.getAcknowledgeMode() == 1 || this.getAcknowledgeMode() == 3) {
            result.acknowledge();
        }
        return result;
    }

    public void close() throws JMSException {
        try {
            this.messageQueue.close();
        }
        catch (IOException e) {
            log.error("Exception thrown while closing queue", (Throwable)e);
        }
    }
}

