/*
 * Decompiled with CFR 0.152.
 */
package org.apache.falcon.catalog;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.catalog.AbstractCatalogService;
import org.apache.falcon.catalog.CatalogPartition;
import org.apache.falcon.catalog.CatalogServiceFactory;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.WorkflowExecutionListener;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Workflow listener that registers or drops catalog partitions for the output
 * feed instances of a successfully completed workflow.
 *
 * <p>Only feeds whose storage is not already table based and which carry a
 * {@value #CATALOG_TABLE} property pointing at an existing table are handled;
 * all other feeds are skipped with an informational log message.
 */
public class CatalogPartitionHandler implements WorkflowExecutionListener {
    private static final Logger LOG = LoggerFactory.getLogger(CatalogPartitionHandler.class);

    public static final ConfigurationStore STORE = ConfigurationStore.get();

    /** Name of the feed property that holds the catalog table URI. */
    public static final String CATALOG_TABLE = "catalog.table";

    private final ExpressionHelper evaluator = ExpressionHelper.get();
    private static final CatalogPartitionHandler INSTANCE = new CatalogPartitionHandler();
    private static final boolean IS_CATALOG_ENABLED = CatalogServiceFactory.isEnabled();

    public static final TimeZone UTC = TimeZone.getTimeZone("UTC");

    /**
     * Accepts paths whose name does not start with "_" or "." and which the
     * file system does not report as a file.
     *
     * <p>NOTE(review): the file system is resolved from a fresh default
     * {@link Configuration} for every path rather than from the cluster
     * configuration used by the caller - confirm this is intended.
     */
    private static final PathFilter PATH_FILTER = new PathFilter() {
        @Override
        public boolean accept(Path path) {
            try {
                FileSystem fs = path.getFileSystem(new Configuration());
                String name = path.getName();
                return !name.startsWith("_") && !name.startsWith(".") && !fs.isFile(path);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    };

    /**
     * Returns the shared handler instance.
     *
     * @return the singleton {@code CatalogPartitionHandler}
     */
    public static final CatalogPartitionHandler get() {
        return INSTANCE;
    }

    /**
     * Registers (for GENERATE/REPLICATE) or drops (for DELETE) the catalog
     * partitions that correspond to each output feed instance path of the
     * given workflow context.
     *
     * <p>Does nothing when the catalog service is disabled, when the cluster
     * has no registry endpoint, or when the context carries no output feeds.
     *
     * @param context the context of the workflow that succeeded
     * @throws FalconException if the operation is neither DELETE, GENERATE nor
     *         REPLICATE, or if a catalog/file system call fails
     */
    @Override
    public void onSuccess(WorkflowExecutionContext context) throws FalconException {
        if (!IS_CATALOG_ENABLED) {
            return;
        }

        String[] feedNames = context.getOutputFeedNamesList();
        String[] feedPaths = context.getOutputFeedInstancePathsList();
        Cluster cluster = (Cluster) STORE.get(EntityType.CLUSTER, context.getClusterName());
        Configuration clusterConf = ClusterHelper.getConfiguration(cluster);

        if (StringUtils.isEmpty(ClusterHelper.getRegistryEndPoint(cluster))) {
            LOG.info("Catalog endpoint not defined for cluster {}. Skipping partition registration",
                    cluster.getName());
            return;
        }

        // Guard against a context without output feeds instead of failing with a
        // NullPointerException on feedNames.length below.
        if (feedNames == null || feedPaths == null) {
            LOG.info("No output feeds found in the workflow context. Skipping partition registration");
            return;
        }

        for (int index = 0; index < feedNames.length; index++) {
            LOG.info("Partition handling for feed {} for path {}", feedNames[index], feedPaths[index]);
            Feed feed = (Feed) STORE.get(EntityType.FEED, feedNames[index]);

            Storage storage = FeedHelper.createStorage(cluster, feed);
            if (storage.getType() == Storage.TYPE.TABLE) {
                LOG.info("Feed {} is already table based. Skipping partition registration", feed.getName());
                continue;
            }

            CatalogStorage catalogStorage = getCatalogStorageFromFeedProperties(feed, cluster, clusterConf);
            if (catalogStorage == null) {
                LOG.info("Feed {} doesn't have table defined in its properties/table doesn't exist. "
                        + "Skipping partition registration", feed.getName());
                continue;
            }

            // Strip scheme and authority so the instance path and the template are compared
            // on their path component only.
            Path feedPath = new Path(new Path(feedPaths[index]).toUri().getPath());
            String templatePath = new Path(storage.getUriTemplate(LocationType.DATA)).toUri().getPath();
            LOG.debug("Template {} catalogInstance path {}", templatePath, feedPath);

            Date date = FeedHelper.getDate(templatePath, feedPath, UTC);
            if (date == null) {
                LOG.info("Feed {} catalogInstance path {} doesn't match the template {}. "
                        + "Skipping partition registration",
                        new Object[]{feed.getName(), feedPath, templatePath});
                continue;
            }
            LOG.debug("Reference date from path {} is {}", feedPath, SchemaHelper.formatDateUTC(date));

            // The partition expressions below are evaluated against this reference date.
            ExpressionHelper.setReferenceDate(date);
            List<String> partitionValues = new ArrayList<String>();
            for (Map.Entry<String, String> entry : catalogStorage.getPartitions().entrySet()) {
                LOG.debug("Evaluating partition {}", entry.getValue());
                partitionValues.add(evaluator.evaluateFullExpression(entry.getValue(), String.class));
            }
            LOG.debug("Static partition - {}", partitionValues);

            WorkflowExecutionContext.EntityOperations operation = context.getOperation();
            switch (operation) {
                case DELETE:
                    dropPartitions(clusterConf, catalogStorage, partitionValues);
                    break;

                case GENERATE:
                case REPLICATE:
                    registerPartitions(clusterConf, catalogStorage, feedPath, partitionValues);
                    break;

                default:
                    throw new FalconException("Unhandled operation " + operation);
            }
        }
    }

    /**
     * Synchronises the catalog partitions below the given static partition
     * with the directories found under {@code staticPath}: partitions present
     * only in the catalog are dropped, partitions present only on the file
     * system are added, and partitions present in both are updated.
     *
     * <p>Returns without touching the catalog when {@code staticPath} does not
     * exist or the glob yields no listing at all.
     *
     * @param conf            cluster configuration
     * @param storage         catalog storage describing the target table
     * @param staticPath      feed instance path (path component only)
     * @param staticPartition partition values derived from the feed instance
     * @throws FalconException if a file system or catalog call fails
     */
    private void registerPartitions(Configuration conf, CatalogStorage storage, Path staticPath,
                                    List<String> staticPartition) throws FalconException {
        try {
            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
            if (!fs.exists(staticPath)) {
                return;
            }

            // Every partition column not covered by a static value is expected to be
            // one additional directory level below the static path.
            List<String> partitionColumns = getPartitionColumns(conf, storage);
            int dynamicPartCols = partitionColumns.size() - staticPartition.size();
            Path searchPath = staticPath;
            if (dynamicPartCols > 0) {
                searchPath = new Path(staticPath, StringUtils.repeat("*", "/", dynamicPartCols));
            }

            FileStatus[] files = fs.globStatus(searchPath, PATH_FILTER);
            if (files == null) {
                // globStatus may return null instead of an empty array; treat it like a
                // missing path rather than failing or dropping existing partitions.
                LOG.info("No listing available for {}. Skipping partition registration", searchPath);
                return;
            }

            // Full partition values -> location of the directory backing the partition.
            Map<List<String>, String> partitions = new HashMap<List<String>, String>();
            for (FileStatus file : files) {
                List<String> partitionValues = new ArrayList<String>(staticPartition);
                partitionValues.addAll(getDynamicPartitions(file.getPath(), staticPath));
                LOG.debug("Final partition - {}", partitionValues);
                partitions.put(partitionValues, file.getPath().toString());
            }

            List<List<String>> existPartitions = listPartitions(conf, storage, staticPartition);
            Collection<List<String>> targetPartitions = partitions.keySet();

            // CollectionUtils here is not generic; both inputs hold List<String> elements.
            @SuppressWarnings("unchecked")
            Collection<List<String>> partitionsForDrop =
                    CollectionUtils.subtract(existPartitions, targetPartitions);
            @SuppressWarnings("unchecked")
            Collection<List<String>> partitionsForAdd =
                    CollectionUtils.subtract(targetPartitions, existPartitions);
            @SuppressWarnings("unchecked")
            Collection<List<String>> partitionsForUpdate =
                    CollectionUtils.intersection(existPartitions, targetPartitions);

            for (List<String> partition : partitionsForDrop) {
                dropPartitions(conf, storage, partition);
            }
            for (List<String> partition : partitionsForAdd) {
                addPartition(conf, storage, partition, partitions.get(partition));
            }
            for (List<String> partition : partitionsForUpdate) {
                updatePartition(conf, storage, partition, partitions.get(partition));
            }
        } catch (IOException e) {
            throw new FalconException(e);
        }
    }

    /** Delegates to the catalog service to update the location of an existing partition. */
    private void updatePartition(Configuration conf, CatalogStorage storage, List<String> partition,
                                 String location) throws FalconException {
        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
        catalogService.updatePartition(conf, storage.getCatalogUrl(), storage.getDatabase(),
                storage.getTable(), partition, location);
    }

    /** Delegates to the catalog service to add a partition at the given location. */
    private void addPartition(Configuration conf, CatalogStorage storage, List<String> partition,
                              String location) throws FalconException {
        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
        catalogService.addPartition(conf, storage.getCatalogUrl(), storage.getDatabase(),
                storage.getTable(), partition, location);
    }

    /**
     * Lists the value lists of the partitions the catalog service returns for
     * the given static partition values.
     */
    private List<List<String>> listPartitions(Configuration conf, CatalogStorage storage,
                                              List<String> staticPartitions) throws FalconException {
        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
        List<CatalogPartition> partitions = catalogService.listPartitions(conf, storage.getCatalogUrl(),
                storage.getDatabase(), storage.getTable(), staticPartitions);
        List<List<String>> existPartitions = new ArrayList<List<String>>();
        for (CatalogPartition partition : partitions) {
            existPartitions.add(partition.getValues());
        }
        return existPartitions;
    }

    /**
     * Extracts the directory names that follow {@code staticPath} in
     * {@code path}.
     *
     * <p>Both paths are compared on their path component only, so a scheme or
     * authority on either of them does not shift the result.
     *
     * @param path       path located at or below {@code staticPath}
     * @param staticPath prefix to strip from {@code path}
     * @return the remaining path segments in order; empty when nothing remains
     */
    protected List<String> getDynamicPartitions(Path path, Path staticPath) {
        int prefixLength = staticPath.toUri().getPath().length();
        String dynPart = path.toUri().getPath().substring(prefixLength);
        dynPart = StringUtils.removeStart(dynPart, "/");
        dynPart = StringUtils.removeEnd(dynPart, "/");
        if (StringUtils.isEmpty(dynPart)) {
            return new ArrayList<String>();
        }
        return Arrays.asList(dynPart.split("/"));
    }

    /** Fetches the partition columns of the storage's table from the catalog service. */
    private List<String> getPartitionColumns(Configuration conf, CatalogStorage storage) throws FalconException {
        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
        return catalogService.getPartitionColumns(conf, storage.getCatalogUrl(), storage.getDatabase(),
                storage.getTable());
    }

    /** Delegates to the catalog service to drop the partitions matching the given values. */
    private void dropPartitions(Configuration conf, CatalogStorage storage, List<String> values)
        throws FalconException {
        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
        catalogService.dropPartitions(conf, storage.getCatalogUrl(), storage.getDatabase(),
                storage.getTable(), values, false);
    }

    /**
     * Builds the catalog storage described by the feed's
     * {@value #CATALOG_TABLE} property.
     *
     * @param feed    feed whose properties are inspected
     * @param cluster cluster the storage is created for
     * @param conf    cluster configuration used for the table existence check
     * @return the catalog storage, or {@code null} when the property is absent
     *         or the catalog service reports that the table does not exist
     * @throws FalconException if the table URI is malformed or the catalog call fails
     */
    protected CatalogStorage getCatalogStorageFromFeedProperties(Feed feed, Cluster cluster, Configuration conf)
        throws FalconException {
        Properties properties = FeedHelper.getFeedProperties(feed);
        String tableUri = properties.getProperty(CATALOG_TABLE);
        if (tableUri == null) {
            return null;
        }

        CatalogTable table = new CatalogTable();
        // The property writes expressions as "{...}"; turn them into "${...}" for the table URI.
        table.setUri(tableUri.replace("{", "${"));

        CatalogStorage storage;
        try {
            storage = new CatalogStorage(cluster, table);
        } catch (URISyntaxException e) {
            throw new FalconException(e);
        }

        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
        boolean tableExists = catalogService.tableExists(conf, storage.getCatalogUrl(),
                storage.getDatabase(), storage.getTable());
        return tableExists ? storage : null;
    }

    /** No action is taken on workflow failure. */
    @Override
    public void onFailure(WorkflowExecutionContext context) throws FalconException {
    }

    /** No action is taken on workflow start. */
    @Override
    public void onStart(WorkflowExecutionContext context) throws FalconException {
    }

    /** No action is taken on workflow suspension. */
    @Override
    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
    }

    /** No action is taken while the workflow is waiting. */
    @Override
    public void onWait(WorkflowExecutionContext context) throws FalconException {
    }
}

