/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.jms;

import io.confluent.kafka.jms.JMSClientConfig;
import io.confluent.kafka.jms.KafkaConnection;
import io.confluent.kafka.jms.KafkaMessage;
import io.confluent.kafka.jms.KafkaMessageFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaMessageQueue
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(KafkaMessageQueue.class);
    private final Consumer<byte[], byte[]> consumer;
    private final KafkaConnection connection;
    private final Queue<Message> messageQueue;
    private final Map<String, Long> committedOffsets;
    private final JMSClientConfig jmsClientConfig;
    private final ExecutorService executorService;
    private final AtomicBoolean closeInitiated = new AtomicBoolean(false);
    private final CountDownLatch closeLatch = new CountDownLatch(1);
    private final AtomicBoolean fetchInProgress = new AtomicBoolean(false);

    KafkaMessageQueue(JMSClientConfig jmsClientConfig, Consumer<byte[], byte[]> consumer, KafkaConnection connection) {
        this.consumer = consumer;
        this.connection = connection;
        this.jmsClientConfig = jmsClientConfig;
        this.messageQueue = new ConcurrentLinkedDeque<Message>();
        this.committedOffsets = new HashMap<String, Long>();
        this.executorService = Executors.newSingleThreadExecutor();
    }

    public boolean isEmpty() {
        return this.messageQueue.isEmpty();
    }

    public void clear() {
        log.trace("clearing {} messages and {} committed offsets for partitions {}", new Object[]{this.messageQueue.size(), this.committedOffsets.size(), Arrays.toString(this.committedOffsets.keySet().toArray())});
        this.messageQueue.clear();
        this.committedOffsets.clear();
    }

    public void commitSync(TopicPartition topicPartition, long offset) {
        this.consumer.wakeup();
        KafkaMessageQueue kafkaMessageQueue = this;
        synchronized (kafkaMessageQueue) {
            while (true) {
                try {
                    this.consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset)));
                    this.committedOffsets.put(topicPartition.toString(), offset);
                    return;
                }
                catch (WakeupException wakeupException) {
                    continue;
                }
                break;
            }
        }
    }

    public Long committed(TopicPartition topicPartition) {
        if (this.committedOffsets.containsKey(topicPartition.toString())) {
            return this.committedOffsets.get(topicPartition.toString());
        }
        log.trace("no cached committed offset for {}", (Object)topicPartition);
        return null;
    }

    public Message poll() throws JMSException {
        this.fetchMessagesIfQueueEmpty();
        return this.messageQueue.poll();
    }

    public Message peek() throws JMSException {
        this.fetchMessagesIfQueueEmpty();
        return this.messageQueue.peek();
    }

    public void fetchMessagesIfQueueEmpty() throws JMSException {
        if (!this.isEmpty()) {
            return;
        }
        if (this.closeInitiated.get()) {
            return;
        }
        if (!this.fetchInProgress.compareAndSet(false, true)) {
            return;
        }
        log.trace("fetchMessagesIfQueueEmpty() calling consumer.poll().");
        try {
            final KafkaMessageQueue kafkaMessageQueue = this;
            this.executorService.submit(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        ConsumerRecords records;
                        1 var2_1 = this;
                        synchronized (var2_1) {
                            records = KafkaMessageQueue.this.consumer.poll(((KafkaMessageQueue)KafkaMessageQueue.this).jmsClientConfig.consumerPollTimeoutMs);
                        }
                        if (!records.isEmpty()) {
                            ArrayList<Message> messages = new ArrayList<Message>(records.count());
                            for (ConsumerRecord record : records) {
                                Message message = KafkaMessageFactory.createMessage(((KafkaMessageQueue)KafkaMessageQueue.this).jmsClientConfig.fallbackMessageType, ((KafkaMessageQueue)KafkaMessageQueue.this).jmsClientConfig.allowOutOfOrderAcknowledge, kafkaMessageQueue, (ConsumerRecord<byte[], byte[]>)record);
                                ((KafkaMessage)message).setJMSMessageID(new TopicPartition(record.topic(), record.partition()), record.offset());
                                messages.add(message);
                            }
                            log.trace("fetchMessagesIfQueueEmpty() - Adding {} message(s) to the queue.", (Object)messages.size());
                            KafkaMessageQueue.this.messageQueue.addAll(messages);
                        } else {
                            log.trace("fetchMessagesIfQueueEmpty() - No messages were returned from consumer.poll()");
                        }
                    }
                    catch (WakeupException ex) {
                        log.trace("fetchMessagesIfQueueEmpty() - consumer wakeup was called.");
                    }
                    catch (Exception ex) {
                        KafkaMessageQueue.this.connection.handleException(ex, "An error occured consuming messages");
                    }
                    finally {
                        KafkaMessageQueue.this.fetchInProgress.set(false);
                    }
                }
            });
        }
        catch (RejectedExecutionException ex) {
            this.fetchInProgress.set(false);
            this.connection.handleException(ex, "An error occured starting a fetch operation.");
        }
    }

    @Override
    public void close() throws IOException {
        if (this.closeInitiated.compareAndSet(false, true)) {
            Future<?> voidFuture = this.executorService.submit(new Runnable(){

                @Override
                public void run() {
                    log.trace("close() - Closing consumer");
                    KafkaMessageQueue.this.consumer.close();
                    KafkaMessageQueue.this.closeLatch.countDown();
                }
            });
            try {
                voidFuture.get(this.jmsClientConfig.consumerCloseTimeoutMs, TimeUnit.MILLISECONDS);
            }
            catch (Exception e) {
                log.error("exception thrown", (Throwable)e);
            }
            log.trace("close() - calling executorService.shutdown()");
            this.executorService.shutdown();
        } else {
            try {
                log.trace("close() - waiting {} ms for close service to complete.", (Object)this.jmsClientConfig.consumerCloseTimeoutMs);
                if (this.closeLatch.await(this.jmsClientConfig.consumerCloseTimeoutMs, TimeUnit.MILLISECONDS)) {
                    log.warn("Timeout while waiting for close to complete.");
                }
            }
            catch (InterruptedException e) {
                throw new IOException("Exception thrown during close.", e);
            }
        }
    }

    static class Enumeration
    implements java.util.Enumeration<Message> {
        final KafkaMessageQueue messageQueue;

        Enumeration(KafkaMessageQueue messageQueue) {
            this.messageQueue = messageQueue;
        }

        public void fetch() throws JMSException {
            this.messageQueue.fetchMessagesIfQueueEmpty();
        }

        @Override
        public boolean hasMoreElements() {
            return !this.messageQueue.isEmpty();
        }

        @Override
        public Message nextElement() {
            try {
                return this.messageQueue.poll();
            }
            catch (JMSException e) {
                return null;
            }
        }
    }
}

