/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.carbon.analytics.spark.event;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.analytics.dataservice.commons.AnalyticsDataResponse;
import org.wso2.carbon.analytics.dataservice.core.AnalyticsDataService;
import org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceUtils;
import org.wso2.carbon.analytics.dataservice.core.AnalyticsServiceHolder;
import org.wso2.carbon.analytics.datasource.commons.Record;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
import org.wso2.carbon.analytics.spark.event.EventRecord;
import org.wso2.carbon.databridge.commons.Event;

/**
 * Static helper that persists {@link EventRecord}s in a dedicated analytics table and later
 * reads them back, grouped by tenant, as databridge {@link Event}s.
 *
 * <p>All access goes through the {@link AnalyticsDataService} obtained from
 * {@link AnalyticsServiceHolder}. The class holds no state of its own and cannot be instantiated.
 */
public class EventStreamDataStore {
    private static final Log log = LogFactory.getLog(EventStreamDataStore.class);
    private static final String EVENT_STREAM_DATA_STORE_TABLE = "__EVENT_STREAM_DATA_STORE__";
    private static final int META_TABLE_TID = -1000;

    /*
     * Column names shared by the writer (createValues) and the reader (extractNextEventBatch).
     * Keeping them in one place prevents the two sides from drifting apart.
     */
    private static final String COLUMN_TENANT_ID = "tenantId";
    private static final String COLUMN_STREAM_ID = "streamId";
    private static final String COLUMN_PAYLOAD = "payload";
    private static final String COLUMN_META = "meta";
    private static final String COLUMN_CORRELATION = "correlation";

    /*
     * Arguments passed to AnalyticsDataService.get(...) when reading a batch.
     * NOTE(review): names reflect the presumed meaning of each positional argument
     * (partition hint, start offset, max record count) - confirm against the service API.
     */
    private static final int NUM_PARTITIONS_HINT = 1;
    private static final int RECORDS_FROM = 0;
    private static final int RECORDS_COUNT = 1000;

    private EventStreamDataStore() {
    }

    /**
     * Creates the backing table through the analytics data service.
     *
     * @throws AnalyticsException if the data service fails to create the table
     */
    public static void initStore() throws AnalyticsException {
        AnalyticsServiceHolder.getAnalyticsDataService().createTable(META_TABLE_TID, EVENT_STREAM_DATA_STORE_TABLE);
    }

    /**
     * Converts each given event record into an analytics {@link Record} and writes them all to
     * the store in a single {@code put} call.
     *
     * @param eventRecordList the records to persist; {@code null} or empty results in no call
     *                        to the data service
     * @throws AnalyticsException if the data service fails to store the records
     */
    public static void addToStore(List<EventRecord> eventRecordList) throws AnalyticsException {
        if (eventRecordList == null || eventRecordList.isEmpty()) {
            return;
        }
        List<Record> records = new ArrayList<Record>(eventRecordList.size());
        for (EventRecord eventRecord : eventRecordList) {
            records.add(new Record(META_TABLE_TID, EVENT_STREAM_DATA_STORE_TABLE, createValues(eventRecord)));
        }
        AnalyticsServiceHolder.getAnalyticsDataService().put(records);
    }

