package org.mule.routing;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import org.apache.commons.collections.buffer.BoundedFifoBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.MuleMessage;
import org.mule.api.routing.MessageInfoMapping;
import org.mule.api.routing.ResponseTimeoutException;
import org.mule.api.routing.RoutingException;
import org.mule.config.i18n.CoreMessages;
import org.mule.context.notification.RoutingNotification;
import org.mule.routing.inbound.EventGroup;
import org.mule.util.MapUtils;
import org.mule.util.concurrent.Latch;

/* loaded from: input_file:org/mule/routing/EventCorrelator.class */
/**
 * Groups incoming events by correlation id, asks an {@link EventCorrelatorCallback}
 * whether/how to aggregate each group, and hands the aggregated message to the
 * thread waiting for it via a per-id {@link Latch}.
 *
 * <p>State is kept in concurrent maps; mutation of an individual
 * {@link EventGroup} is guarded by synchronizing on the group itself, and the
 * (non thread-safe) processed-group buffer is guarded by synchronizing on the
 * buffer instance.
 */
public class EventCorrelator {
    public static final String NO_CORRELATION_ID = "no-id";
    public static final int MAX_PROCESSED_GROUPS = 50000;

    private final MessageInfoMapping messageInfoMapping;
    private final MuleContext context;
    private final EventCorrelatorCallback callback;

    protected final transient Log logger = LogFactory.getLog(EventCorrelator.class);

    /** Groups still being collected, keyed by group (correlation) id. */
    protected final ConcurrentMap eventGroups = new ConcurrentHashMap();

    /** Latches used to signal that a response is available, keyed by id. */
    protected final ConcurrentMap locks = new ConcurrentHashMap();

    /** Aggregated responses waiting to be picked up, keyed by id. */
    protected final ConcurrentMap responseMessages = new ConcurrentHashMap();

    /**
     * Ids of the most recently completed groups. The buffer is not thread-safe;
     * every access in this class synchronizes on the buffer instance.
     */
    protected final BoundedFifoBuffer processedGroups = new BoundedFifoBuffer(MAX_PROCESSED_GROUPS);

    /** Timeout in milliseconds; a non-positive value means "no timeout". */
    private int timeout = -1;
    private boolean failOnTimeout = true;
    private final AtomicBoolean timerStarted = new AtomicBoolean(false);

