/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.carbon.analytics.spark.event;

import java.io.Serializable;
import java.util.ArrayList;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
import org.wso2.carbon.analytics.spark.event.EventRecord;
import org.wso2.carbon.analytics.spark.event.EventStreamDataStore;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/**
 * Scala {@code Function1} that drains an iterator of Spark {@link Row}s, converts every row
 * into an {@link EventRecord} and hands the records to {@link EventStreamDataStore} in
 * batches of at most {@value #BATCH_SIZE}.
 *
 * <p>Columns are classified by the name found at the same index in the supplied schema:
 * <ul>
 *   <li>{@code _tenantId} - only when global tenant access is enabled - supplies the tenant id
 *       of the record and is not copied into the event data;</li>
 *   <li>names starting with {@code meta_} go to the meta data list;</li>
 *   <li>names starting with {@code correlation_} go to the correlation data list;</li>
 *   <li>everything else (including {@code _tenantId} when global tenant access is disabled)
 *       goes to the payload list.</li>
 * </ul>
 *
 * <p>NOTE(review): presumably passed to a per-partition Spark operation such as
 * {@code foreachPartition}; confirm against the caller.
 */
public class EventIteratorFunction
        extends AbstractFunction1<Iterator<Row>, BoxedUnit>
        implements Serializable {
    private static final long serialVersionUID = 4048303072566432397L;

    /** Number of records accumulated before they are flushed to the event store. */
    private static final int BATCH_SIZE = 1000;
    /** Column that carries the target tenant id when global tenant access is enabled. */
    private static final String TENANT_ID_COLUMN = "_tenantId";
    /** Name prefix of columns routed to the meta data list. */
    private static final String META_PREFIX = "meta_";
    /** Name prefix of columns routed to the correlation data list. */
    private static final String CORRELATION_PREFIX = "correlation_";

    private final int tenantId;
    private final String streamId;
    private final StructType sch;
    private final boolean globalTenantAccess;

    /**
     * @param tenantId           tenant id used for every record unless global tenant access is on
     * @param streamId           stream id stamped on every created record
     * @param sch                schema whose field names are matched, by index, to row columns
     * @param globalTenantAccess when {@code true}, each row must carry a {@code _tenantId} column
     *                           whose value replaces {@code tenantId} for that record
     */
    public EventIteratorFunction(int tenantId, String streamId, StructType sch, boolean globalTenantAccess) {
        this.tenantId = tenantId;
        this.streamId = streamId;
        this.sch = sch;
        this.globalTenantAccess = globalTenantAccess;
    }

    /**
     * Converts all rows of the iterator to event records and writes them to the event store.
     *
     * <p>A batch is flushed whenever it reaches {@value #BATCH_SIZE} records; whatever is left
     * when the iterator is exhausted is flushed with one final call.
     *
     * @param iterator rows to publish
     * @return {@link BoxedUnit#UNIT}
     * @throws RuntimeException if the store rejects a batch (the {@link AnalyticsException} is
     *                          kept as the cause) or if a row cannot be converted
     */
    public BoxedUnit apply(Iterator<Row> iterator) {
        ArrayList<EventRecord> batch = new ArrayList<EventRecord>(BATCH_SIZE);
        // Resolved on the first row and reused afterwards: fieldNames() builds a new array on
        // each call, so asking for it per row is wasted work. Resolving lazily keeps an empty
        // iterator from touching the schema at all, exactly as before.
        String[] colNames = null;
        try {
            while (iterator.hasNext()) {
                if (colNames == null) {
                    colNames = this.sch.fieldNames();
                }
                batch.add(this.createEventRecordFromRow(iterator.next(), colNames));
                if (batch.size() >= BATCH_SIZE) {
                    EventStreamDataStore.addToStore(batch);
                    batch.clear();
                }
            }
            // Flush the remainder (this call is made even when nothing is left).
            EventStreamDataStore.addToStore(batch);
        } catch (AnalyticsException e) {
            throw new RuntimeException("Error in writing event store entries: " + e.getMessage(), e);
        }
        return BoxedUnit.UNIT;
    }

    /**
     * Builds one {@link EventRecord} from a row, splitting its columns into payload, meta and
     * correlation data according to the column names.
     *
     * @param row      row to convert
     * @param colNames schema field names; {@code colNames[i]} names column {@code i} of the row
     * @return the record for this row
     * @throws RuntimeException if the row has more columns than the schema names, or if global
     *                          tenant access is enabled and the {@code _tenantId} column is
     *                          missing or null
     */
    private EventRecord createEventRecordFromRow(Row row, String[] colNames) {
        int columnCount = row.length();
        if (columnCount > colNames.length) {
            // Without this guard the loop below dies with a bare ArrayIndexOutOfBoundsException.
            throw new RuntimeException("Row: " + row + " has " + columnCount
                    + " columns but schema: " + this.sch + " declares only " + colNames.length);
        }

        boolean tenantColumnSeen = false;
        int targetTenantId = this.tenantId;
        ArrayList<Object> payloadData = new ArrayList<Object>();
        ArrayList<Object> metaData = new ArrayList<Object>();
        ArrayList<Object> correlationData = new ArrayList<Object>();

        for (int i = 0; i < columnCount; ++i) {
            String colName = colNames[i];
            if (this.globalTenantAccess && colName.equals(TENANT_ID_COLUMN)) {
                if (row.isNullAt(i)) {
                    // getInt() on a null cell fails without saying which row was affected.
                    throw new RuntimeException("The field '_tenantId' is null in row: " + row
                            + " with schema: " + this.sch
                            + " when creating a global tenant access record");
                }
                targetTenantId = row.getInt(i);
                tenantColumnSeen = true;
            } else if (colName.startsWith(META_PREFIX)) {
                metaData.add(row.get(i));
            } else if (colName.startsWith(CORRELATION_PREFIX)) {
                correlationData.add(row.get(i));
            } else {
                payloadData.add(row.get(i));
            }
        }

        if (this.globalTenantAccess && !tenantColumnSeen) {
            throw new RuntimeException("The field '_tenantId' is not found in row: " + row + " with schema: " + this.sch + " when creating a global tenant access record");
        }
        return new EventRecord(targetTenantId, this.streamId, payloadData, metaData, correlationData);
    }
}

