/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.jms;

import io.confluent.kafka.jms.JMSClientConfig;
import io.confluent.kafka.jms.KafkaMessageConsumer;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background loop that polls a collection of {@link KafkaMessageConsumer}s and hands the
 * message at the head of each consumer's queue to a single {@link MessageListener}.
 *
 * <p>The loop runs until {@link #stop()} is called. When a full pass over the consumers
 * yields no message at all, the loop sleeps for
 * {@code jmsClientConfig.messageListenerNullWait} before polling again.
 *
 * <p>If the listener throws, the same message is handed to it again, up to
 * {@code jmsClientConfig.messageListenerMaxRedeliveries} additional times, but only when
 * the consumer reports acknowledge mode {@code 1} or {@code 3}.
 */
class MessageListenerRunnable implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(MessageListenerRunnable.class);

    /** Same value as {@code javax.jms.Session.AUTO_ACKNOWLEDGE}. */
    private static final int AUTO_ACKNOWLEDGE = 1;
    /** Same value as {@code javax.jms.Session.DUPS_OK_ACKNOWLEDGE}. */
    private static final int DUPS_OK_ACKNOWLEDGE = 3;

    final MessageListener messageListener;
    final Collection<KafkaMessageConsumer> messageConsumers;
    final JMSClientConfig jmsClientConfig;
    /** Set by {@link #stop()}; checked by the polling loop and between redeliveries. */
    final AtomicBoolean stopped = new AtomicBoolean(false);
    /** Clock used for the idle sleep; non-final, presumably so tests can substitute it. */
    Time time = new SystemTime();

    /**
     * @param jmsClientConfig  supplies {@code messageListenerNullWait} and
     *                         {@code messageListenerMaxRedeliveries}
     * @param messageListener  listener that receives every polled message
     * @param messageConsumers consumers polled on every pass of the loop
     */
    MessageListenerRunnable(JMSClientConfig jmsClientConfig, MessageListener messageListener, Collection<KafkaMessageConsumer> messageConsumers) {
        this.jmsClientConfig = jmsClientConfig;
        this.messageListener = messageListener;
        this.messageConsumers = messageConsumers;
    }

    /**
     * Polls every consumer in turn until {@link #stop()} is called, delivering at most one
     * message per consumer per pass and sleeping when no consumer had a message.
     */
    @Override
    public void run() {
        while (!this.stopped.get()) {
            int messageNotAvailableCount = 0;
            for (KafkaMessageConsumer messageConsumer : this.messageConsumers) {
                Message message = this.peek(messageConsumer);
                if (message == null) {
                    ++messageNotAvailableCount;
                    continue;
                }
                this.deliver(messageConsumer, message);
            }
            // Back off only when no consumer produced a message during this pass.
            if (messageNotAvailableCount == this.messageConsumers.size()) {
                this.time.sleep((long)this.jmsClientConfig.messageListenerNullWait);
            }
        }
        log.info("Message Listener thread for {} finished", this.messageListener);
    }

    /** Requests that the polling loop exit; the loop observes the flag on its next check. */
    public void stop() {
        this.stopped.set(true);
    }

    /**
     * Returns the message at the head of the consumer's queue, or {@code null} when none is
     * available or the peek failed (failures are logged, not propagated).
     */
    private Message peek(KafkaMessageConsumer messageConsumer) {
        try {
            return messageConsumer.peekHeadOfQueue();
        }
        catch (IllegalArgumentException ex) {
            log.trace("run() - Message consumer already closed.");
        }
        catch (JMSException ex) {
            log.error("An error occurred reading from a MessageConsumer.", ex);
        }
        return null;
    }

    /**
     * Hands the message to the listener, redelivering it while the listener keeps throwing
     * and redelivery is still permitted, then acknowledges it after a successful delivery.
     */
    private void deliver(KafkaMessageConsumer messageConsumer, Message message) {
        int redeliveredCount = 0;
        while (true) {
            try {
                this.messageListener.onMessage(message);
            }
            catch (Exception ex) {
                log.error("An error occurred executing a messageListener.", ex);
                if (this.shouldRedeliver(messageConsumer, redeliveredCount++)) {
                    continue;
                }
                // Give up on this message for now; it is left unacknowledged.
                return;
            }
            this.acknowledge(messageConsumer, message);
            return;
        }
    }

    /** Acknowledges a delivered message; a failure is logged and otherwise ignored. */
    private void acknowledge(KafkaMessageConsumer messageConsumer, Message message) {
        try {
            messageConsumer.ackHeadOfQueueMaybe(message);
        }
        catch (Exception ex) {
            log.error("An error occurred acknowledging message", ex);
        }
    }

    /**
     * Decides whether a message whose listener invocation just failed should be handed to
     * the listener again.
     *
     * @param redeliveredCount number of redeliveries already attempted for this message
     * @return {@code true} to invoke the listener again with the same message
     */
    private boolean shouldRedeliver(KafkaMessageConsumer messageConsumer, int redeliveredCount) {
        int acknowledgeMode;
        try {
            acknowledgeMode = messageConsumer.getAcknowledgeMode();
        }
        catch (JMSException ex) {
            // Previously swallowed silently; still treated as "do not redeliver".
            log.error("An error occurred determining the acknowledge mode.", ex);
            return false;
        }
        if (acknowledgeMode != AUTO_ACKNOWLEDGE && acknowledgeMode != DUPS_OK_ACKNOWLEDGE) {
            return false;
        }
        // NOTE(review): equality, not >=, so a negative limit effectively never matches and
        // retries continue - confirm whether that is intended as "unlimited".
        if (redeliveredCount == this.jmsClientConfig.messageListenerMaxRedeliveries) {
            log.error("Maximum number of messageListener redeliveries exceeded.");
            return false;
        }
        // Do not keep re-invoking a failing listener once shutdown has been requested.
        if (this.stopped.get()) {
            log.debug("Stop requested; abandoning redelivery to messageListener.");
            return false;
        }
        return true;
    }
}

