/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.carbon.analytics.spark.event;

import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.analytics.spark.event.EventStreamDataStore;
import org.wso2.carbon.analytics.spark.event.internal.ServiceHolder;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.ntask.core.Task;

/**
 * Scheduled task that drains the batches held by {@link EventStreamDataStore}
 * and publishes every event through the event stream service exposed by
 * {@link ServiceHolder}.
 *
 * <p>Each map entry returned by the store is handled inside its own tenant
 * flow, with the entry key used as the tenant id.
 */
public class EventingTask implements Task {
    private static final Log log = LogFactory.getLog(EventingTask.class);

    /**
     * Does nothing; this task keeps no state that needs initialising.
     */
    public void init() {
        // intentionally empty
    }

    /**
     * Does nothing; the supplied properties are ignored.
     *
     * @param props task properties (unused)
     */
    public void setProperties(Map<String, String> props) {
        // intentionally empty
    }

    /**
     * Pulls batch maps from the data store one after another and dispatches
     * each of them. The loop ends right after the first empty map has been
     * handed to the dispatcher, so at least one extraction always happens.
     */
    public void execute() {
        while (true) {
            Map<Integer, List<Event>> pending = EventStreamDataStore.extractNextEventBatch();
            this.processEventBatches(pending);
            if (pending.isEmpty()) {
                return;
            }
        }
    }

    /**
     * Dispatches every entry of the given map, in the map's iteration order.
     *
     * <p>No exception is caught here: a failure while handling one entry
     * propagates to the caller and the remaining entries are not processed.
     *
     * @param eventBatches events to publish, grouped by the id used as tenant id
     */
    private void processEventBatches(Map<Integer, List<Event>> eventBatches) {
        for (Map.Entry<Integer, List<Event>> tenantBatch : eventBatches.entrySet()) {
            this.publishForTenant(tenantBatch.getKey(), tenantBatch.getValue());
        }
    }

    /**
     * Publishes one list of events inside a dedicated tenant flow.
     *
     * @param tenantId id set on the thread-local Carbon context for the flow
     * @param events   events handed, one by one, to the event stream service
     */
    private void publishForTenant(Integer tenantId, List<Event> events) {
        try {
            PrivilegedCarbonContext.startTenantFlow();
            PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId);
            for (Event current : events) {
                // The service is looked up per event, exactly as before.
                ServiceHolder.getEventStreamService().publish(current);
            }
            if (log.isDebugEnabled()) {
                log.debug("Dispatched " + events.size() + " events for tenant: " + tenantId);
            }
        } finally {
            // Always close the tenant flow, even when publishing fails.
            PrivilegedCarbonContext.endTenantFlow();
        }
    }
}