    /**
     * @param eventCorrelatorCallback strategy that creates and aggregates groups; must not be null
     * @param messageInfoMapping      extracts message and correlation ids; must not be null
     * @param muleContext             used for notifications and work scheduling; must not be null
     * @throws IllegalArgumentException if any argument is null
     */
    public EventCorrelator(EventCorrelatorCallback eventCorrelatorCallback, MessageInfoMapping messageInfoMapping, MuleContext muleContext) {
        if (eventCorrelatorCallback == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("EventCorrelatorCallback").getMessage());
        }
        if (messageInfoMapping == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("MessageInfoMapping").getMessage());
        }
        if (muleContext == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("MuleContext").getMessage());
        }
        this.callback = eventCorrelatorCallback;
        this.messageInfoMapping = messageInfoMapping;
        this.context = muleContext;
    }

    /**
     * Schedules a background task that polls every 100 ms for event groups older
     * than the configured timeout and expires them. Calling this more than once
     * schedules the task only once.
     *
     * @throws WorkException if the work manager rejects the task
     */
    public void enableTimeoutMonitor() throws WorkException {
        // Atomically claim the "started" flag so concurrent/repeated calls cannot
        // schedule a second monitor (the flag was previously read but never set).
        if (!this.timerStarted.compareAndSet(false, true)) {
            return;
        }
        boolean scheduled = false;
        try {
            this.context.getWorkManager().scheduleWork(new Work() {
                public void release() {
                    // nothing to release
                }

                public void run() {
                    while (true) {
                        expireTimedOutGroups();
                        try {
                            Thread.sleep(100L);
                        } catch (InterruptedException e) {
                            // Preserve the interrupt for the owning thread and stop monitoring.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            });
            scheduled = true;
        } finally {
            if (!scheduled) {
                // Scheduling failed: allow a later call to try again.
                this.timerStarted.set(false);
            }
        }
    }

    /**
     * Removes every event group whose age exceeds the configured timeout, fires a
     * {@code CORRELATION_TIMEOUT} notification for it and reports a
     * {@link CorrelationTimeoutException} to the exception listener of the
     * service of the group's first event.
     */
    private void expireTimedOutGroups() {
        final int groupTimeout = getTimeout();
        if (groupTimeout <= 0) {
            // Non-positive means "no timeout", matching getResponse(); without this
            // guard every group would be considered expired immediately.
            return;
        }
        final long now = System.currentTimeMillis();
        ArrayList<EventGroup> expired = new ArrayList<EventGroup>(1);
        // The backport ConcurrentMap is a raw type, so elements arrive as Object.
        for (Object candidate : this.eventGroups.values()) {
            EventGroup group = (EventGroup) candidate;
            if (group.getCreated() + groupTimeout < now) {
                expired.add(group);
            }
        }
        for (EventGroup group : expired) {
            this.eventGroups.remove(group.getGroupId());
            this.locks.remove(group.getGroupId());
            this.context.fireNotification(new RoutingNotification(group.toMessageCollection(), null, RoutingNotification.CORRELATION_TIMEOUT));
            group.toArray()[0].getService().getExceptionListener().exceptionThrown(new CorrelationTimeoutException(CoreMessages.correlationTimedOut(group.getGroupId()), (MuleMessage) group.toMessageCollection()));
        }
    }

    /**
     * @return a read-only view of the aggregated responses not yet collected
     */
    public Map getResponseMessages() {
        return Collections.unmodifiableMap(this.responseMessages);
    }

    /**
     * Adds the event to its group and, if a latch exists for the event's
     * correlation id afterwards, removes the latch and returns (and removes) the
     * stored response for that id.
     *
     * @param muleEvent the event to correlate
     * @return the stored response for the correlation id, or {@code null} when no
     *         latch is registered for it (or no response is stored)
     * @throws RoutingException if the event carries no usable correlation id
     */
    public MuleMessage process(MuleEvent muleEvent) throws RoutingException {
        addEvent(muleEvent);
        String correlationId = this.messageInfoMapping.getCorrelationId(muleEvent.getMessage());
        if (this.locks.get(correlationId) == null) {
            return null;
        }
        this.locks.remove(correlationId);
        return (MuleMessage) this.responseMessages.remove(correlationId);
    }

    /**
     * Adds the event to the group for its correlation id, creating the group if
     * needed. When the callback reports the group complete, the group is
     * aggregated, removed, its result stored in {@link #responseMessages} and the
     * latch for the id released.
     *
     * <p>Events for ids found in the processed-group buffer are dropped after
     * firing a {@code MISSED_ASYNC_REPLY} notification.
     *
     * @param muleEvent the event to add
     * @throws RoutingException      if the correlation id is null or {@code "-1"}
     * @throws IllegalStateException if a response is already stored for the id
     */
    public void addEvent(MuleEvent muleEvent) throws RoutingException {
        String correlationId = this.messageInfoMapping.getCorrelationId(muleEvent.getMessage());
        if (correlationId == null || correlationId.equals("-1")) {
            throw new RoutingException(CoreMessages.noCorrelationId(), muleEvent.getMessage(), muleEvent.getEndpoint());
        }
        boolean retry = false;
        while (true) {
            if (retry) {
                // Back off briefly before re-reading the group map.
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            if (isGroupAlreadyProcessed(correlationId)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("An event was received for an event group that has already been processed, this is probably because the async-reply timed out. Correlation Id is: " + correlationId + ". Dropping event");
                }
                this.context.fireNotification(new RoutingNotification(muleEvent.getMessage(), muleEvent.getEndpoint().getEndpointURI().toString(), RoutingNotification.MISSED_ASYNC_REPLY));
                return;
            }
            EventGroup group = getEventGroup(correlationId);
            if (group == null) {
                group = addEventGroup(this.callback.createEventGroup(muleEvent, correlationId));
            }
            synchronized (group) {
                // The group may have been removed (completed or timed out) between
                // the lookup and acquiring its monitor; if so, start over.
                if (group != getEventGroup(correlationId)) {
                    retry = true;
                    continue;
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Adding event to response aggregator group: " + correlationId);
                }
                group.addEvent(muleEvent);
                if (this.callback.shouldAggregateEvents(group)) {
                    MuleMessage aggregated = this.callback.aggregateEvents(group);
                    removeEventGroup(group);
                    if (this.responseMessages.putIfAbsent(correlationId, aggregated) != null) {
                        throw new IllegalStateException("Detected duplicate aggregation result message with id: " + correlationId);
                    }
                    getOrCreateLatch(correlationId, "Creating latch for ").countDown();
                }
                return;
            }
        }
    }

    /**
     * Returns the latch registered for {@code id}, registering a new one if none
     * exists. If another thread registers one concurrently, that one is returned.
     *
     * @param id             key into {@link #locks}
     * @param creationPrefix debug-log prefix used when a new latch is created
     */
    private Latch getOrCreateLatch(Object id, String creationPrefix) {
        Latch latch = (Latch) this.locks.get(id);
        if (latch != null) {
            return latch;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(creationPrefix + id + " in " + this);
        }
        Latch created = new Latch();
        Latch raced = (Latch) this.locks.putIfAbsent(id, created);
        return raced != null ? raced : created;
    }

    /**
     * @param obj group id
     * @return the group currently registered under the id, or {@code null}
     */
    protected EventGroup getEventGroup(Object obj) {
        return (EventGroup) this.eventGroups.get(obj);
    }

    /**
     * Registers the group under its id unless one is already registered.
     *
     * @param eventGroup the group to register
     * @return the group that ends up registered for the id (the existing one if
     *         there was one, otherwise {@code eventGroup})
     */
    protected EventGroup addEventGroup(EventGroup eventGroup) {
        EventGroup existing = (EventGroup) this.eventGroups.putIfAbsent(eventGroup.getGroupId(), eventGroup);
        return existing != null ? existing : eventGroup;
    }

    /**
     * Unregisters the group and records its id as processed.
     *
     * @param eventGroup the group to remove
     */
    protected void removeEventGroup(EventGroup eventGroup) {
        this.eventGroups.remove(eventGroup.getGroupId());
        addProcessedGroup(eventGroup.getGroupId());
    }

    /**
     * Records an id in the processed-group buffer, evicting the oldest entry
     * first when the buffer is full.
     *
     * @param obj the group id to record
     */
    protected void addProcessedGroup(Object obj) {
        // BoundedFifoBuffer is not thread-safe and this method is reached under
        // different per-group monitors, so serialize on the buffer itself.
        synchronized (this.processedGroups) {
            if (this.processedGroups.isFull()) {
                this.processedGroups.remove();
            }
            this.processedGroups.add(obj);
        }
    }

    /**
     * @param obj group id
     * @return {@code true} if the id is present in the processed-group buffer
     */
    protected boolean isGroupAlreadyProcessed(Object obj) {
        synchronized (this.processedGroups) {
            return this.processedGroups.contains(obj);
        }
    }

    /**
     * Waits for the response to the given message using the configured timeout.
     *
     * @see #getResponse(MuleMessage, int)
     */
    public MuleMessage getResponse(MuleMessage muleMessage) throws RoutingException {
        return getResponse(muleMessage, getTimeout());
    }

    /**
     * Blocks until the response for the given message's id is available or the
     * timeout elapses.
     *
     * @param muleMessage the message whose response is awaited
     * @param i           maximum wait in milliseconds; a non-positive value waits indefinitely
     * @return the aggregated response; on timeout with {@code failOnTimeout}
     *         disabled, the aggregation of whatever events the group holds, or
     *         {@code null} if there is no group
     * @throws ResponseTimeoutException if the wait timed out (or was interrupted)
     *                                  and {@code failOnTimeout} is enabled
     * @throws IllegalStateException    if the latch was released but no response is stored
     */
    public MuleMessage getResponse(MuleMessage muleMessage, int i) throws RoutingException {
        final int waitMillis = i;
        String messageId = this.messageInfoMapping.getMessageId(muleMessage);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Waiting for response for message id: " + messageId + " in " + this);
        }
        Latch latch = getOrCreateLatch(messageId, "Got response but no one is waiting for it yet. Creating latch for ");
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Got latch for message: " + messageId);
        }

        boolean interrupted = false;
        boolean resultAvailable = false;
        MuleMessage result = null;
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Waiting for response to message: " + messageId);
            }
            // Honour the timeout the caller asked for, not the configured default.
            if (waitMillis <= 0) {
                latch.await();
                resultAvailable = true;
            } else {
                resultAvailable = latch.await(waitMillis, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            interrupted = true;
        } finally {
            // Always clean up both maps, whatever the outcome of the wait.
            this.locks.remove(messageId);
            result = (MuleMessage) this.responseMessages.remove(messageId);
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        if (resultAvailable) {
            if (result == null) {
                throw new IllegalStateException("Response Message is null");
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("remaining locks  : " + this.locks.keySet());
                this.logger.debug("remaining results: " + this.responseMessages.keySet());
            }
            return result;
        }

        if (isFailOnTimeout()) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Current responses are: \n" + MapUtils.toString(this.responseMessages, true));
            }
            this.context.fireNotification(new RoutingNotification(muleMessage, null, RoutingNotification.ASYNC_REPLY_TIMEOUT));
            throw new ResponseTimeoutException(CoreMessages.responseTimedOutWaitingForId(waitMillis, messageId), muleMessage, null);
        }

        // Not failing on timeout: return whatever has been collected so far.
        EventGroup group = getEventGroup(messageId);
        if (group != null) {
            removeEventGroup(group);
            return this.callback.aggregateEvents(group);
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("There is no current event Group. Current responses are: \n" + MapUtils.toString(this.responseMessages, true));
        }
        return null;
    }

    /** @return whether a response timeout raises {@link ResponseTimeoutException} */
    public boolean isFailOnTimeout() {
        return this.failOnTimeout;
    }

    /** @param z {@code true} to raise {@link ResponseTimeoutException} on response timeout */
    public void setFailOnTimeout(boolean z) {
        this.failOnTimeout = z;
    }

    /** @return the timeout in milliseconds; non-positive means no timeout */
    public int getTimeout() {
        return this.timeout;
    }

    /** @param i the timeout in milliseconds; non-positive means no timeout */
    public void setTimeout(int i) {
        this.timeout = i;
    }
}
