/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.jms;

import com.google.common.base.Supplier;
import com.google.common.collect.ForwardingSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import io.confluent.kafka.jms.ClientState;
import io.confluent.kafka.jms.ConfigurationJMSException;
import io.confluent.kafka.jms.ConsumerFactory;
import io.confluent.kafka.jms.ConsumerFactoryImpl;
import io.confluent.kafka.jms.JMSClientConfig;
import io.confluent.kafka.jms.KafkaConnectionMetaData;
import io.confluent.kafka.jms.KafkaMessageConsumer;
import io.confluent.kafka.jms.KafkaMessageProducer;
import io.confluent.kafka.jms.KafkaSession;
import io.confluent.kafka.jms.MessageListenerRunnable;
import io.confluent.kafka.jms.ProducerFactory;
import io.confluent.kafka.jms.ProducerFactoryImpl;
import io.confluent.kafka.jms.Unsupported;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaConnection
implements Connection {
    private static final Logger log = LoggerFactory.getLogger(KafkaConnection.class);
    final JMSClientConfig jmsClientConfig;
    final ClientState clientState = new ClientState();
    ExceptionListener exceptionListener;
    Map<MessageListener, MessageListenerRunnable> messageListenerRunnables;
    ExecutorService executorService;
    final SetMultimap<MessageListener, KafkaMessageConsumer> messageListeners = Multimaps.newSetMultimap((Map)Maps.newIdentityHashMap(), (Supplier)new Supplier<Set<KafkaMessageConsumer>>(){

        public Set<KafkaMessageConsumer> get() {
            return Sets.newIdentityHashSet();
        }
    });
    ProducerFactory producerFactory;
    ConsumerFactory consumerFactory;

    KafkaConnection(JMSClientConfig jmsClientConfig) {
        this.jmsClientConfig = jmsClientConfig;
        this.producerFactory = new ProducerFactoryImpl(this.jmsClientConfig.producer);
        this.consumerFactory = new ConsumerFactoryImpl(this.jmsClientConfig.consumer);
    }

    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        try {
            return new KafkaSession(this, transacted, acknowledgeMode);
        }
        catch (Exception ex) {
            throw new ConfigurationJMSException("Unable to create session", ex);
        }
    }

    public String getClientID() throws JMSException {
        return this.jmsClientConfig.clientID;
    }

    @Unsupported(value="Setting the client id is only supported by using `client.id` in the settings passed to KafkaConnectionFactory.")
    public void setClientID(String clientID) throws JMSException {
        throw new IllegalStateException("client.id cannot be configured by calling setClientID");
    }

    public ConnectionMetaData getMetaData() throws JMSException {
        return new KafkaConnectionMetaData();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ExceptionListener getExceptionListener() throws JMSException {
        KafkaConnection kafkaConnection = this;
        synchronized (kafkaConnection) {
            return this.exceptionListener;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
        KafkaConnection kafkaConnection = this;
        synchronized (kafkaConnection) {
            this.exceptionListener = exceptionListener;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() throws JMSException {
        KafkaConnection kafkaConnection = this;
        synchronized (kafkaConnection) {
            if (null != this.executorService) {
                log.warn("Connection.start() has already been called.");
                return;
            }
            this.executorService = Executors.newCachedThreadPool();
            this.messageListenerRunnables = new IdentityHashMap<MessageListener, MessageListenerRunnable>();
            for (final MessageListener messageListener : this.messageListeners.keySet()) {
                ForwardingSet<KafkaMessageConsumer> forwardingSet = new ForwardingSet<KafkaMessageConsumer>(){

                    protected Set<KafkaMessageConsumer> delegate() {
                        return KafkaConnection.this.messageListeners.get((Object)messageListener);
                    }
                };
                log.trace("start() - Starting thread for {}", (Object)messageListener);
                MessageListenerRunnable runnable = new MessageListenerRunnable(this.jmsClientConfig, messageListener, (Collection<KafkaMessageConsumer>)forwardingSet);
                this.messageListenerRunnables.put(messageListener, runnable);
                this.executorService.submit(runnable);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() throws JMSException {
        KafkaConnection kafkaConnection = this;
        synchronized (kafkaConnection) {
            if (null == this.executorService) {
                log.warn("Connection.stop() has already been called or Connection.start() has never been called.");
                return;
            }
            this.executorService.shutdown();
            for (MessageListenerRunnable runnable : this.messageListenerRunnables.values()) {
                runnable.stop();
            }
            try {
                if (!this.executorService.awaitTermination(this.jmsClientConfig.connectionStopTimeoutMs, TimeUnit.MILLISECONDS)) {
                    log.trace("Timeout waiting for termination.");
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            this.executorService = null;
        }
    }

    public void close() throws JMSException {
        this.stop();
        this.clientState.close();
    }

    void registerProducer(KafkaSession session, KafkaMessageProducer producer) {
        this.clientState.register(session, producer);
    }

    void registerConsumer(KafkaSession session, KafkaMessageConsumer consumer) {
        this.clientState.register(session, consumer);
    }

    void closeSessionClients(KafkaSession session) throws JMSException {
        this.clientState.close(session);
    }

    @Unsupported(value="This is an expert facility not used by regular JMS clients.")
    public ConnectionConsumer createConnectionConsumer(Destination destination, String s, ServerSessionPool serverSessionPool, int i) throws JMSException {
        throw new JMSException("This is an expert facility not used by regular JMS clients.");
    }

    @Unsupported(value="This is an expert facility not used by regular JMS clients.")
    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String s, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
        throw new JMSException("This is an expert facility not used by regular JMS clients.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void addMessageListener(MessageListener listener, KafkaMessageConsumer kafkaMessageConsumer) throws JMSException {
        KafkaConnection kafkaConnection = this;
        synchronized (kafkaConnection) {
            if (this.executorService != null) {
                throw new JMSException("Adding message listeners after Connection start is not currently supported.");
            }
            this.messageListeners.put((Object)listener, (Object)kafkaMessageConsumer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void removeMessageListener(MessageListener listener, KafkaMessageConsumer kafkaMessageConsumer) throws JMSException {
        KafkaConnection kafkaConnection = this;
        synchronized (kafkaConnection) {
            if (this.executorService != null) {
                throw new JMSException("Removing message listeners after Connection start is not currently supported.");
            }
            this.messageListeners.remove((Object)listener, (Object)kafkaMessageConsumer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void handleException(Exception e, String msg) {
        KafkaConnection kafkaConnection = this;
        synchronized (kafkaConnection) {
            if (this.exceptionListener != null) {
                JMSException exception = new JMSException(msg);
                exception.initCause(e.getCause());
                this.exceptionListener.onException(exception);
            } else {
                log.error(msg, (Throwable)e);
            }
        }
    }
}

