package io.confluent.kafka.jms;

import com.google.common.base.Supplier;
import com.google.common.collect.ForwardingSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/confluent/kafka/jms/KafkaConnection.class */
/**
 * {@link Connection} implementation used by the Kafka JMS client.
 *
 * <p>The connection keeps track of the {@link MessageListener}s registered by its consumers and,
 * once {@link #start()} is called, submits one {@link MessageListenerRunnable} per listener to an
 * {@link ExecutorService}. {@link #stop()} shuts that executor down again. Producers and consumers
 * created by sessions are tracked through {@link ClientState} so they can be closed together.
 *
 * <p>Mutable state ({@code exceptionListener}, {@code executorService},
 * {@code messageListenerRunnables}, {@code messageListeners}) is read and written while holding
 * this object's monitor.
 */
public class KafkaConnection implements Connection {
    private static final Logger log = LoggerFactory.getLogger(KafkaConnection.class);
    final JMSClientConfig jmsClientConfig;
    ExceptionListener exceptionListener;
    Map<MessageListener, MessageListenerRunnable> messageListenerRunnables;
    /** Non-null only between a successful {@link #start()} and the following {@link #stop()}. */
    ExecutorService executorService;
    ProducerFactory producerFactory;
    ConsumerFactory consumerFactory;
    final ClientState clientState = new ClientState();
    /** Listener -> consumers that share it; both keys and values are compared by identity. */
    final SetMultimap<MessageListener, MessageConsumer> messageListeners = Multimaps.newSetMultimap(Maps.newIdentityHashMap(), new Supplier<Set<MessageConsumer>>() {
        @Override
        public Set<MessageConsumer> get() {
            return Sets.newIdentityHashSet();
        }
    });

    /**
     * Creates a connection and builds the producer/consumer factories from the given config.
     *
     * @param jMSClientConfig client configuration; its {@code producer} and {@code consumer}
     *     members are handed to the respective factory implementations
     */
    public KafkaConnection(JMSClientConfig jMSClientConfig) {
        this.jmsClientConfig = jMSClientConfig;
        this.producerFactory = new ProducerFactoryImpl(this.jmsClientConfig.producer);
        this.consumerFactory = new ConsumerFactoryImpl(this.jmsClientConfig.consumer);
    }

    /**
     * Creates a new {@link KafkaSession} bound to this connection.
     *
     * @param z forwarded unchanged to the {@link KafkaSession} constructor (the "transacted"
     *     argument of {@link Connection#createSession(boolean, int)})
     * @param i forwarded unchanged to the {@link KafkaSession} constructor (the acknowledge mode
     *     argument of {@link Connection#createSession(boolean, int)})
     * @return the newly created session
     * @throws JMSException a {@code ConfigurationJMSException} wrapping any exception raised while
     *     constructing the session
     */
    @Override
    public Session createSession(boolean z, int i) throws JMSException {
        try {
            return new KafkaSession(this, z, i);
        } catch (Exception e) {
            throw new ConfigurationJMSException("Unable to create session", e);
        }
    }

    /**
     * @return the client id taken from the configuration this connection was created with
     */
    @Override
    public String getClientID() throws JMSException {
        return this.jmsClientConfig.clientID;
    }

    /**
     * Always fails: the client id can only be supplied through configuration.
     *
     * @throws JMSException always, as a {@link IllegalStateException javax.jms.IllegalStateException}
     */
    @Override
    @Unsupported("Setting the client id is only supported by using `client.id` in the settings passed to KafkaConnectionFactory.")
    public void setClientID(String str) throws JMSException {
        throw new IllegalStateException("client.id cannot be configured by calling setClientID");
    }

    /**
     * @return a fresh {@link KafkaConnectionMetaData} instance
     */
    @Override
    public ConnectionMetaData getMetaData() throws JMSException {
        return new KafkaConnectionMetaData();
    }

    /**
     * @return the currently registered exception listener, or {@code null} if none is set
     */
    @Override
    public ExceptionListener getExceptionListener() throws JMSException {
        synchronized (this) {
            return this.exceptionListener;
        }
    }

