/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.seda;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncEndpoint;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Category;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.WaitForTaskToComplete;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.component.seda.BlockingQueueFactory;
import org.apache.camel.component.seda.LinkedBlockingQueueFactory;
import org.apache.camel.component.seda.QueueReference;
import org.apache.camel.component.seda.SedaComponent;
import org.apache.camel.component.seda.SedaConsumer;
import org.apache.camel.component.seda.SedaPollingConsumer;
import org.apache.camel.component.seda.SedaProducer;
import org.apache.camel.spi.BrowsableEndpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ManagedResource(description="Managed SedaEndpoint")
@UriEndpoint(firstVersion="1.1.0", scheme="seda", title="SEDA", syntax="seda:name", remote=false, category={Category.CORE, Category.MESSAGING})
public class SedaEndpoint
extends DefaultEndpoint
implements AsyncEndpoint,
BrowsableEndpoint,
MultipleConsumersSupport {
    private static final Logger LOG = LoggerFactory.getLogger(SedaEndpoint.class);
    private final Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
    private final Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>();
    private volatile AsyncProcessor consumerMulticastProcessor;
    private volatile boolean multicastStarted;
    private volatile ExecutorService multicastExecutor;
    @UriPath(description="Name of queue")
    @Metadata(required=true)
    private String name;
    @UriParam(label="advanced", description="Define the queue instance which will be used by the endpoint")
    private BlockingQueue<Exchange> queue;
    @UriParam(defaultValue="1000", description="The maximum capacity of the SEDA queue (i.e., the number of messages it can hold). Will by default use the queueSize set on the SEDA component.")
    private int size = 1000;
    @UriParam(label="advanced", defaultValue="100", description="Maximum number of messages to keep in memory available for browsing. Use 0 for unlimited.")
    private int browseLimit = 100;
    @UriParam(label="consumer", defaultValue="1", description="Number of concurrent threads processing exchanges.")
    private int concurrentConsumers = 1;
    @UriParam(label="consumer,advanced", defaultValue="true", description="Whether to limit the number of concurrentConsumers to the maximum of 500. By default, an exception will be thrown if an endpoint is configured with a greater number. You can disable that check by turning this option off.")
    private boolean limitConcurrentConsumers = true;
    @UriParam(label="consumer,advanced", description="Specifies whether multiple consumers are allowed. If enabled, you can use SEDA for Publish-Subscribe messaging. That is, you can send a message to the SEDA queue and have each consumer receive a copy of the message. When enabled, this option should be specified on every consumer endpoint.")
    private boolean multipleConsumers;
    @UriParam(label="consumer,advanced", description="Whether to purge the task queue when stopping the consumer/route. This allows to stop faster, as any pending messages on the queue is discarded.")
    private boolean purgeWhenStopping;
    @UriParam(label="consumer,advanced", defaultValue="1000", description="The timeout (in milliseconds) used when polling. When a timeout occurs, the consumer can check whether it is allowed to continue running. Setting a lower value allows the consumer to react more quickly upon shutdown.")
    private int pollTimeout = 1000;
    @UriParam(label="producer", defaultValue="IfReplyExpected", description="Option to specify whether the caller should wait for the async task to complete or not before continuing. The following three options are supported: Always, Never or IfReplyExpected. The first two values are self-explanatory. The last value, IfReplyExpected, will only wait if the message is Request Reply based. The default option is IfReplyExpected.")
    private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
    @UriParam(label="producer", defaultValue="30000", javaType="java.time.Duration", description="Timeout before a SEDA producer will stop waiting for an asynchronous task to complete. You can disable timeout by using 0 or a negative value.")
    private long timeout = 30000L;
    @UriParam(label="producer,advanced", javaType="java.time.Duration", description="Offer timeout can be added to the block case when queue is full. You can disable timeout by using 0 or a negative value.")
    private long offerTimeout;
    @UriParam(label="producer,advanced", description="Whether a thread that sends messages to a full SEDA queue will block until the queue's capacity is no longer exhausted. By default, an exception will be thrown stating that the queue is full. By enabling this option, the calling thread will instead block and wait until the message can be accepted.")
    private boolean blockWhenFull;
    @UriParam(label="producer,advanced", description="Whether a thread that sends messages to a full SEDA queue will be discarded. By default, an exception will be thrown stating that the queue is full. By enabling this option, the calling thread will give up sending and continue, meaning that the message was not sent to the SEDA queue.")
    private boolean discardWhenFull;
    @UriParam(label="producer,advanced", description="Whether the producer should fail by throwing an exception, when sending to a queue with no active consumers. Only one of the options discardIfNoConsumers and failIfNoConsumers can be enabled at the same time.")
    private boolean failIfNoConsumers;
    @UriParam(label="producer,advanced", description="Whether the producer should discard the message (do not add the message to the queue), when sending to a queue with no active consumers. Only one of the options discardIfNoConsumers and failIfNoConsumers can be enabled at the same time.")
    private boolean discardIfNoConsumers;
    private BlockingQueueFactory<Exchange> queueFactory;
    private volatile QueueReference ref;

    public SedaEndpoint() {
        this.queueFactory = new LinkedBlockingQueueFactory<Exchange>();
    }

    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) {
        this(endpointUri, component, queue, 1);
    }

    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
        this(endpointUri, component, concurrentConsumers);
        this.queue = queue;
        if (queue != null) {
            this.size = queue.remainingCapacity();
        }
        this.queueFactory = new LinkedBlockingQueueFactory<Exchange>();
        this.getComponent().registerQueue(this, queue);
    }

    public SedaEndpoint(String endpointUri, Component component, BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) {
        this(endpointUri, component, concurrentConsumers);
        this.queueFactory = queueFactory;
    }

    private SedaEndpoint(String endpointUri, Component component, int concurrentConsumers) {
        super(endpointUri, component);
        this.concurrentConsumers = concurrentConsumers;
    }

    public boolean isRemote() {
        return false;
    }

    public SedaComponent getComponent() {
        return (SedaComponent)super.getComponent();
    }

    public Producer createProducer() throws Exception {
        return new SedaProducer(this, this.getWaitForTaskToComplete(), this.getTimeout(), this.isBlockWhenFull(), this.isDiscardWhenFull(), this.getOfferTimeout());
    }

    public Consumer createConsumer(Processor processor) throws Exception {
        if (this.getComponent() != null) {
            String key = this.getComponent().getQueueKey(this.getEndpointUri());
            QueueReference ref = this.getComponent().getQueueReference(key);
            if (ref != null && ref.getMultipleConsumers().booleanValue() != this.isMultipleConsumers()) {
                throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue multiple consumers " + ref.getMultipleConsumers() + " does not match given multiple consumers " + this.multipleConsumers);
            }
        }
        SedaConsumer answer = this.createNewConsumer(processor);
        this.configureConsumer((Consumer)answer);
        return answer;
    }

    protected SedaConsumer createNewConsumer(Processor processor) {
        return new SedaConsumer(this, processor);
    }

    public PollingConsumer createPollingConsumer() throws Exception {
        SedaPollingConsumer answer = new SedaPollingConsumer((Endpoint)this);
        this.configureConsumer((Consumer)answer);
        return answer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public BlockingQueue<Exchange> getQueue() {
        this.lock.lock();
        try {
            if (this.queue == null) {
                if (this.getComponent() != null) {
                    Integer size = this.getSize() == Integer.MAX_VALUE || this.getSize() == 1000 ? null : Integer.valueOf(this.getSize());
                    QueueReference ref = this.getComponent().getOrCreateQueue(this, size, this.isMultipleConsumers(), this.queueFactory);
                    this.queue = ref.getQueue();
                    String key = this.getComponent().getQueueKey(this.getEndpointUri());
                    LOG.debug("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE});
                    if (ref.getSize() != null) {
                        this.setSize(ref.getSize());
                    }
                } else {
                    this.queue = this.createQueue();
                    LOG.debug("Endpoint {} is using queue: {} with size: {}", new Object[]{this, this.getEndpointUri(), this.getSize()});
                }
            }
            BlockingQueue<Exchange> blockingQueue = this.queue;
            return blockingQueue;
        }
        finally {
            this.lock.unlock();
        }
    }

    protected BlockingQueue<Exchange> createQueue() {
        if (this.size > 0) {
            return this.queueFactory.create(this.size);
        }
        return this.queueFactory.create();
    }

    public QueueReference getQueueReference() {
        if (this.ref == null || this.ref.getCount() == 0) {
            this.ref = this.tryQueueRefInit();
        }
        return this.ref;
    }

    private QueueReference tryQueueRefInit() {
        SedaComponent component = this.getComponent();
        if (component != null) {
            String key = component.getQueueKey(this.getEndpointUri());
            return component.getQueueReference(key);
        }
        return null;
    }

    protected AsyncProcessor getConsumerMulticastProcessor() {
        this.lock.lock();
        try {
            if (!this.multicastStarted && this.consumerMulticastProcessor != null) {
                ServiceHelper.startService((Object)this.consumerMulticastProcessor);
                this.multicastStarted = true;
            }
            AsyncProcessor asyncProcessor = this.consumerMulticastProcessor;
            return asyncProcessor;
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void updateMulticastProcessor() throws Exception {
        this.lock.lock();
        try {
            int size;
            if (!this.isMultipleConsumersSupported()) {
                return;
            }
            if (this.consumerMulticastProcessor != null) {
                ServiceHelper.stopService((Object)this.consumerMulticastProcessor);
                this.consumerMulticastProcessor = null;
            }
            if ((size = this.getConsumers().size()) >= 1) {
                if (this.multicastExecutor == null) {
                    this.multicastExecutor = this.getCamelContext().getExecutorServiceManager().newDefaultThreadPool((Object)this, URISupport.sanitizeUri((String)this.getEndpointUri()) + "(multicast)");
                }
                ArrayList<Processor> processors = new ArrayList<Processor>(size);
                for (SedaConsumer consumer : this.getConsumers()) {
                    processors.add(consumer.getProcessor());
                }
                this.multicastStarted = false;
                this.consumerMulticastProcessor = (AsyncProcessor)PluginHelper.getProcessorFactory((CamelContext)this.getCamelContext()).createProcessor(this.getCamelContext(), "MulticastProcessor", new Object[]{processors, this.multicastExecutor, false});
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    void setName(String name) {
        this.name = name;
    }

    @ManagedAttribute(description="Queue name")
    public String getName() {
        return this.name;
    }

    public void setQueue(BlockingQueue<Exchange> queue) {
        this.queue = queue;
        this.size = queue.remainingCapacity();
    }

    @ManagedAttribute(description="Queue max capacity")
    public int getSize() {
        return this.size;
    }

    public void setSize(int size) {
        this.size = size;
    }

    @ManagedAttribute(description="Current queue size")
    public int getCurrentQueueSize() {
        return this.queue.size();
    }

    @ManagedAttribute(description="Maximum number of messages to browse by default")
    public int getBrowseLimit() {
        return this.browseLimit;
    }

    public void setBrowseLimit(int browseLimit) {
        this.browseLimit = browseLimit;
    }

    public void setBlockWhenFull(boolean blockWhenFull) {
        this.blockWhenFull = blockWhenFull;
    }

    @ManagedAttribute(description="Whether the caller will block sending to a full queue")
    public boolean isBlockWhenFull() {
        return this.blockWhenFull;
    }

    public void setDiscardWhenFull(boolean discardWhenFull) {
        this.discardWhenFull = discardWhenFull;
    }

    @ManagedAttribute(description="Whether the caller will discard sending to a full queue")
    public boolean isDiscardWhenFull() {
        return this.discardWhenFull;
    }

    public void setConcurrentConsumers(int concurrentConsumers) {
        this.concurrentConsumers = concurrentConsumers;
    }

    @ManagedAttribute(description="Number of concurrent consumers")
    public int getConcurrentConsumers() {
        return this.concurrentConsumers;
    }

    @ManagedAttribute
    public boolean isLimitConcurrentConsumers() {
        return this.limitConcurrentConsumers;
    }

    public void setLimitConcurrentConsumers(boolean limitConcurrentConsumers) {
        this.limitConcurrentConsumers = limitConcurrentConsumers;
    }

    public WaitForTaskToComplete getWaitForTaskToComplete() {
        return this.waitForTaskToComplete;
    }

    public void setWaitForTaskToComplete(WaitForTaskToComplete waitForTaskToComplete) {
        this.waitForTaskToComplete = waitForTaskToComplete;
    }

    @ManagedAttribute
    public long getTimeout() {
        return this.timeout;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    @ManagedAttribute
    public long getOfferTimeout() {
        return this.offerTimeout;
    }

    public void setOfferTimeout(long offerTimeout) {
        this.offerTimeout = offerTimeout;
    }

    @ManagedAttribute
    public boolean isFailIfNoConsumers() {
        return this.failIfNoConsumers;
    }

    public void setFailIfNoConsumers(boolean failIfNoConsumers) {
        this.failIfNoConsumers = failIfNoConsumers;
    }

    @ManagedAttribute
    public boolean isDiscardIfNoConsumers() {
        return this.discardIfNoConsumers;
    }

    public void setDiscardIfNoConsumers(boolean discardIfNoConsumers) {
        this.discardIfNoConsumers = discardIfNoConsumers;
    }

    @ManagedAttribute
    public boolean isMultipleConsumers() {
        return this.multipleConsumers;
    }

    public void setMultipleConsumers(boolean multipleConsumers) {
        this.multipleConsumers = multipleConsumers;
    }

    @ManagedAttribute
    public int getPollTimeout() {
        return this.pollTimeout;
    }

    public void setPollTimeout(int pollTimeout) {
        this.pollTimeout = pollTimeout;
    }

    @ManagedAttribute
    public boolean isPurgeWhenStopping() {
        return this.purgeWhenStopping;
    }

    public void setPurgeWhenStopping(boolean purgeWhenStopping) {
        this.purgeWhenStopping = purgeWhenStopping;
    }

    public List<Exchange> getExchanges() {
        return new ArrayList<Exchange>(this.getQueue());
    }

    @ManagedAttribute
    public boolean isMultipleConsumersSupported() {
        return this.isMultipleConsumers();
    }

    @ManagedOperation(description="Purges the seda queue")
    public void purgeQueue() {
        LOG.debug("Purging queue with {} exchanges", (Object)this.queue.size());
        this.queue.clear();
    }

    public Set<SedaConsumer> getConsumers() {
        return this.consumers;
    }

    public Set<SedaProducer> getProducers() {
        return new HashSet<SedaProducer>(this.producers);
    }

    void onStarted(SedaProducer producer) {
        this.producers.add(producer);
    }

    void onStopped(SedaProducer producer) {
        this.producers.remove((Object)producer);
    }

    void onStarted(SedaConsumer consumer) throws Exception {
        this.consumers.add(consumer);
        if (this.isMultipleConsumers()) {
            this.updateMulticastProcessor();
        }
    }

    void onStopped(SedaConsumer consumer) throws Exception {
        this.consumers.remove(consumer);
        if (this.isMultipleConsumers()) {
            this.updateMulticastProcessor();
        }
    }

    public boolean hasConsumers() {
        return !this.consumers.isEmpty();
    }

    protected void doInit() throws Exception {
        super.doInit();
        if (this.discardWhenFull && this.blockWhenFull) {
            throw new IllegalArgumentException("Cannot enable both discardWhenFull=true and blockWhenFull=true. You can only either discard or block when full.");
        }
    }

    protected void doStart() throws Exception {
        super.doStart();
        if (this.queue == null) {
            this.queue = this.getQueue();
        }
        this.ref = this.tryQueueRefInit();
    }

    public void stop() {
        if (this.getConsumers().isEmpty()) {
            super.stop();
        } else {
            LOG.debug("There is still active consumers.");
        }
        this.ref = null;
    }

    public void shutdown() {
        if (this.isShutdown()) {
            LOG.trace("Service already shut down");
            return;
        }
        if (this.getComponent() != null) {
            this.getComponent().onShutdownEndpoint(this);
        }
        if (this.getConsumers().isEmpty()) {
            super.shutdown();
        } else {
            LOG.debug("There is still active consumers.");
        }
    }

    protected void doShutdown() throws Exception {
        if (this.multicastExecutor != null) {
            this.getCamelContext().getExecutorServiceManager().shutdownNow(this.multicastExecutor);
            this.multicastExecutor = null;
        }
        this.queue = null;
        this.ref = null;
    }
}

