package io.micronaut.jms.listener;

import io.micronaut.core.util.ArgumentUtils;
import io.micronaut.jms.model.JMSDestinationType;
import io.micronaut.jms.pool.JMSConnectionPool;
import io.micronaut.messaging.exceptions.MessageListenerException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/micronaut/jms/listener/JMSListenerContainer.class */
/**
 * Registers JMS message listeners on connections obtained from a
 * {@link JMSConnectionPool} and stops those connections on shutdown.
 *
 * <p>Each registration takes its own connection from the pool, opens a session on it and
 * attaches a consumer for the requested destination. Whether the destination name is
 * resolved as a queue or a topic is decided by the {@link JMSDestinationType} given at
 * construction time.
 *
 * <p>The internal bookkeeping collections are plain {@link HashSet}s and are not
 * synchronized.
 *
 * @param <T> the message body type handled by listeners registered through
 *            {@link #registerListener(String, MessageHandler, Class)}
 */
public class JMSListenerContainer<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(JMSListenerContainer.class);

    /** Keep-alive time, in seconds, for idle executor threads above the core pool size. */
    private static final long DEFAULT_KEEP_ALIVE_TIME = 5;

    /** Capacity of the bounded work queue backing each listener executor. */
    private static final int DEFAULT_EXECUTOR_QUEUE_SIZE = 10;

    /** Sessions created by the handler-based registration are not transacted. */
    private static final boolean DEFAULT_TRANSACTED = false;

    /** Acknowledge mode used by the handler-based registration. */
    private static final int DEFAULT_ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    /** Every connection taken from the pool by this container; stopped in {@link #shutdown()}. */
    private final Set<Connection> openConnections = new HashSet<>();

    /** Every executor created by this container; shut down in {@link #shutdown()}. */
    private final Set<ThreadPoolExecutor> executors = new HashSet<>();

    private final JMSConnectionPool connectionPool;
    private final int threadPoolSize;
    private final int maxThreadPoolSize;
    private final JMSDestinationType type;

    /**
     * Creates a container whose listener executors have a fixed size.
     *
     * @param jMSConnectionPool  the pool that supplies connections
     * @param jMSDestinationType whether destination names are resolved as queues or topics
     * @param i                  both the core and the maximum executor pool size
     */
    public JMSListenerContainer(JMSConnectionPool jMSConnectionPool, JMSDestinationType jMSDestinationType, int i) {
        this(jMSConnectionPool, jMSDestinationType, i, i);
    }

    /**
     * Creates a container with distinct core and maximum executor pool sizes.
     *
     * @param jMSConnectionPool  the pool that supplies connections
     * @param jMSDestinationType whether destination names are resolved as queues or topics
     * @param i                  the core executor pool size
     * @param i2                 the maximum executor pool size; must not be smaller than {@code i}
     */
    public JMSListenerContainer(JMSConnectionPool jMSConnectionPool, JMSDestinationType jMSDestinationType, int i, int i2) {
        ArgumentUtils.check(() -> i2 >= i)
            .orElseFail("maxThreadPoolSize cannot be smaller than the threadPoolSize");
        this.connectionPool = jMSConnectionPool;
        this.type = jMSDestinationType;
        this.threadPoolSize = i;
        this.maxThreadPoolSize = i2;
    }

    /**
     * Registers a {@link MessageHandler} for a destination. The handler is wrapped in a
     * {@code ConcurrentMessageHandler} backed by a dedicated {@link ThreadPoolExecutor}
     * sized from this container's pool settings, and the session is non-transacted with
     * {@link Session#AUTO_ACKNOWLEDGE}.
     *
     * @param str            the destination (queue or topic) name
     * @param messageHandler the handler that receives messages
     * @param cls            the message body class passed to the {@code MessageHandlerAdapter}
     * @throws MessageListenerException if the connection, session or consumer cannot be set up
     */
    public void registerListener(String str, MessageHandler<T> messageHandler, Class<T> cls) {
        try {
            Connection connection = this.connectionPool.createConnection();
            // Track the connection before anything else can fail so shutdown() still stops it.
            this.openConnections.add(connection);
            Session session = connection.createSession(DEFAULT_TRANSACTED, DEFAULT_ACKNOWLEDGE_MODE);

            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                this.threadPoolSize,
                this.maxThreadPoolSize,
                DEFAULT_KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(DEFAULT_EXECUTOR_QUEUE_SIZE),
                Executors.defaultThreadFactory());
            // Remember the executor: its core threads never time out and would otherwise
            // stay alive after the container is destroyed.
            this.executors.add(executor);

            session.createConsumer(lookupDestination(str, session))
                .setMessageListener(new MessageHandlerAdapter(new ConcurrentMessageHandler(messageHandler, executor), cls));
            LOGGER.debug("registered {} listener {} for destination '{}' and class {}",
                this.type.name().toLowerCase(Locale.ROOT), messageHandler, str, cls.getName());
        } catch (Exception e) {
            throw new MessageListenerException("Problem registering a MessageConsumer for " + str, e);
        }
    }

    /**
     * Registers a raw JMS {@link MessageListener} for a destination. The listener is
     * invoked directly by the consumer; when the session is transacted, the session is
     * committed after the listener returns and rolled back if it throws.
     *
     * @param str             the destination (queue or topic) name
     * @param messageListener the listener that receives messages
     * @param cls             the message body class (not used by this method)
     * @param z               whether the session is transacted
     * @param i               the session acknowledge mode
     * @param optional        an optional JMS message selector for the consumer
     * @throws MessageListenerException if the connection, session or consumer cannot be set up
     */
    public void registerListener(String str, MessageListener messageListener, Class<T> cls, boolean z, int i, Optional<String> optional) {
        try {
            Connection connection = this.connectionPool.createConnection();
            // Track the connection before anything else can fail so shutdown() still stops it.
            this.openConnections.add(connection);
            Session session = connection.createSession(z, i);

            Destination destination = lookupDestination(str, session);
            MessageListener wrapped = transactionAware(messageListener, session, z);
            if (optional.isPresent()) {
                session.createConsumer(destination, optional.get()).setMessageListener(wrapped);
            } else {
                session.createConsumer(destination).setMessageListener(wrapped);
            }
            LOGGER.debug("registered {} listener {} for destination '{}'; transacted: {}, ack mode: {}",
                this.type.name().toLowerCase(Locale.ROOT), messageListener, str, z, i);
        } catch (JMSException | RuntimeException e) {
            throw new MessageListenerException("Problem registering a MessageConsumer for " + str, e);
        }
    }

    /**
     * Stops every connection opened by this container and shuts down every executor it
     * created. Failures are logged and do not prevent the remaining resources from being
     * processed. Executors are shut down in an orderly fashion: already-submitted tasks
     * still run, new tasks are rejected.
     *
     * @return {@code true} if every connection was stopped and every executor was shut
     *         down without an exception, {@code false} otherwise
     */
    @PreDestroy
    public boolean shutdown() {
        boolean success = true;
        for (Connection connection : this.openConnections) {
            try {
                connection.stop();
            } catch (JMSException | RuntimeException e) {
                success = false;
                LOGGER.error("Failed to stop connection", e);
            }
        }
        // Connections are stopped first so that, normally, no new work is handed to an
        // executor that has already been shut down.
        for (ThreadPoolExecutor executor : this.executors) {
            try {
                executor.shutdown();
            } catch (RuntimeException e) {
                success = false;
                LOGGER.error("Failed to shut down listener executor", e);
            }
        }
        return success;
    }

    @Override
    public String toString() {
        return "JMSListenerContainer{openConnections=" + this.openConnections + ", connectionPool=" + this.connectionPool + ", threadPoolSize=" + this.threadPoolSize + ", maxThreadPoolSize=" + this.maxThreadPoolSize + ", type=" + this.type + '}';
    }

    /**
     * Wraps a listener so that a transacted session is committed after successful
     * delivery and rolled back when the listener throws. Any listener failure is rethrown
     * as a {@link MessageListenerException}.
     */
    private static MessageListener transactionAware(MessageListener delegate, Session session, boolean transacted) {
        return message -> {
            try {
                delegate.onMessage(message);
                if (transacted) {
                    session.commit();
                }
            } catch (Exception e) {
                if (transacted) {
                    try {
                        session.rollback();
                    } catch (JMSException | RuntimeException e2) {
                        MessageListenerException rollbackFailure =
                            new MessageListenerException("Problem rolling back transaction", e2);
                        // Keep the failure that triggered the rollback visible to whoever handles this.
                        rollbackFailure.addSuppressed(e);
                        throw rollbackFailure;
                    }
                }
                throw new MessageListenerException(e.getMessage(), e);
            }
        };
    }

    /** Resolves a destination name to a queue or a topic according to this container's type. */
    private Destination lookupDestination(String str, Session session) throws JMSException {
        return this.type == JMSDestinationType.QUEUE ? session.createQueue(str) : session.createTopic(str);
    }
}