    /**
     * Replaces the exception listener used by {@link #handleException(Exception, String)}.
     *
     * @param exceptionListener the new listener; {@code null} makes exceptions get logged instead
     */
    @Override
    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
        synchronized (this) {
            this.exceptionListener = exceptionListener;
        }
    }

    /**
     * Starts message delivery: creates a cached thread pool and submits one
     * {@link MessageListenerRunnable} for every registered {@link MessageListener}.
     *
     * <p>Calling this while the connection is already started only logs a warning.
     */
    @Override
    public void start() throws JMSException {
        synchronized (this) {
            if (null != this.executorService) {
                log.warn("Connection.start() has already been called.");
                return;
            }
            this.executorService = Executors.newCachedThreadPool();
            this.messageListenerRunnables = new IdentityHashMap<MessageListener, MessageListenerRunnable>();
            for (final MessageListener messageListener : this.messageListeners.keySet()) {
                // Live view: every access re-reads the consumer set for this listener from the multimap.
                ForwardingSet<MessageConsumer> forwardingSet = new ForwardingSet<MessageConsumer>() {
                    @Override
                    protected Set<MessageConsumer> delegate() {
                        return KafkaConnection.this.messageListeners.get(messageListener);
                    }
                };
                log.trace("start() - Starting thread for {}", messageListener);
                MessageListenerRunnable messageListenerRunnable = new MessageListenerRunnable(this.jmsClientConfig, messageListener, forwardingSet);
                this.messageListenerRunnables.put(messageListener, messageListenerRunnable);
                this.executorService.submit(messageListenerRunnable);
            }
        }
    }

    /**
     * Stops message delivery: shuts the executor down, asks every listener runnable to stop and
     * waits up to {@code connectionStopTimeoutMs} for the pool to terminate.
     *
     * <p>Calling this when the connection is not started only logs a warning. If the waiting thread
     * is interrupted, its interrupt status is restored before returning.
     */
    @Override
    public void stop() throws JMSException {
        synchronized (this) {
            if (null == this.executorService) {
                log.warn("Connection.stop() has already been called or Connection.start() has never been called.");
                return;
            }
            this.executorService.shutdown();
            for (MessageListenerRunnable messageListenerRunnable : this.messageListenerRunnables.values()) {
                messageListenerRunnable.stop();
            }
            try {
                if (!this.executorService.awaitTermination(this.jmsClientConfig.connectionStopTimeoutMs, TimeUnit.MILLISECONDS)) {
                    log.trace("Timeout waiting for termination.");
                }
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: re-assert it so callers further up can react.
                Thread.currentThread().interrupt();
                log.trace("Interrupted while waiting for termination.", e);
            }
            // Clearing the reference marks the connection as stopped so start() may be called again.
            this.executorService = null;
        }
    }

    /**
     * Stops the connection and closes all clients tracked by {@link ClientState}.
     *
     * <p>The tracked clients are closed even when {@link #stop()} fails.
     */
    @Override
    public void close() throws JMSException {
        try {
            stop();
        } finally {
            this.clientState.close();
        }
    }

    /**
     * Records a producer created by the given session in {@link ClientState}.
     *
     * @param kafkaSession the session that owns the producer
     * @param kafkaMessageProducer the producer to track
     */
    public void registerProducer(KafkaSession kafkaSession, KafkaMessageProducer kafkaMessageProducer) {
        this.clientState.register(kafkaSession, kafkaMessageProducer);
    }

    /**
     * Records a consumer created by the given session in {@link ClientState}.
     *
     * @param kafkaSession the session that owns the consumer
     * @param kafkaMessageConsumer the consumer to track
     */
    public void registerConsumer(KafkaSession kafkaSession, KafkaMessageConsumer kafkaMessageConsumer) {
        this.clientState.register(kafkaSession, kafkaMessageConsumer);
    }

    /**
     * Delegates to {@link ClientState} to close the clients registered for the given session.
     *
     * @param kafkaSession the session whose clients should be closed
     */
    public void closeSessionClients(KafkaSession kafkaSession) throws JMSException {
        this.clientState.close(kafkaSession);
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    @Unsupported("This is an expert facility not used by regular JMS clients.")
    public ConnectionConsumer createConnectionConsumer(Destination destination, String str, ServerSessionPool serverSessionPool, int i) throws JMSException {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    @Unsupported("This is an expert facility not used by regular JMS clients.")
    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String str, String str2, ServerSessionPool serverSessionPool, int i) throws JMSException {
        throw new UnsupportedOperationException();
    }

    /**
     * Associates a consumer with a message listener so that {@link #start()} will dispatch to it.
     *
     * @param messageListener the listener to register
     * @param kafkaMessageConsumer the consumer that feeds the listener
     * @throws JMSException if the connection has already been started
     */
    public void addMessageListener(MessageListener messageListener, KafkaMessageConsumer kafkaMessageConsumer) throws JMSException {
        synchronized (this) {
            if (this.executorService != null) {
                throw new JMSException("Adding message listeners after Connection start is not currently supported.");
            }
            this.messageListeners.put(messageListener, kafkaMessageConsumer);
        }
    }

    /**
     * Removes a previously registered listener/consumer association.
     *
     * @param messageListener the listener to unregister
     * @param kafkaMessageConsumer the consumer that was registered with the listener
     * @throws JMSException if the connection has already been started
     */
    public void removeMessageListener(MessageListener messageListener, KafkaMessageConsumer kafkaMessageConsumer) throws JMSException {
        synchronized (this) {
            if (this.executorService != null) {
                throw new JMSException("Removing message listeners after Connection start is not currently supported.");
            }
            this.messageListeners.remove(messageListener, kafkaMessageConsumer);
        }
    }

    /**
     * Reports a failure to the registered {@link ExceptionListener}, or logs it when no listener
     * is set.
     *
     * @param exc the exception that occurred; it becomes the cause of the {@link JMSException}
     *     passed to the listener
     * @param str human-readable description used as the {@link JMSException} message or log message
     */
    public void handleException(Exception exc, String str) {
        synchronized (this) {
            if (this.exceptionListener != null) {
                JMSException jMSException = new JMSException(str);
                // Chain the exception itself (not only its cause) so no part of the failure is lost.
                jMSException.initCause(exc);
                this.exceptionListener.onException(jMSException);
            } else {
                log.error(str, exc);
            }
        }
    }
}
