/*
 * Decompiled with CFR 0.152.
 */
package org.apache.falcon.entity;

import java.io.IOException;
import java.lang.reflect.Method;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import org.apache.commons.beanutils.PropertyUtils;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Pair;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.WorkflowNameBuilder;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityGraph;
import org.apache.falcon.entity.v0.EntityNotification;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Properties;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.LateInput;
import org.apache.falcon.entity.v0.process.LateProcess;
import org.apache.falcon.entity.v0.process.PolicyType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Retry;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class EntityUtil {
    public static final Logger LOG = LoggerFactory.getLogger(EntityUtil.class);
    private static final long MINUTE_IN_MS = 60000L;
    private static final long HOUR_IN_MS = 3600000L;
    private static final long DAY_IN_MS = 86400000L;
    private static final long MONTH_IN_MS = 2678400000L;
    private static final long ONE_MS = 1L;
    public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
    private static final String STAGING_DIR_NAME_SEPARATOR = "_";

    private EntityUtil() {
    }

    public static <T extends Entity> T getEntity(EntityType type, String entityName) throws FalconException {
        ConfigurationStore configStore = ConfigurationStore.get();
        Object entity = configStore.get(type, entityName);
        if (entity == null) {
            throw new EntityNotRegisteredException(entityName + " (" + type + ") not found");
        }
        return entity;
    }

    public static <T extends Entity> T getEntity(String type, String entityName) throws FalconException {
        EntityType entityType;
        try {
            entityType = EntityType.getEnum((String)type);
        }
        catch (IllegalArgumentException e) {
            throw new FalconException("Invalid entity type: " + type, e);
        }
        return EntityUtil.getEntity(entityType, entityName);
    }

    public static TimeZone getTimeZone(String tzId) {
        if (tzId == null) {
            throw new IllegalArgumentException("Invalid TimeZone: Cannot be null.");
        }
        TimeZone tz = TimeZone.getTimeZone(tzId);
        if (!tzId.equals("GMT") && tz.getID().equals("GMT")) {
            throw new IllegalArgumentException("Invalid TimeZone: " + tzId);
        }
        return tz;
    }

    public static Date getEndTime(Entity entity, String cluster) {
        if (entity.getEntityType() == EntityType.PROCESS) {
            return EntityUtil.getEndTime((Process)entity, cluster);
        }
        return EntityUtil.getEndTime((Feed)entity, cluster);
    }

    public static Date parseDateUTC(String dateStr) throws FalconException {
        try {
            return SchemaHelper.parseDateUTC((String)dateStr);
        }
        catch (Exception e) {
            throw new FalconException(e);
        }
    }

    public static Date getStartTime(Entity entity, String cluster) {
        if (entity.getEntityType() == EntityType.PROCESS) {
            return EntityUtil.getStartTime((Process)entity, cluster);
        }
        return EntityUtil.getStartTime((Feed)entity, cluster);
    }

    public static Date getEndTime(Process process, String cluster) {
        Cluster processCluster = ProcessHelper.getCluster(process, cluster);
        return processCluster.getValidity().getEnd();
    }

    public static Date getStartTime(Process process, String cluster) {
        Cluster processCluster = ProcessHelper.getCluster(process, cluster);
        return processCluster.getValidity().getStart();
    }

    public static Date getEndTime(Feed feed, String cluster) {
        org.apache.falcon.entity.v0.feed.Cluster clusterDef = FeedHelper.getCluster(feed, cluster);
        return clusterDef.getValidity().getEnd();
    }

    public static Date getStartTime(Feed feed, String cluster) {
        org.apache.falcon.entity.v0.feed.Cluster clusterDef = FeedHelper.getCluster(feed, cluster);
        return clusterDef.getValidity().getStart();
    }

    public static int getParallel(Entity entity) {
        if (entity.getEntityType() == EntityType.PROCESS) {
            return EntityUtil.getParallel((Process)entity);
        }
        return EntityUtil.getParallel((Feed)entity);
    }

    public static void setStartDate(Entity entity, String cluster, Date startDate) {
        if (entity.getEntityType() == EntityType.PROCESS) {
            EntityUtil.setStartDate((Process)entity, cluster, startDate);
        } else {
            EntityUtil.setStartDate((Feed)entity, cluster, startDate);
        }
    }

    public static void setEndTime(Entity entity, String cluster, Date endDate) {
        if (entity.getEntityType() == EntityType.PROCESS) {
            EntityUtil.setEndTime((Process)entity, cluster, endDate);
        } else {
            EntityUtil.setEndTime((Feed)entity, cluster, endDate);
        }
    }

    public static void setParallel(Entity entity, int parallel) {
        if (entity.getEntityType() == EntityType.PROCESS) {
            EntityUtil.setParallel((Process)entity, parallel);
        } else {
            EntityUtil.setParallel((Feed)entity, parallel);
        }
    }

    public static int getParallel(Process process) {
        return process.getParallel();
    }

    public static void setStartDate(Process process, String cluster, Date startDate) {
        Cluster processCluster = ProcessHelper.getCluster(process, cluster);
        processCluster.getValidity().setStart(startDate);
    }

    public static void setParallel(Process process, int parallel) {
        process.setParallel(parallel);
    }

    public static void setEndTime(Process process, String cluster, Date endDate) {
        Cluster processCluster = ProcessHelper.getCluster(process, cluster);
        processCluster.getValidity().setEnd(endDate);
    }

    public static int getParallel(Feed feed) {
        return 1;
    }

    public static void setStartDate(Feed feed, String cluster, Date startDate) {
        org.apache.falcon.entity.v0.feed.Cluster clusterDef = FeedHelper.getCluster(feed, cluster);
        clusterDef.getValidity().setStart(startDate);
    }

    public static void setEndTime(Feed feed, String cluster, Date endDate) {
        org.apache.falcon.entity.v0.feed.Cluster clusterDef = FeedHelper.getCluster(feed, cluster);
        clusterDef.getValidity().setStart(endDate);
    }

    public static void setParallel(Feed feed, int parallel) {
    }

    public static Frequency getFrequency(Entity entity) {
        if (entity.getEntityType() == EntityType.PROCESS) {
            return EntityUtil.getFrequency((Process)entity);
        }
        return EntityUtil.getFrequency((Feed)entity);
    }

    public static Frequency getFrequency(Process process) {
        return process.getFrequency();
    }

    public static Frequency getFrequency(Feed feed) {
        return feed.getFrequency();
    }

    public static TimeZone getTimeZone(Entity entity) {
        if (entity.getEntityType() == EntityType.PROCESS) {
            return EntityUtil.getTimeZone((Process)entity);
        }
        return EntityUtil.getTimeZone((Feed)entity);
    }

    public static TimeZone getTimeZone(Process process) {
        return process.getTimezone();
    }

    public static TimeZone getTimeZone(Feed feed) {
        return feed.getTimezone();
    }

    public static boolean isValidInstanceTime(Date startTime, Frequency frequency, TimeZone timezone, Date instanceTime) {
        Date next = EntityUtil.getNextStartTime(startTime, frequency, timezone, instanceTime);
        return next.equals(instanceTime);
    }

    public static Date getNextStartTime(Date startTime, Frequency frequency, TimeZone timezone, Date referenceTime) {
        if (startTime.after(referenceTime)) {
            return startTime;
        }
        Calendar startCal = Calendar.getInstance(timezone);
        startCal.setTime(startTime);
        int count = 0;
        switch (frequency.getTimeUnit()) {
            case months: {
                count = (int)((referenceTime.getTime() - startTime.getTime()) / 2678400000L);
                break;
            }
            case days: {
                count = (int)((referenceTime.getTime() - startTime.getTime()) / 86400000L);
                break;
            }
            case hours: {
                count = (int)((referenceTime.getTime() - startTime.getTime()) / 3600000L);
                break;
            }
            case minutes: {
                count = (int)((referenceTime.getTime() - startTime.getTime()) / 60000L);
                break;
            }
        }
        int freq = frequency.getFrequencyAsInt();
        if (count > 2) {
            startCal.add(frequency.getTimeUnit().getCalendarUnit(), (count - 2) / freq * freq);
        }
        while (startCal.getTime().before(referenceTime)) {
            startCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
        }
        return startCal.getTime();
    }

    public static java.util.Properties getEntityProperties(Entity myEntity) {
        java.util.Properties properties = new java.util.Properties();
        switch (myEntity.getEntityType()) {
            case CLUSTER: {
                Properties clusterProps = ((org.apache.falcon.entity.v0.cluster.Cluster)myEntity).getProperties();
                if (clusterProps == null) break;
                for (Property prop : clusterProps.getProperties()) {
                    properties.put(prop.getName(), prop.getValue());
                }
                break;
            }
            case FEED: {
                org.apache.falcon.entity.v0.feed.Properties feedProps = ((Feed)myEntity).getProperties();
                if (feedProps == null) break;
                for (org.apache.falcon.entity.v0.feed.Property prop : feedProps.getProperties()) {
                    properties.put(prop.getName(), prop.getValue());
                }
                break;
            }
            case PROCESS: {
                org.apache.falcon.entity.v0.process.Properties processProps = ((Process)myEntity).getProperties();
                if (processProps == null) break;
                for (org.apache.falcon.entity.v0.process.Property prop : processProps.getProperties()) {
                    properties.put(prop.getName(), prop.getValue());
                }
                break;
            }
            default: {
                throw new IllegalArgumentException("Unhandled entity type " + myEntity.getEntityType());
            }
        }
        return properties;
    }

    public static int getInstanceSequence(Date startTime, Frequency frequency, TimeZone tz, Date instanceTime) {
        if (startTime.after(instanceTime)) {
            return -1;
        }
        if (tz == null) {
            tz = TimeZone.getTimeZone("UTC");
        }
        Calendar startCal = Calendar.getInstance(tz);
        startCal.setTime(startTime);
        int count = 0;
        switch (frequency.getTimeUnit()) {
            case months: {
                count = (int)((instanceTime.getTime() - startTime.getTime()) / 2678400000L);
                break;
            }
            case days: {
                count = (int)((instanceTime.getTime() - startTime.getTime()) / 86400000L);
                break;
            }
            case hours: {
                count = (int)((instanceTime.getTime() - startTime.getTime()) / 3600000L);
                break;
            }
            case minutes: {
                count = (int)((instanceTime.getTime() - startTime.getTime()) / 60000L);
                break;
            }
        }
        int freq = frequency.getFrequencyAsInt();
        if (count > 2) {
            startCal.add(frequency.getTimeUnit().getCalendarUnit(), count / freq * freq);
            count /= freq;
        } else {
            count = 0;
        }
        while (startCal.getTime().before(instanceTime)) {
            startCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
            ++count;
        }
        return count + 1;
    }

    public static Date getNextInstanceTime(Date instanceTime, Frequency frequency, TimeZone tz, int instanceCount) {
        if (tz == null) {
            tz = TimeZone.getTimeZone("UTC");
        }
        Calendar insCal = Calendar.getInstance(tz);
        insCal.setTime(instanceTime);
        int freq = frequency.getFrequencyAsInt() * instanceCount;
        insCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
        return insCal.getTime();
    }

    public static String md5(Entity entity) throws FalconException {
        return new String(Hex.encodeHex((byte[])DigestUtils.md5((String)EntityUtil.stringOf(entity))));
    }

    public static boolean equals(Entity lhs, Entity rhs) throws FalconException {
        return EntityUtil.equals(lhs, rhs, null);
    }

    public static boolean equals(Entity lhs, Entity rhs, String[] filterProps) throws FalconException {
        if (lhs == null && rhs == null) {
            return true;
        }
        if (lhs == null || rhs == null) {
            return false;
        }
        if (lhs.equals((Object)rhs)) {
            String lhsString = EntityUtil.stringOf(lhs, filterProps);
            String rhsString = EntityUtil.stringOf(rhs, filterProps);
            return lhsString.equals(rhsString);
        }
        return false;
    }

    public static String stringOf(Entity entity) throws FalconException {
        return EntityUtil.stringOf(entity, null);
    }

    private static String stringOf(Entity entity, String[] filterProps) throws FalconException {
        HashMap<String, String> map = new HashMap<String, String>();
        EntityUtil.mapToProperties(entity, null, map, filterProps);
        ArrayList keyList = new ArrayList(map.keySet());
        Collections.sort(keyList);
        StringBuilder builer = new StringBuilder();
        for (String key : keyList) {
            builer.append(key).append('=').append((String)map.get(key)).append('\n');
        }
        return builer.toString();
    }

    private static void mapToProperties(Object obj, String name, Map<String, String> propMap, String[] filterProps) throws FalconException {
        if (obj == null) {
            return;
        }
        if (filterProps != null && name != null) {
            for (String filter : filterProps) {
                if (!name.matches(filter.replace(".", "\\.").replace("[", "\\[").replace("]", "\\]"))) continue;
                return;
            }
        }
        if (Date.class.isAssignableFrom(obj.getClass())) {
            propMap.put(name, SchemaHelper.formatDateUTC((Date)((Date)obj)));
        } else if (obj.getClass().getPackage().getName().equals("java.lang")) {
            propMap.put(name, String.valueOf(obj));
        } else if (TimeZone.class.isAssignableFrom(obj.getClass())) {
            propMap.put(name, ((TimeZone)obj).getID());
        } else if (Enum.class.isAssignableFrom(obj.getClass())) {
            propMap.put(name, ((Enum)obj).name());
        } else if (List.class.isAssignableFrom(obj.getClass())) {
            List list = (List)obj;
            for (int index = 0; index < list.size(); ++index) {
                EntityUtil.mapToProperties(list.get(index), name + "[" + index + "]", propMap, filterProps);
            }
        } else {
            try {
                Method method = obj.getClass().getDeclaredMethod("toString", new Class[0]);
                propMap.put(name, (String)method.invoke(obj, new Object[0]));
            }
            catch (NoSuchMethodException e) {
                try {
                    Map map = PropertyUtils.describe((Object)obj);
                    for (Map.Entry entry : map.entrySet()) {
                        String key = (String)entry.getKey();
                        if (!key.equals("class")) {
                            EntityUtil.mapToProperties(map.get(key), name != null ? name + "." + key : key, propMap, filterProps);
                            continue;
                        }
                        propMap.put(((Class)map.get(key)).getSimpleName(), "");
                    }
                }
                catch (Exception e1) {
                    throw new FalconException(e1);
                }
            }
            catch (Exception e) {
                throw new FalconException(e);
            }
        }
    }

    public static WorkflowNameBuilder.WorkflowName getWorkflowName(Tag tag, List<String> suffixes, Entity entity) {
        WorkflowNameBuilder<Entity> builder = new WorkflowNameBuilder<Entity>(entity);
        builder.setTag(tag);
        builder.setSuffixes(suffixes);
        return builder.getWorkflowName();
    }

    public static WorkflowNameBuilder.WorkflowName getWorkflowName(Tag tag, Entity entity) {
        return EntityUtil.getWorkflowName(tag, null, entity);
    }

    public static WorkflowNameBuilder.WorkflowName getWorkflowName(Entity entity) {
        return EntityUtil.getWorkflowName(null, null, entity);
    }

    public static String getWorkflowNameSuffix(String workflowName, Entity entity) throws FalconException {
        WorkflowNameBuilder<Entity> builder = new WorkflowNameBuilder<Entity>(entity);
        return builder.getWorkflowSuffixes(workflowName).replaceAll(STAGING_DIR_NAME_SEPARATOR, "");
    }

    public static Tag getWorkflowNameTag(String workflowName, Entity entity) {
        WorkflowNameBuilder<Entity> builder = new WorkflowNameBuilder<Entity>(entity);
        return builder.getWorkflowTag(workflowName);
    }

    public static List<String> getWorkflowNames(Entity entity) {
        switch (entity.getEntityType()) {
            case FEED: {
                return Arrays.asList(EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString(), EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString());
            }
            case PROCESS: {
                return Arrays.asList(EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString());
            }
        }
        throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
    }

    public static <T extends Entity> T getClusterView(T entity, String clusterName) {
        switch (entity.getEntityType()) {
            case CLUSTER: {
                return entity;
            }
            case FEED: {
                Feed feed = (Feed)entity.copy();
                org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
                Iterator itr = feed.getClusters().getClusters().iterator();
                while (itr.hasNext()) {
                    org.apache.falcon.entity.v0.feed.Cluster cluster = (org.apache.falcon.entity.v0.feed.Cluster)itr.next();
                    if (cluster.getName().equals(clusterName) || feedCluster.getType() == ClusterType.TARGET && cluster.getType() == ClusterType.SOURCE) continue;
                    itr.remove();
                }
                return (T)feed;
            }
            case PROCESS: {
                Process process = (Process)entity.copy();
                Iterator procItr = process.getClusters().getClusters().iterator();
                while (procItr.hasNext()) {
                    Cluster cluster = (Cluster)procItr.next();
                    if (cluster.getName().equals(clusterName)) continue;
                    procItr.remove();
                }
                return (T)process;
            }
        }
        throw new UnsupportedOperationException("Not supported for entity type " + entity.getEntityType());
    }

    public static Set<String> getClustersDefined(Entity entity) {
        HashSet<String> clusters = new HashSet<String>();
        switch (entity.getEntityType()) {
            case CLUSTER: {
                clusters.add(entity.getName());
                break;
            }
            case FEED: {
                Feed feed = (Feed)entity;
                for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
                    clusters.add(cluster.getName());
                }
                break;
            }
            case PROCESS: {
                Process process = (Process)entity;
                for (Cluster cluster : process.getClusters().getClusters()) {
                    clusters.add(cluster.getName());
                }
                break;
            }
        }
        return clusters;
    }

    public static Set<String> getClustersDefinedInColos(Entity entity) {
        Set<String> entityClusters = EntityUtil.getClustersDefined(entity);
        if (DeploymentUtil.isEmbeddedMode()) {
            return entityClusters;
        }
        Set<String> myClusters = DeploymentUtil.getCurrentClusters();
        HashSet<String> applicableClusters = new HashSet<String>();
        for (String cluster : entityClusters) {
            if (!myClusters.contains(cluster)) continue;
            applicableClusters.add(cluster);
        }
        return applicableClusters;
    }

    public static Retry getRetry(Entity entity) throws FalconException {
        switch (entity.getEntityType()) {
            case FEED: {
                if (!RuntimeProperties.get().getProperty("feed.retry.allowed", "true").equalsIgnoreCase("true")) {
                    return null;
                }
                Retry retry = new Retry();
                retry.setAttempts(Integer.parseInt(RuntimeProperties.get().getProperty("feed.retry.attempts", "3")));
                retry.setDelay(new Frequency(RuntimeProperties.get().getProperty("feed.retry.frequency", "minutes(5)")));
                retry.setPolicy(PolicyType.fromValue((String)RuntimeProperties.get().getProperty("feed.retry.policy", "exp-backoff")));
                return retry;
            }
            case PROCESS: {
                Process process = (Process)entity;
                return process.getRetry();
            }
        }
        throw new FalconException("Cannot create Retry for entity:" + entity.getName());
    }

    public static Path getBaseStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) {
        return new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING).getPath(), "falcon/workflows/" + entity.getEntityType().name().toLowerCase() + "/" + entity.getName());
    }

    public static Path getLatestStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) throws FalconException {
        Path basePath = EntityUtil.getBaseStagingPath(cluster, entity);
        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
        try {
            final String md5 = EntityUtil.md5(EntityUtil.getClusterView(entity, cluster.getName()));
            Object[] files = fs.listStatus(basePath, new PathFilter(){

                public boolean accept(Path path) {
                    return path.getName().startsWith(md5);
                }
            });
            if (files != null && files.length != 0) {
                Arrays.sort(files);
                return files[files.length - 1].getPath();
            }
            throw new FalconException("No staging directories found for entity " + entity.getName() + " on cluster " + cluster.getName());
        }
        catch (Exception e) {
            throw new FalconException("Unable get listing for " + basePath.toString(), e);
        }
    }

    public static Path getNewStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) throws FalconException {
        Entity clusterView = EntityUtil.getClusterView(entity, cluster.getName());
        return new Path(EntityUtil.getBaseStagingPath(cluster, entity), EntityUtil.md5(clusterView) + STAGING_DIR_NAME_SEPARATOR + String.valueOf(System.currentTimeMillis()));
    }

    public static boolean isStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity, Path path) throws FalconException {
        String basePath = new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING).getPath()).toUri().getPath();
        try {
            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
            String pathString = path.toUri().getPath();
            String entityPath = entity.getEntityType().name().toLowerCase() + "/" + entity.getName();
            return fs.exists(path) && pathString.startsWith(basePath) && pathString.contains(entityPath);
        }
        catch (IOException e) {
            throw new FalconException(e);
        }
    }

    public static LateProcess getLateProcess(Entity entity) throws FalconException {
        switch (entity.getEntityType()) {
            case FEED: {
                if (!RuntimeProperties.get().getProperty("feed.late.allowed", "true").equalsIgnoreCase("true")) {
                    return null;
                }
                if (((Feed)entity).getLateArrival() == null) {
                    return null;
                }
                LateProcess lateProcess = new LateProcess();
                lateProcess.setDelay(new Frequency(RuntimeProperties.get().getProperty("feed.late.frequency", "hours(3)")));
                lateProcess.setPolicy(PolicyType.fromValue((String)RuntimeProperties.get().getProperty("feed.late.policy", "exp-backoff")));
                LateInput lateInput = new LateInput();
                lateInput.setInput(entity.getName());
                lateInput.setWorkflowPath("ignore.xml");
                lateProcess.getLateInputs().add(lateInput);
                return lateProcess;
            }
            case PROCESS: {
                Process process = (Process)entity;
                return process.getLateProcess();
            }
        }
        throw new FalconException("Cannot create Late Process for entity:" + entity.getName());
    }

    public static Path getLogPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) {
        return new Path(EntityUtil.getBaseStagingPath(cluster, entity), "logs");
    }

    public static String fromUTCtoURIDate(String utc) throws FalconException {
        Date utcDate;
        SimpleDateFormat utcFormat = new SimpleDateFormat("yyyy'-'MM'-'dd'T'HH':'mm'Z'");
        try {
            utcDate = utcFormat.parse(utc);
        }
        catch (ParseException e) {
            throw new FalconException("Unable to parse utc date:", e);
        }
        SimpleDateFormat uriFormat = new SimpleDateFormat("yyyy'-'MM'-'dd'-'HH'-'mm");
        return uriFormat.format(utcDate);
    }

    public static boolean responsibleFor(String colo) {
        return DeploymentUtil.isEmbeddedMode() || !DeploymentUtil.isPrism() && colo.equals(DeploymentUtil.getCurrentColo());
    }

    public static Date getNextStartTime(Entity entity, org.apache.falcon.entity.v0.cluster.Cluster cluster, Date effectiveTime) {
        switch (entity.getEntityType()) {
            case FEED: {
                Feed feed = (Feed)entity;
                org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
                return EntityUtil.getNextStartTime(feedCluster.getValidity().getStart(), feed.getFrequency(), feed.getTimezone(), effectiveTime);
            }
            case PROCESS: {
                Process process = (Process)entity;
                Cluster processCluster = ProcessHelper.getCluster(process, cluster.getName());
                return EntityUtil.getNextStartTime(processCluster.getValidity().getStart(), process.getFrequency(), process.getTimezone(), effectiveTime);
            }
        }
        throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
    }

    public static boolean isTableStorageType(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) throws FalconException {
        return entity.getEntityType() == EntityType.PROCESS ? EntityUtil.isTableStorageType(cluster, (Process)entity) : EntityUtil.isTableStorageType(cluster, (Feed)entity);
    }

    public static boolean isTableStorageType(org.apache.falcon.entity.v0.cluster.Cluster cluster, Feed feed) throws FalconException {
        Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
        return Storage.TYPE.TABLE == storageType;
    }

    public static boolean isTableStorageType(org.apache.falcon.entity.v0.cluster.Cluster cluster, Process process) throws FalconException {
        Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process);
        return Storage.TYPE.TABLE == storageType;
    }

    public static List<String> getTags(Entity entity) {
        String rawTags = null;
        switch (entity.getEntityType()) {
            case PROCESS: {
                rawTags = ((Process)entity).getTags();
                break;
            }
            case FEED: {
                rawTags = ((Feed)entity).getTags();
                break;
            }
            case CLUSTER: {
                rawTags = ((org.apache.falcon.entity.v0.cluster.Cluster)entity).getTags();
                break;
            }
        }
        ArrayList<String> tags = new ArrayList<String>();
        if (!StringUtils.isEmpty((CharSequence)rawTags)) {
            for (String tag : rawTags.split(",")) {
                tags.add(tag.trim());
            }
        }
        return tags;
    }

    public static List<String> getPipelines(Entity entity) {
        Process process;
        String pipelineString;
        ArrayList<String> pipelines = new ArrayList<String>();
        if (entity.getEntityType().equals((Object)EntityType.PROCESS) && (pipelineString = (process = (Process)entity).getPipelines()) != null) {
            for (String pipeline : pipelineString.split(",")) {
                pipelines.add(pipeline.trim());
            }
        }
        return pipelines;
    }

    public static EntityList getEntityDependencies(Entity entity) throws FalconException {
        Set<Entity> dependents = EntityGraph.get().getDependents(entity);
        Entity[] dependentEntities = dependents.toArray(new Entity[dependents.size()]);
        return new EntityList(dependentEntities, entity);
    }

    public static Pair<Date, Date> getEntityStartEndDates(Entity entityObject) {
        Set<String> clusters = EntityUtil.getClustersDefined(entityObject);
        Pair clusterMinStartDate = null;
        Pair clusterMaxEndDate = null;
        for (String cluster : clusters) {
            if (clusterMinStartDate == null || ((Date)clusterMinStartDate.first).after(EntityUtil.getStartTime(entityObject, cluster))) {
                clusterMinStartDate = Pair.of((Object)EntityUtil.getStartTime(entityObject, cluster), (Object)cluster);
            }
            if (clusterMaxEndDate != null && !((Date)clusterMaxEndDate.first).before(EntityUtil.getEndTime(entityObject, cluster))) continue;
            clusterMaxEndDate = Pair.of((Object)EntityUtil.getEndTime(entityObject, cluster), (Object)cluster);
        }
        return new Pair(clusterMinStartDate.first, clusterMaxEndDate.first);
    }

    public static Date getPreviousInstanceTime(Date startTime, Frequency frequency, TimeZone tz, Date referenceTime) {
        if (tz == null) {
            tz = TimeZone.getTimeZone("UTC");
        }
        Calendar insCal = Calendar.getInstance(tz);
        insCal.setTime(startTime);
        int instanceCount = EntityUtil.getInstanceSequence(startTime, frequency, tz, referenceTime) - 1;
        int freq = frequency.getFrequencyAsInt() * instanceCount;
        insCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
        while (insCal.getTime().after(referenceTime)) {
            insCal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt() * -1);
        }
        return insCal.getTime();
    }

    public static List<Date> getEntityInstanceTimes(Entity entity, String clusterName, Date startRange, Date endRange) {
        Date start = null;
        switch (entity.getEntityType()) {
            case FEED: {
                Feed feed = (Feed)entity;
                start = FeedHelper.getCluster(feed, clusterName).getValidity().getStart();
                return EntityUtil.getInstanceTimes(start, feed.getFrequency(), feed.getTimezone(), startRange, endRange);
            }
            case PROCESS: {
                Process process = (Process)entity;
                start = ProcessHelper.getCluster(process, clusterName).getValidity().getStart();
                return EntityUtil.getInstanceTimes(start, process.getFrequency(), process.getTimezone(), startRange, endRange);
            }
        }
        throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
    }

    public static List<Date> getInstanceTimes(Date startTime, Frequency frequency, TimeZone timeZone, Date startRange, Date endRange) {
        Date nextStartTime;
        LinkedList<Date> result = new LinkedList<Date>();
        if (timeZone == null) {
            timeZone = TimeZone.getTimeZone("UTC");
        }
        Date current = EntityUtil.getPreviousInstanceTime(startTime, frequency, timeZone, startRange);
        while (!(nextStartTime = EntityUtil.getNextStartTime(startTime, frequency, timeZone, current)).after(endRange)) {
            result.add(nextStartTime);
            current = new Date(nextStartTime.getTime() + 1L);
        }
        return result;
    }

    public static EntityNotification getEntityNotification(Entity entity) {
        switch (entity.getEntityType()) {
            case FEED: {
                Feed feed = (Feed)entity;
                return feed.getNotification();
            }
            case PROCESS: {
                Process process = (Process)entity;
                return process.getNotification();
            }
        }
        throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
    }

    public static Map<String, String> getPropertyMap(String properties) {
        HashMap<String, String> props = new HashMap<String, String>();
        if (StringUtils.isNotEmpty((CharSequence)properties)) {
            String[] kvPairs;
            for (String kvPair : kvPairs = properties.split(",")) {
                String[] keyValue = kvPair.trim().split(":", 2);
                if (keyValue.length != 2 || keyValue[0].trim().isEmpty() || keyValue[1].trim().isEmpty()) {
                    throw new IllegalArgumentException("Found invalid property " + keyValue[0] + ". Schedule properties must be comma separated key-value pairs. " + " Example: key1:value1,key2:value2");
                }
                props.put(keyValue[0].trim(), keyValue[1].trim());
            }
        }
        return props;
    }
}

