/*
 * Decompiled with CFR 0.152.
 */
package ca.uhn.fhir.jpa.subscription.channel.impl;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelSettings;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer;
import ca.uhn.fhir.util.StopWatch;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LinkedBlockingChannelFactory
implements IChannelFactory {
    private static final Logger ourLog = LoggerFactory.getLogger(LinkedBlockingChannelFactory.class);
    private final IChannelNamer myChannelNamer;
    private final Map<String, LinkedBlockingChannel> myChannels = Collections.synchronizedMap(new HashMap());

    public LinkedBlockingChannelFactory(IChannelNamer theChannelNamer) {
        this.myChannelNamer = theChannelNamer;
    }

    @Override
    public IChannelReceiver getOrCreateReceiver(String theChannelName, Class<?> theMessageType, ChannelConsumerSettings theChannelSettings) {
        return this.getOrCreateChannel(theChannelName, theChannelSettings.getConcurrentConsumers(), theChannelSettings);
    }

    @Override
    public IChannelProducer getOrCreateProducer(String theChannelName, Class<?> theMessageType, ChannelProducerSettings theChannelSettings) {
        return this.getOrCreateChannel(theChannelName, theChannelSettings.getConcurrentConsumers(), theChannelSettings);
    }

    @Override
    public IChannelNamer getChannelNamer() {
        return this.myChannelNamer;
    }

    private LinkedBlockingChannel getOrCreateChannel(String theChannelName, int theConcurrentConsumers, IChannelSettings theChannelSettings) {
        String channelName = this.myChannelNamer.getChannelName(theChannelName, theChannelSettings);
        return this.myChannels.computeIfAbsent(channelName, t -> this.buildLinkedBlockingChannel(theConcurrentConsumers, channelName));
    }

    @Nonnull
    private LinkedBlockingChannel buildLinkedBlockingChannel(int theConcurrentConsumers, String channelName) {
        String threadNamingPattern = channelName + "-%d";
        BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern(threadNamingPattern).uncaughtExceptionHandler(this.uncaughtExceptionHandler(channelName)).daemon(false).priority(5).build();
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(1000);
        RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
            ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", (Object)queue.size());
            StopWatch sw = new StopWatch();
            try {
                queue.put(theRunnable);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(Msg.code((int)568) + "Task " + theRunnable.toString() + " rejected from " + e);
            }
            ourLog.info("Slot become available after {}ms", (Object)sw.getMillis());
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(theConcurrentConsumers, theConcurrentConsumers, 0L, TimeUnit.MILLISECONDS, queue, (ThreadFactory)threadFactory, rejectedExecutionHandler);
        LinkedBlockingChannel retval = new LinkedBlockingChannel(channelName, executor, queue);
        return retval;
    }

    private Thread.UncaughtExceptionHandler uncaughtExceptionHandler(final String theChannelName) {
        return new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                ourLog.error("Failure handling message in channel {}", (Object)theChannelName, (Object)e);
            }
        };
    }

    @PreDestroy
    public void stop() {
        this.myChannels.clear();
    }
}