    /**
     * Reads one batch of records from the store, converts them into events grouped by tenant id,
     * and deletes every record that was read.
     *
     * <p>A record lacking a tenant id, stream id or payload is logged as corrupted and skipped,
     * but is still deleted along with the rest of the batch. Missing meta or correlation data
     * does not make a record corrupted.
     *
     * @return a map from tenant id to the events read for that tenant; empty when nothing usable
     *         was read
     * @throws RuntimeException wrapping the {@link AnalyticsException} if reading or deleting fails
     */
    public static Map<Integer, List<Event>> extractNextEventBatch() {
        AnalyticsDataService ads = AnalyticsServiceHolder.getAnalyticsDataService();
        try {
            AnalyticsDataResponse resp = ads.get(META_TABLE_TID, EVENT_STREAM_DATA_STORE_TABLE, NUM_PARTITIONS_HINT,
                    null, Long.MIN_VALUE, Long.MAX_VALUE, RECORDS_FROM, RECORDS_COUNT);
            List<Record> records = AnalyticsDataServiceUtils.listRecords(ads, resp);
            Map<Integer, List<Event>> result = new HashMap<Integer, List<Event>>();
            List<String> ids = new ArrayList<String>(records.size());
            for (Record record : records) {
                Integer tenantId = (Integer) record.getValue(COLUMN_TENANT_ID);
                String streamId = (String) record.getValue(COLUMN_STREAM_ID);
                List<Object> payload = listValue(record, COLUMN_PAYLOAD);
                if (tenantId == null || streamId == null || payload == null) {
                    log.warn("Corrupted Spark eventing store record (ignoring): " + record);
                } else {
                    List<Event> tenantEvents = result.get(tenantId);
                    if (tenantEvents == null) {
                        tenantEvents = new ArrayList<Event>();
                        result.put(tenantId, tenantEvents);
                    }
                    tenantEvents.add(buildEvent(streamId, payload, listValue(record, COLUMN_META),
                            listValue(record, COLUMN_CORRELATION), record.getTimestamp()));
                }
                // Corrupted records are queued for deletion too, so they are not re-read forever.
                ids.add(record.getId());
            }
            if (!ids.isEmpty()) {
                ads.delete(META_TABLE_TID, EVENT_STREAM_DATA_STORE_TABLE, ids);
            }
            return result;
        }
        catch (AnalyticsException e) {
            throw new RuntimeException("Error in extracting next event batch in Spark event store: " + e.getMessage(), e);
        }
    }

    /**
     * Reads a column value that is expected to hold a list.
     *
     * @param record the record to read from
     * @param column the column name
     * @return the stored list, or {@code null} if the record has no value for the column
     */
    @SuppressWarnings("unchecked")
    private static List<Object> listValue(Record record, String column) {
        // Unchecked: the element type of the stored list cannot be verified at runtime.
        return (List<Object>) record.getValue(column);
    }

    /**
     * Builds the column-name to value map stored for one event record.
     *
     * @param eventRecord the record whose fields are copied
     * @return a new mutable map holding tenant id, stream id, payload, meta and correlation entries
     */
    private static Map<String, Object> createValues(EventRecord eventRecord) {
        Map<String, Object> result = new HashMap<String, Object>();
        result.put(COLUMN_TENANT_ID, eventRecord.getTenantId());
        result.put(COLUMN_STREAM_ID, eventRecord.getStreamId());
        result.put(COLUMN_PAYLOAD, eventRecord.getPayloadEntries());
        result.put(COLUMN_META, eventRecord.getMetaEntries());
        result.put(COLUMN_CORRELATION, eventRecord.getCorrelationEntries());
        return result;
    }

    /**
     * Assembles a databridge event from stored values.
     *
     * @param streamId        the stream id to set on the event
     * @param payload         payload attributes; must not be {@code null}
     * @param metaData        meta attributes, or {@code null} if the record has none
     * @param correlationData correlation attributes, or {@code null} if the record has none
     * @param ts              the event timestamp
     * @return the populated event
     */
    private static Event buildEvent(String streamId, List<Object> payload, List<Object> metaData, List<Object> correlationData, long ts) {
        Event event = new Event();
        event.setTimeStamp(ts);
        event.setStreamId(streamId);
        event.setPayloadData(payload.toArray());
        // Meta and correlation data are optional: the caller only validates the payload, so an
        // absent list must not be dereferenced. A null array is passed through instead.
        // NOTE(review): assumes Event treats a null array as "no data" - confirm.
        event.setMetaData(toArrayOrNull(metaData));
        event.setCorrelationData(toArrayOrNull(correlationData));
        return event;
    }

    /**
     * Converts a possibly absent list to an array.
     *
     * @param values the list to convert, may be {@code null}
     * @return the list contents as an array, or {@code null} if {@code values} is {@code null}
     */
    private static Object[] toArrayOrNull(List<Object> values) {
        return values == null ? null : values.toArray();
    }
}

