/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.carbon.analytics.eventsink;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.deployment.AbstractDeployer;
import org.apache.axis2.deployment.DeploymentException;
import org.apache.axis2.deployment.repository.util.DeploymentFileData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceUtils;
import org.wso2.carbon.analytics.datasource.commons.AnalyticsSchema;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
import org.wso2.carbon.analytics.eventsink.AnalyticsEventStore;
import org.wso2.carbon.analytics.eventsink.exception.AnalyticsEventStoreDeploymentException;
import org.wso2.carbon.analytics.eventsink.exception.AnalyticsEventStoreException;
import org.wso2.carbon.analytics.eventsink.internal.AnalyticsEventSinkServerStartupObserver;
import org.wso2.carbon.analytics.eventsink.internal.AnalyticsEventStoreManager;
import org.wso2.carbon.analytics.eventsink.internal.util.AnalyticsEventSinkUtil;
import org.wso2.carbon.analytics.eventsink.internal.util.ServiceHolder;
import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;
import org.wso2.carbon.utils.multitenancy.MultitenantUtils;

public class AnalyticsEventStoreDeployer
extends AbstractDeployer {
    private static final Log log = LogFactory.getLog(AnalyticsEventStoreDeployer.class);
    private static List<DeploymentFileData> pausedDeployments = new ArrayList<DeploymentFileData>();
    private boolean eventSinkEnabled;

    public AnalyticsEventStoreDeployer() {
        String disableEventSink = System.getProperty("disableEventSink");
        this.eventSinkEnabled = disableEventSink == null || !Boolean.parseBoolean(disableEventSink);
    }

    public void init(ConfigurationContext configurationContext) {
        File deployementDir = new File(MultitenantUtils.getAxis2RepositoryPath((int)CarbonContext.getThreadLocalCarbonContext().getTenantId()) + "eventsink");
        if (!deployementDir.exists() && !deployementDir.mkdir()) {
            log.warn((Object)("Unable to create the deployment dir at: " + deployementDir.getPath()));
        }
    }

    public void deploy(DeploymentFileData deploymentFileData) throws DeploymentException {
        if (AnalyticsEventSinkServerStartupObserver.getInstance().isServerStarted()) {
            log.info((Object)("Deploying analytics event store: " + deploymentFileData.getName()));
            int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
            try {
                AnalyticsEventStore eventStoreConfiguration = AnalyticsEventStoreManager.getInstance().getAnalyticsEventStore(deploymentFileData.getFile());
                if (!AnalyticsEventSinkUtil.getAnalyticsEventStoreName(deploymentFileData.getName()).equalsIgnoreCase(eventStoreConfiguration.getName())) {
                    throw new AnalyticsEventStoreDeploymentException("Invalid configuration provided! File name: " + AnalyticsEventSinkUtil.getAnalyticsEventStoreName(deploymentFileData.getName() + " should be " + "matched with deduced table name: " + eventStoreConfiguration.getName() + " for the streams"));
                }
                this.addEventStore(tenantId, eventStoreConfiguration);
            }
            catch (AnalyticsEventStoreException e) {
                String errMsg = "Error while deploying file: " + deploymentFileData.getName() + " for tenant id: " + tenantId;
                log.error((Object)errMsg, (Throwable)e);
                throw new AnalyticsEventStoreDeploymentException(errMsg, e);
            }
            catch (Exception e) {
                String errorMsg = "Unable to deploy the event store: " + deploymentFileData.getName() + ". " + e.getMessage();
                log.error((Object)errorMsg, (Throwable)e);
                throw new AnalyticsEventStoreDeploymentException(errorMsg, e);
            }
            log.info((Object)("Deployed successfully analytics event store: " + deploymentFileData.getName()));
        } else {
            pausedDeployments.add(deploymentFileData);
        }
    }

    private void addEventStore(int tenantId, AnalyticsEventStore eventStore) throws AnalyticsEventStoreException {
        try {
            AnalyticsEventStoreManager.getInstance().addEventStoreConfiguration(tenantId, eventStore);
            if (this.eventSinkEnabled) {
                if (!ServiceHolder.getAnalyticsDataAPI().tableExists(tenantId, eventStore.getName())) {
                    if (eventStore.getRecordStore() == null) {
                        ServiceHolder.getAnalyticsDataAPI().createTable(tenantId, eventStore.getName());
                    } else {
                        ServiceHolder.getAnalyticsDataAPI().createTable(tenantId, eventStore.getRecordStore(), eventStore.getName());
                    }
                }
                ServiceHolder.getAnalyticsDataAPI().setTableSchema(tenantId, eventStore.getName(), this.resolveAndMergeSchemata(tenantId, eventStore));
                for (String streamId : eventStore.getEventSource().getStreamIds()) {
                    if (ServiceHolder.getStreamDefinitionStoreService().getStreamDefinition(streamId, tenantId) == null) continue;
                    ServiceHolder.getAnalyticsEventStreamListener().subscribeForStream(tenantId, streamId);
                }
            } else {
                log.info((Object)("Event store is disabled in this node, hence ignoring the event sink configuration: " + eventStore.getName()));
            }
        }
        catch (AnalyticsException e) {
            String errorMsg = "Error while creating the table Or setting the schema for table: " + eventStore.getName();
            log.error((Object)errorMsg, (Throwable)e);
            throw new AnalyticsEventStoreException(errorMsg, (Exception)((Object)e));
        }
        catch (StreamDefinitionStoreException e) {
            String errorMsg = "Error when subscribing to the stream: " + e.getMessage();
            log.error((Object)errorMsg, (Throwable)e);
            throw new AnalyticsEventStoreException(errorMsg, (Exception)((Object)e));
        }
    }

    private AnalyticsSchema resolveAndMergeSchemata(int tenantId, AnalyticsEventStore eventStoreConfig) throws AnalyticsException {
        try {
            AnalyticsSchema incomingSchema = AnalyticsEventSinkUtil.getAnalyticsSchema(eventStoreConfig.getAnalyticsTableSchema());
            if (!eventStoreConfig.isMergeSchema()) {
                return incomingSchema;
            }
            AnalyticsSchema liveSchema = ServiceHolder.getAnalyticsDataAPI().getTableSchema(tenantId, eventStoreConfig.getName());
            return AnalyticsDataServiceUtils.createMergedSchema((AnalyticsSchema)liveSchema, (List)incomingSchema.getPrimaryKeys(), new ArrayList(incomingSchema.getColumns().values()), new ArrayList(incomingSchema.getIndexedColumns().keySet()));
        }
        catch (AnalyticsEventStoreException e) {
            log.error((Object)("Error while retrieving the eventStore config for table: " + eventStoreConfig.getName() + ", " + e.getMessage()), (Throwable)e);
            throw new AnalyticsException("Error while retrieving the eventStore config for stream: " + eventStoreConfig.getName() + ", " + e.getMessage(), (Throwable)e);
        }
    }

    public void undeploy(String fileName) throws DeploymentException {
        log.info((Object)("Undeploying analytics event store: " + fileName));
        int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
        String eventStoreName = AnalyticsEventSinkUtil.getAnalyticsEventStoreName(new File(fileName).getName());
        AnalyticsEventStore existingEventStore = AnalyticsEventStoreManager.getInstance().removeEventStoreConfiguration(tenantId, eventStoreName);
        if (this.eventSinkEnabled) {
            if (existingEventStore != null) {
                for (String streamId : existingEventStore.getEventSource().getStreamIds()) {
                    ServiceHolder.getAnalyticsEventStreamListener().unsubscribeFromStream(tenantId, streamId);
                }
            }
        } else {
            log.info((Object)("Ignored event sink configuration: " + fileName + " since the event sink is disabled in this node."));
        }
        log.info((Object)("Undeployed successfully analytics event store: " + fileName));
    }

    public static List<DeploymentFileData> getPausedDeployments() {
        return pausedDeployments;
    }

    public static void clearPausedDeployments() {
        pausedDeployments = null;
    }

    public void setDirectory(String s) {
    }

    public void setExtension(String s) {
    }
}

