/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.carbon.analytics.eventsink.subscriber;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.analytics.eventsink.AnalyticsEventStore;
import org.wso2.carbon.analytics.eventsink.exception.AnalyticsEventStoreException;
import org.wso2.carbon.analytics.eventsink.internal.AnalyticsEventStoreManager;
import org.wso2.carbon.analytics.eventsink.internal.util.AnalyticsEventSinkUtil;
import org.wso2.carbon.analytics.eventsink.internal.util.ServiceHolder;
import org.wso2.carbon.analytics.eventsink.subscriber.AnalyticsWSO2EventConsumer;
import org.wso2.carbon.databridge.commons.utils.DataBridgeCommonsUtils;
import org.wso2.carbon.event.stream.core.EventStreamListener;
import org.wso2.carbon.event.stream.core.WSO2EventConsumer;
import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationException;

/**
 * Event stream listener that tracks, per tenant, the {@link AnalyticsWSO2EventConsumer}s
 * this component has subscribed to event streams.
 *
 * <p>The consumer cache is static and therefore shared by every instance of this class.
 * Each per-tenant list is created atomically and all reads and writes of a list happen
 * while holding that list's monitor, so the bookkeeping stays consistent no matter which
 * listener instance or thread performs the call.
 */
public class AnalyticsEventStreamListener
implements EventStreamListener {
    private static final Log log = LogFactory.getLog(AnalyticsEventStreamListener.class);

    /** Tenant id -> consumers currently subscribed for that tenant. Lists are guarded by their own monitor. */
    private static final ConcurrentHashMap<Integer, List<AnalyticsWSO2EventConsumer>> consumerCache =
            new ConcurrentHashMap<Integer, List<AnalyticsWSO2EventConsumer>>();

    /**
     * Removes the analytics event sink of a stream when this listener holds a consumer for it.
     *
     * <p>Nothing happens when the tenant has no cached consumers or none of them matches the
     * stream id built from {@code streamName} and {@code version}. The sink removal is invoked
     * at most once per call; an {@link AnalyticsEventStoreException} is logged, not rethrown.
     *
     * @param tenantId   id of the tenant owning the stream
     * @param streamName name of the removed stream
     * @param version    version of the removed stream
     */
    public void removedEventStream(int tenantId, String streamName, String version) {
        List<AnalyticsWSO2EventConsumer> tenantConsumers = consumerCache.get(tenantId);
        if (tenantConsumers == null) {
            return;
        }
        String streamId = DataBridgeCommonsUtils.generateStreamId(streamName, version);
        if (!hasConsumerForStream(tenantConsumers, streamId)) {
            return;
        }
        // Called outside the list lock so that any callback into this listener
        // triggered by the sink removal cannot block on it.
        try {
            ServiceHolder.getAnalyticsEventSinkService().removeEventSink(tenantId, streamName, version);
        }
        catch (AnalyticsEventStoreException e) {
            log.error("Error while removing analytics event store configuration for stream Id :" + streamId, e);
        }
    }

    /**
     * Subscribes to a newly added stream when an analytics event store exists for the table
     * derived from {@code streamName} and that store lists the stream id as an event source.
     *
     * @param tenantId   id of the tenant owning the stream
     * @param streamName name of the added stream
     * @param version    version of the added stream
     */
    public void addedEventStream(int tenantId, String streamName, String version) {
        AnalyticsEventStore analyticsEventStore = AnalyticsEventStoreManager.getInstance()
                .getAnalyticsEventStore(tenantId, AnalyticsEventSinkUtil.generateAnalyticsTableName(streamName));
        if (analyticsEventStore == null) {
            return;
        }
        String streamId = DataBridgeCommonsUtils.generateStreamId(streamName, version);
        if (analyticsEventStore.getEventSource().contains(streamId)) {
            this.subscribeForStream(tenantId, streamId);
        }
    }

    /**
     * Subscribes a consumer for the given stream unless an equal consumer is already cached
     * for the tenant.
     *
     * <p>The consumer is added to the cache only after the subscription succeeded. An
     * {@link EventStreamConfigurationException} is logged and leaves the cache unchanged.
     *
     * @param tenantId id of the tenant owning the stream
     * @param streamId id of the stream to subscribe to
     */
    public void subscribeForStream(int tenantId, String streamId) {
        List<AnalyticsWSO2EventConsumer> consumers = getOrCreateConsumers(tenantId);
        // NOTE(review): duplicate detection relies on AnalyticsWSO2EventConsumer.equals();
        // presumably it compares stream id and tenant id -- confirm against that class.
        AnalyticsWSO2EventConsumer candidate = new AnalyticsWSO2EventConsumer(streamId, tenantId);
        // Check, subscribe and add form one atomic step so two concurrent callers
        // cannot both subscribe the same stream.
        synchronized (consumers) {
            if (consumers.contains(candidate)) {
                return;
            }
            try {
                ServiceHolder.getEventStreamService().subscribe((WSO2EventConsumer)candidate);
                consumers.add(candidate);
            }
            catch (EventStreamConfigurationException e) {
                log.error("Error while registering subscriber for stream id " + streamId + " for tenant id " + tenantId + ". " + e.getMessage(), e);
            }
        }
    }

    /**
     * Removes the cached consumer equal to one built from {@code streamId} and {@code tenantId}
     * and unsubscribes it. Does nothing when the tenant has no cached consumers or no equal
     * consumer is cached.
     *
     * @param tenantId id of the tenant owning the stream
     * @param streamId id of the stream to unsubscribe from
     */
    public void unsubscribeFromStream(int tenantId, String streamId) {
        List<AnalyticsWSO2EventConsumer> consumers = consumerCache.get(tenantId);
        if (consumers == null) {
            return;
        }
        synchronized (consumers) {
            int index = consumers.indexOf(new AnalyticsWSO2EventConsumer(streamId, tenantId));
            if (index == -1) {
                return;
            }
            // Unsubscribe the instance that was actually registered, not the lookup probe.
            AnalyticsWSO2EventConsumer registered = consumers.remove(index);
            ServiceHolder.getEventStreamService().unsubscribe((WSO2EventConsumer)registered);
        }
    }

    /**
     * Returns the consumer list of a tenant, creating and publishing it atomically on first use.
     */
    private static List<AnalyticsWSO2EventConsumer> getOrCreateConsumers(int tenantId) {
        List<AnalyticsWSO2EventConsumer> consumers = consumerCache.get(tenantId);
        if (consumers == null) {
            List<AnalyticsWSO2EventConsumer> created = new ArrayList<AnalyticsWSO2EventConsumer>();
            // putIfAbsent returns the list another thread installed first, or null if ours won.
            consumers = consumerCache.putIfAbsent(tenantId, created);
            if (consumers == null) {
                consumers = created;
            }
        }
        return consumers;
    }

    /**
     * Reports whether any consumer in the list reports the given stream id.
     */
    private static boolean hasConsumerForStream(List<AnalyticsWSO2EventConsumer> consumers, String streamId) {
        synchronized (consumers) {
            for (AnalyticsWSO2EventConsumer consumer : consumers) {
                if (consumer.getStreamId().equals(streamId)) {
                    return true;
                }
            }
        }
        return false;
    }
}

