/*
 * Decompiled with CFR 0.152.
 */
package org.apache.falcon.entity;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import java.util.regex.Matcher;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedInstanceStatus;
import org.apache.falcon.entity.FileSystemStorage;
import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Lifecycle;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.feed.RetentionStage;
import org.apache.falcon.entity.v0.feed.Sla;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.lifecycle.FeedLifecycleStage;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.SchedulableEntityInstance;
import org.apache.falcon.util.BuildProperties;
import org.apache.falcon.util.DateUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class FeedHelper {
    private static final Logger LOG = LoggerFactory.getLogger(FeedHelper.class);
    private static final int ONE_MS = 1;
    public static final String FORMAT = "yyyyMMddHHmm";

    private FeedHelper() {
    }

    public static org.apache.falcon.entity.v0.feed.Cluster getCluster(Feed feed, String clusterName) {
        for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
            if (!cluster.getName().equals(clusterName)) continue;
            return cluster;
        }
        return null;
    }

    public static Storage createStorage(Feed feed) throws FalconException {
        Locations feedLocations = feed.getLocations();
        if (feedLocations != null && feedLocations.getLocations().size() != 0) {
            return new FileSystemStorage(feed);
        }
        try {
            CatalogTable table = feed.getTable();
            if (table != null) {
                return new CatalogStorage(feed);
            }
        }
        catch (URISyntaxException e) {
            throw new FalconException(e);
        }
        throw new FalconException("Both catalog and locations are not defined.");
    }

    public static Storage createStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, Feed feed) throws FalconException {
        return FeedHelper.createStorage(FeedHelper.getCluster(feed, clusterEntity.getName()), feed, clusterEntity);
    }

    public static Storage createStorage(String clusterName, Feed feed) throws FalconException {
        return FeedHelper.createStorage(FeedHelper.getCluster(feed, clusterName), feed);
    }

    public static Storage createStorage(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) throws FalconException {
        org.apache.falcon.entity.v0.cluster.Cluster clusterEntity = (org.apache.falcon.entity.v0.cluster.Cluster)EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
        return FeedHelper.createStorage(cluster, feed, clusterEntity);
    }

    public static Storage createStorage(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed, org.apache.falcon.entity.v0.cluster.Cluster clusterEntity) throws FalconException {
        List<Location> locations = FeedHelper.getLocations(cluster, feed);
        if (locations != null) {
            return new FileSystemStorage(ClusterHelper.getStorageUrl(clusterEntity), locations);
        }
        try {
            CatalogTable table = FeedHelper.getTable(cluster, feed);
            if (table != null) {
                return new CatalogStorage(clusterEntity, table);
            }
        }
        catch (URISyntaxException e) {
            throw new FalconException(e);
        }
        throw new FalconException("Both catalog and locations are not defined.");
    }

    public static Storage createReadOnlyStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, Feed feed) throws FalconException {
        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterEntity.getName());
        List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
        if (locations != null) {
            return new FileSystemStorage(ClusterHelper.getReadOnlyStorageUrl(clusterEntity), locations);
        }
        try {
            CatalogTable table = FeedHelper.getTable(feedCluster, feed);
            if (table != null) {
                return new CatalogStorage(clusterEntity, table);
            }
        }
        catch (URISyntaxException e) {
            throw new FalconException(e);
        }
        throw new FalconException("Both catalog and locations are not defined.");
    }

    public static Storage createStorage(String type, String storageUriTemplate) throws URISyntaxException {
        Storage.TYPE storageType = Storage.TYPE.valueOf(type);
        if (storageType == Storage.TYPE.FILESYSTEM) {
            return new FileSystemStorage(storageUriTemplate);
        }
        if (storageType == Storage.TYPE.TABLE) {
            return new CatalogStorage(storageUriTemplate);
        }
        throw new IllegalArgumentException("Bad type: " + type);
    }

    public static Storage createStorage(String type, String storageUriTemplate, Configuration conf) throws URISyntaxException {
        Storage.TYPE storageType = Storage.TYPE.valueOf(type);
        if (storageType == Storage.TYPE.FILESYSTEM) {
            return new FileSystemStorage(storageUriTemplate);
        }
        if (storageType == Storage.TYPE.TABLE) {
            return new CatalogStorage(storageUriTemplate, conf);
        }
        throw new IllegalArgumentException("Bad type: " + type);
    }

    public static Storage.TYPE getStorageType(Feed feed) throws FalconException {
        Locations feedLocations = feed.getLocations();
        if (feedLocations != null && feedLocations.getLocations().size() != 0) {
            return Storage.TYPE.FILESYSTEM;
        }
        CatalogTable table = feed.getTable();
        if (table != null) {
            return Storage.TYPE.TABLE;
        }
        throw new FalconException("Both catalog and locations are not defined.");
    }

    public static Storage.TYPE getStorageType(Feed feed, org.apache.falcon.entity.v0.feed.Cluster cluster) throws FalconException {
        List<Location> locations = FeedHelper.getLocations(cluster, feed);
        if (locations != null) {
            return Storage.TYPE.FILESYSTEM;
        }
        CatalogTable table = FeedHelper.getTable(cluster, feed);
        if (table != null) {
            return Storage.TYPE.TABLE;
        }
        throw new FalconException("Both catalog and locations are not defined.");
    }

    public static Storage.TYPE getStorageType(Feed feed, org.apache.falcon.entity.v0.cluster.Cluster clusterEntity) throws FalconException {
        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterEntity.getName());
        return FeedHelper.getStorageType(feed, feedCluster);
    }

    public static List<Location> getLocations(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) {
        Locations clusterLocations = cluster.getLocations();
        if (clusterLocations != null && clusterLocations.getLocations().size() != 0) {
            return clusterLocations.getLocations();
        }
        Locations feedLocations = feed.getLocations();
        return feedLocations == null ? null : feedLocations.getLocations();
    }

    public static Location getLocation(Feed feed, org.apache.falcon.entity.v0.cluster.Cluster cluster, LocationType type) {
        List<Location> locations = FeedHelper.getLocations(FeedHelper.getCluster(feed, cluster.getName()), feed);
        if (locations != null) {
            for (Location location : locations) {
                if (location.getType() != type) continue;
                return location;
            }
        }
        return null;
    }

    public static Sla getSLA(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) {
        Sla clusterSla = cluster.getSla();
        if (clusterSla != null) {
            return clusterSla;
        }
        Sla feedSla = feed.getSla();
        return feedSla == null ? null : feedSla;
    }

    public static Sla getSLA(String clusterName, Feed feed) {
        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, clusterName);
        return cluster != null ? FeedHelper.getSLA(cluster, feed) : null;
    }

    protected static CatalogTable getTable(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) {
        if (cluster.getTable() != null) {
            return cluster.getTable();
        }
        return feed.getTable();
    }

    public static String normalizePartitionExpression(String part1, String part2) {
        String partExp = StringUtils.stripToEmpty((String)part1) + "/" + StringUtils.stripToEmpty((String)part2);
        partExp = partExp.replaceAll("//+", "/");
        partExp = StringUtils.stripStart((String)partExp, (String)"/");
        partExp = StringUtils.stripEnd((String)partExp, (String)"/");
        return partExp;
    }

    public static String normalizePartitionExpression(String partition) {
        return FeedHelper.normalizePartitionExpression(partition, null);
    }

    public static Properties getClusterProperties(org.apache.falcon.entity.v0.cluster.Cluster cluster) {
        Properties properties = new Properties();
        HashMap<String, String> clusterVars = new HashMap<String, String>();
        clusterVars.put("colo", cluster.getColo());
        clusterVars.put("name", cluster.getName());
        if (cluster.getProperties() != null) {
            for (Property property : cluster.getProperties().getProperties()) {
                clusterVars.put(property.getName(), property.getValue());
            }
        }
        properties.put("cluster", clusterVars);
        return properties;
    }

    public static String evaluateClusterExp(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, String exp) throws FalconException {
        Properties properties = FeedHelper.getClusterProperties(clusterEntity);
        ExpressionHelper expHelp = ExpressionHelper.get();
        expHelp.setPropertiesForVariable(properties);
        return expHelp.evaluateFullExpression(exp, String.class);
    }

    public static String getStagingPath(boolean isSource, org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, Feed feed, CatalogStorage storage, Tag tag, String suffix) {
        String stagingDirPath = FeedHelper.getStagingDir(isSource, clusterEntity, feed, storage, tag);
        String datedPartitionKey = storage.getDatedPartitionKeys().get(0);
        String datedPartitionKeySuffix = datedPartitionKey + "=${coord:dataOutPartitionValue('output'," + "'" + datedPartitionKey + "')}";
        return stagingDirPath + "/" + datedPartitionKeySuffix + "/" + suffix + "/" + "data";
    }

    public static String getStagingDir(boolean isSource, org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, Feed feed, CatalogStorage storage, Tag tag) {
        String workflowName = EntityUtil.getWorkflowName(tag, Arrays.asList(clusterEntity.getName()), (Entity)feed).toString();
        String storageUri = isSource ? ClusterHelper.getReadOnlyStorageUrl(clusterEntity) : ClusterHelper.getStorageUrl(clusterEntity);
        return storageUri + EntityUtil.getLogPath(clusterEntity, (Entity)feed) + "/" + workflowName + "/" + storage.getDatabase() + "/" + storage.getTable();
    }

    public static Properties getUserWorkflowProperties(LifeCycle lifeCycle) {
        String version;
        Properties props = new Properties();
        props.put("userWorkflowName", lifeCycle.name().toLowerCase() + "-policy");
        props.put("userWorkflowEngine", "falcon");
        try {
            version = BuildProperties.get().getProperty("build.version");
        }
        catch (Exception e) {
            version = "0.6";
        }
        props.put("userWorkflowVersion", version);
        return props;
    }

    public static Properties getFeedProperties(Feed feed) {
        Properties feedProperties = new Properties();
        if (feed.getProperties() != null) {
            for (org.apache.falcon.entity.v0.feed.Property property : feed.getProperties().getProperties()) {
                feedProperties.put(property.getName(), property.getValue());
            }
        }
        return feedProperties;
    }

    public static Lifecycle getLifecycle(Feed feed, String clusterName) throws FalconException {
        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, clusterName);
        if (cluster != null) {
            return cluster.getLifecycle() != null ? cluster.getLifecycle() : feed.getLifecycle();
        }
        throw new FalconException("Cluster: " + clusterName + " isn't valid for feed: " + feed.getName());
    }

    public static RetentionStage getRetentionStage(Feed feed, String clusterName) throws FalconException {
        if (FeedHelper.isLifecycleEnabled(feed, clusterName)) {
            Lifecycle globalLifecycle = feed.getLifecycle();
            Lifecycle clusterLifecycle = FeedHelper.getCluster(feed, clusterName).getLifecycle();
            if (clusterLifecycle != null && clusterLifecycle.getRetentionStage() != null) {
                return clusterLifecycle.getRetentionStage();
            }
            if (globalLifecycle != null) {
                return globalLifecycle.getRetentionStage();
            }
        }
        return null;
    }

    public static List<String> getPolicies(Feed feed, String clusterName) throws FalconException {
        ArrayList<String> result = new ArrayList<String>();
        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, clusterName);
        if (cluster != null) {
            if (FeedHelper.isLifecycleEnabled(feed, clusterName)) {
                String policy = FeedHelper.getRetentionStage(feed, clusterName).getPolicy();
                policy = StringUtils.isBlank((CharSequence)policy) ? FeedLifecycleStage.RETENTION.getDefaultPolicyName() : policy;
                result.add(policy);
            }
            return result;
        }
        throw new FalconException("Cluster: " + clusterName + " isn't valid for feed: " + feed.getName());
    }

    public static Date getDate(String templatePath, Path instancePath, TimeZone timeZone) {
        String path = instancePath.toString();
        Matcher matcher = FeedDataPath.PATTERN.matcher(templatePath);
        Calendar cal = Calendar.getInstance(timeZone);
        int lastEnd = 0;
        HashSet<FeedDataPath.VARS> matchedVars = new HashSet<FeedDataPath.VARS>();
        while (matcher.find()) {
            int value;
            FeedDataPath.VARS pathVar = FeedDataPath.VARS.from(matcher.group());
            String pad = templatePath.substring(lastEnd, matcher.start());
            if (!path.startsWith(pad)) {
                return null;
            }
            try {
                value = Integer.valueOf(path.substring(pad.length(), pad.length() + pathVar.getValueSize()));
            }
            catch (NumberFormatException e) {
                return null;
            }
            pathVar.setCalendar(cal, value);
            lastEnd = matcher.end();
            path = path.substring(pad.length() + pathVar.getValueSize());
            matchedVars.add(pathVar);
        }
        String remTemplatePath = templatePath.substring(lastEnd);
        if (StringUtils.isNotEmpty((CharSequence)path) && StringUtils.isNotEmpty((CharSequence)remTemplatePath) && !path.contains(remTemplatePath)) {
            return null;
        }
        for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {
            if (matchedVars.contains((Object)var)) continue;
            cal.set(var.getCalendarField(), 0);
        }
        return cal.getTime();
    }

    public static Path getFeedBasePath(String feedPath) throws IOException {
        Matcher matcher = FeedDataPath.PATTERN.matcher(feedPath);
        if (matcher.find()) {
            return new Path(feedPath.substring(0, matcher.start()));
        }
        throw new IOException("Unable to resolve pattern for feedPath: " + feedPath);
    }

    private static void validateFeedInstance(Feed feed, Date instanceTime, org.apache.falcon.entity.v0.cluster.Cluster cluster) {
        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
        if (feedCluster == null) {
            throw new IllegalArgumentException("Cluster :" + cluster.getName() + " is not a valid cluster for feed:" + feed.getName());
        }
        if (feedCluster.getValidity().getStart().after(instanceTime) || !feedCluster.getValidity().getEnd().after(instanceTime)) {
            throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not in validity range for" + " Feed: " + feed.getName() + " on cluster:" + cluster.getName());
        }
        Date nextInstance = EntityUtil.getNextStartTime(feedCluster.getValidity().getStart(), feed.getFrequency(), feed.getTimezone(), instanceTime);
        if (!nextInstance.equals(instanceTime)) {
            throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not a valid instance for the " + " feed: " + feed.getName() + " on cluster: " + cluster.getName() + " on the basis of startDate and frequency");
        }
    }

    public static SchedulableEntityInstance getProducerInstance(Feed feed, Date feedInstanceTime, org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
        FeedHelper.validateFeedInstance(feed, feedInstanceTime, cluster);
        Process process = FeedHelper.getProducerProcess(feed);
        if (process != null) {
            Cluster processCluster = ProcessHelper.getCluster(process, cluster.getName());
            Date pStart = processCluster.getValidity().getStart();
            Date pEnd = processCluster.getValidity().getEnd();
            Frequency pFrequency = process.getFrequency();
            TimeZone pTz = process.getTimezone();
            try {
                Date processInstanceTime = FeedHelper.getProducerInstanceTime(feed, feedInstanceTime, process, cluster);
                boolean isValid = EntityUtil.isValidInstanceTime(pStart, pFrequency, pTz, processInstanceTime);
                if (processInstanceTime.before(pStart) || !processInstanceTime.before(pEnd) || !isValid) {
                    return null;
                }
                SchedulableEntityInstance producer = new SchedulableEntityInstance(process.getName(), cluster.getName(), processInstanceTime, EntityType.PROCESS);
                producer.setTags("Output");
                return producer;
            }
            catch (IllegalArgumentException | FalconException e) {
                LOG.error("Error in trying to get producer process: {}'s instance time for feed: {}'s instance: }  on cluster:{}", new Object[]{process.getName(), feed.getName(), feedInstanceTime, cluster.getName()});
            }
        }
        return null;
    }

    public static Process getProducerProcess(Feed feed) throws FalconException {
        EntityList dependencies = EntityUtil.getEntityDependencies((Entity)feed);
        for (EntityList.EntityElement e : dependencies.getElements()) {
            if (!e.tag.contains("Output")) continue;
            return (Process)EntityUtil.getEntity(EntityType.PROCESS, e.name);
        }
        return null;
    }

    private static Date getProducerInstanceTime(Feed feed, Date feedInstanceTime, Process producer, org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
        String clusterName = cluster.getName();
        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
        Cluster processCluster = ProcessHelper.getCluster(producer, clusterName);
        Date producerStartDate = processCluster.getValidity().getStart();
        String outputInstance = null;
        for (Output op : producer.getOutputs().getOutputs()) {
            if (!StringUtils.equals((CharSequence)feed.getName(), (CharSequence)op.getFeed())) continue;
            outputInstance = op.getInstance();
        }
        ExpressionHelper.setReferenceDate(producerStartDate);
        ExpressionHelper evaluator = ExpressionHelper.get();
        Date relativeFeedInstance = evaluator.evaluate(outputInstance, Date.class);
        Date feedInstanceActual = EntityUtil.getPreviousInstanceTime(feedCluster.getValidity().getStart(), feed.getFrequency(), feed.getTimezone(), relativeFeedInstance);
        Long producerInstanceTime = feedInstanceTime.getTime() + (producerStartDate.getTime() - feedInstanceActual.getTime());
        Date producerInstance = new Date(producerInstanceTime);
        if (producerInstance.before(processCluster.getValidity().getStart()) || producerInstance.after(processCluster.getValidity().getEnd())) {
            throw new IllegalArgumentException("Instance time provided: " + feedInstanceTime + " for feed " + feed.getName() + " is outside the range of instances produced by the producer process: " + producer.getName() + " in it's validity range on provided cluster: " + cluster.getName());
        }
        return producerInstance;
    }

    public static Set<SchedulableEntityInstance> getConsumerInstances(Feed feed, Date feedInstanceTime, org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
        HashSet<SchedulableEntityInstance> result = new HashSet<SchedulableEntityInstance>();
        FeedHelper.validateFeedInstance(feed, feedInstanceTime, cluster);
        Set<Process> consumers = FeedHelper.getConsumerProcesses(feed);
        for (Process p : consumers) {
            Set<Date> consumerInstanceTimes = FeedHelper.getConsumerProcessInstanceTimes(feed, feedInstanceTime, p, cluster);
            for (Date date : consumerInstanceTimes) {
                SchedulableEntityInstance in = new SchedulableEntityInstance(p.getName(), cluster.getName(), date, EntityType.PROCESS);
                in.setTags("Input");
                result.add(in);
            }
        }
        return result;
    }

    public static Set<Process> getConsumerProcesses(Feed feed) throws FalconException {
        HashSet<Process> result = new HashSet<Process>();
        EntityList dependencies = EntityUtil.getEntityDependencies((Entity)feed);
        for (EntityList.EntityElement e : dependencies.getElements()) {
            if (!e.tag.contains("Input")) continue;
            Process consumer = (Process)EntityUtil.getEntity(EntityType.PROCESS, e.name);
            result.add(consumer);
        }
        return result;
    }

    private static Set<Date> getConsumerProcessInstanceTimes(Feed feed, Date feedInstancetime, Process consumer, org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
        HashSet<Date> result = new HashSet<Date>();
        Cluster processCluster = ProcessHelper.getCluster(consumer, cluster.getName());
        if (processCluster == null) {
            throw new IllegalArgumentException("Cluster is not valid for process");
        }
        Date processStartDate = processCluster.getValidity().getStart();
        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
        Date feedStartDate = feedCluster.getValidity().getStart();
        ArrayList<Input> inputFeeds = new ArrayList<Input>();
        if (consumer.getInputs() != null && consumer.getInputs().getInputs() != null) {
            for (Input input : consumer.getInputs().getInputs()) {
                if (!StringUtils.equals((CharSequence)input.getFeed(), (CharSequence)feed.getName())) continue;
                inputFeeds.add(input);
            }
        }
        block1: for (Input in : inputFeeds) {
            Long rangeEnd;
            Long rangeStart;
            Date inputStart;
            Date nextConsumerInstance;
            Date processInstanceStartActual;
            ExpressionHelper.setReferenceDate(processStartDate);
            ExpressionHelper evaluator = ExpressionHelper.get();
            Date startRelative = evaluator.evaluate(in.getStart(), Date.class);
            Date startTimeActual = EntityUtil.getPreviousInstanceTime(feedStartDate, feed.getFrequency(), feed.getTimezone(), startRelative);
            Long offset = processStartDate.getTime() - startTimeActual.getTime();
            Date processInstanceStartRelative = new Date(feedInstancetime.getTime() + offset);
            Date currentInstance = processInstanceStartActual = EntityUtil.getPreviousInstanceTime(processStartDate, consumer.getFrequency(), consumer.getTimezone(), processInstanceStartRelative);
            while (true) {
                nextConsumerInstance = EntityUtil.getNextStartTime(processStartDate, consumer.getFrequency(), consumer.getTimezone(), currentInstance);
                ExpressionHelper.setReferenceDate(nextConsumerInstance);
                evaluator = ExpressionHelper.get();
                inputStart = evaluator.evaluate(in.getStart(), Date.class);
                rangeStart = EntityUtil.getPreviousInstanceTime(feedStartDate, feed.getFrequency(), feed.getTimezone(), inputStart).getTime();
                rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime();
                if (rangeStart > feedInstancetime.getTime() || feedInstancetime.getTime() > rangeEnd) break;
                if (!nextConsumerInstance.before(processCluster.getValidity().getStart()) && nextConsumerInstance.before(processCluster.getValidity().getEnd())) {
                    result.add(nextConsumerInstance);
                }
                currentInstance = new Date(nextConsumerInstance.getTime() + 1L);
            }
            currentInstance = processInstanceStartActual;
            while (true) {
                nextConsumerInstance = EntityUtil.getPreviousInstanceTime(processStartDate, consumer.getFrequency(), consumer.getTimezone(), currentInstance);
                ExpressionHelper.setReferenceDate(nextConsumerInstance);
                evaluator = ExpressionHelper.get();
                inputStart = evaluator.evaluate(in.getStart(), Date.class);
                rangeStart = EntityUtil.getPreviousInstanceTime(feedStartDate, feed.getFrequency(), feed.getTimezone(), inputStart).getTime();
                rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime();
                if (rangeStart > feedInstancetime.getTime() || feedInstancetime.getTime() > rangeEnd) continue block1;
                if (!nextConsumerInstance.before(processCluster.getValidity().getStart()) && nextConsumerInstance.before(processCluster.getValidity().getEnd())) {
                    result.add(nextConsumerInstance);
                }
                currentInstance = new Date(nextConsumerInstance.getTime() - 1L);
            }
        }
        return result;
    }

    public static FeedInstanceResult getFeedInstanceListing(Entity entityObject, Date start, Date end) throws FalconException {
        Set<String> clusters = EntityUtil.getClustersDefinedInColos(entityObject);
        FeedInstanceResult result = new FeedInstanceResult(APIResult.Status.SUCCEEDED, "Success");
        for (String cluster : clusters) {
            Feed feed = (Feed)entityObject;
            Storage storage = FeedHelper.createStorage(cluster, feed);
            List<FeedInstanceStatus> feedListing = storage.getListing(feed, cluster, LocationType.DATA, start, end);
            FeedInstanceResult.Instance[] instances = new FeedInstanceResult.Instance[feedListing.size()];
            int index = 0;
            for (FeedInstanceStatus feedStatus : feedListing) {
                FeedInstanceResult.Instance instance = new FeedInstanceResult.Instance(cluster, feedStatus.getInstance(), feedStatus.getStatus().name());
                instance.creationTime = feedStatus.getCreationTime();
                instance.uri = feedStatus.getUri();
                instance.size = feedStatus.getSize();
                instance.sizeH = feedStatus.getSizeH();
                instances[index++] = instance;
            }
            result.setInstances(instances);
        }
        return result;
    }

    public static FeedInstanceStatus.AvailabilityStatus getFeedInstanceStatus(Feed feed, String clusterName, Date instanceTime) throws FalconException {
        Storage storage = FeedHelper.createStorage(clusterName, feed);
        return storage.getInstanceAvailabilityStatus(feed, clusterName, LocationType.DATA, instanceTime);
    }

    public static boolean isLifecycleEnabled(Feed feed, String clusterName) {
        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, clusterName);
        return cluster != null && (feed.getLifecycle() != null || cluster.getLifecycle() != null);
    }

    public static Frequency getRetentionFrequency(Feed feed, String clusterName) throws FalconException {
        Frequency retentionFrequency;
        RetentionStage retentionStage = FeedHelper.getRetentionStage(feed, clusterName);
        if (retentionStage != null && retentionStage.getFrequency() != null) {
            retentionFrequency = retentionStage.getFrequency();
        } else {
            Frequency feedFrequency = feed.getFrequency();
            Frequency defaultFrequency = new Frequency("hours(6)");
            retentionFrequency = DateUtil.getFrequencyInMillis(feedFrequency) < DateUtil.getFrequencyInMillis(defaultFrequency) ? defaultFrequency : new Frequency(feedFrequency.toString());
        }
        return retentionFrequency;
    }
}

