/*
 * Decompiled with CFR 0.152.
 */
package org.datacleaner.spark;

import java.util.Collections;
import java.util.List;
import org.apache.metamodel.csv.CsvConfiguration;
import org.apache.metamodel.util.Resource;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.datacleaner.api.AnalyzerResult;
import org.datacleaner.api.InputRow;
import org.datacleaner.connection.CsvDatastore;
import org.datacleaner.connection.Datastore;
import org.datacleaner.connection.JsonDatastore;
import org.datacleaner.job.AnalysisJob;
import org.datacleaner.job.runner.AnalysisResultFuture;
import org.datacleaner.job.runner.AnalysisRunner;
import org.datacleaner.spark.SkipHeaderLineFunction;
import org.datacleaner.spark.SparkAnalysisResultFuture;
import org.datacleaner.spark.SparkJobContext;
import org.datacleaner.spark.functions.AnalyzerResultReduceFunction;
import org.datacleaner.spark.functions.CsvParserFunction;
import org.datacleaner.spark.functions.ExtractAnalyzerResultFunction;
import org.datacleaner.spark.functions.JsonParserFunction;
import org.datacleaner.spark.functions.RowProcessingFunction;
import org.datacleaner.spark.functions.TuplesToTuplesFunction;
import org.datacleaner.spark.functions.ValuesToInputRowFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * {@link AnalysisRunner} that executes the analysis job held by a
 * {@link SparkJobContext} on a Spark cluster via a {@link JavaSparkContext}.
 *
 * <p>The source datastore is read as a text file (CSV and JSON datastores are
 * supported), each line is parsed into an {@link InputRow}, and the rows are
 * pushed through {@link RowProcessingFunction}. When results are enabled on the
 * job context, the per-component {@link AnalyzerResult}s are collected to the
 * driver and handed back in a {@link SparkAnalysisResultFuture}.
 */
public class SparkAnalysisRunner
implements AnalysisRunner {
    private static final Logger logger = LoggerFactory.getLogger(SparkAnalysisRunner.class);
    private final SparkJobContext _sparkJobContext;
    private final JavaSparkContext _sparkContext;
    /** Requested minimum number of input partitions, or {@code null} to let Spark decide. */
    private final Integer _minPartitions;

    /**
     * Creates a runner that lets Spark infer the number of input partitions.
     *
     * @param sparkContext the Spark context used to read the source data
     * @param sparkJobContext the job context describing the analysis job to run
     */
    public SparkAnalysisRunner(JavaSparkContext sparkContext, SparkJobContext sparkJobContext) {
        this(sparkContext, sparkJobContext, null);
    }

    /**
     * Creates a runner with an optional minimum number of input partitions.
     *
     * @param sparkContext the Spark context used to read the source data
     * @param sparkJobContext the job context describing the analysis job to run
     * @param minPartitions minimum number of partitions to request when reading
     *        the source file; {@code null} or a non-positive value means "not
     *        specified" (a non-positive value is additionally logged as a warning)
     */
    public SparkAnalysisRunner(JavaSparkContext sparkContext, SparkJobContext sparkJobContext, Integer minPartitions) {
        this._sparkContext = sparkContext;
        this._sparkJobContext = sparkJobContext;
        if (minPartitions != null && minPartitions <= 0) {
            logger.warn("Minimum number of partitions needs to be a positive number, but specified: {}. Disregarding the value and inferring the number of partitions automatically", minPartitions);
            this._minPartitions = null;
        } else {
            this._minPartitions = minPartitions;
        }
    }

    /**
     * Runs the job held by this runner's {@link SparkJobContext}.
     *
     * <p>The {@code job} argument is ignored; this method simply delegates to
     * {@link #run()}.
     *
     * @param job ignored
     * @return the future holding the collected analyzer results
     */
    public AnalysisResultFuture run(AnalysisJob job) {
        return this.run();
    }

    /**
     * Runs the analysis job of the {@link SparkJobContext} on Spark.
     *
     * <p>The job-start callback is triggered before any work is done and the
     * job-end callback is triggered before this method returns normally, both
     * when results are collected and when result collection is disabled.
     *
     * @return a future wrapping the collected {@link AnalyzerResult}s keyed by
     *         name, or wrapping an empty list when results are disabled
     * @throws UnsupportedOperationException if the job's datastore is neither a
     *         {@link CsvDatastore} nor a {@link JsonDatastore}
     * @throws IllegalStateException if the datastore has no resource to read
     */
    // Raw Spark types are kept where the generic signatures of the project's
    // function classes are not visible from this file.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public AnalysisResultFuture run() {
        this._sparkJobContext.triggerOnJobStart();
        final AnalysisJob analysisJob = this._sparkJobContext.getAnalysisJob();
        final Datastore datastore = analysisJob.getDatastore();
        final JavaRDD<InputRow> inputRowsRDD = this.openSourceDatastore(datastore);
        final boolean resultEnabled = this._sparkJobContext.isResultEnabled();

        final JavaPairRDD namedAnalyzerResultsRDD;
        if (this._sparkJobContext.getAnalysisJobBuilder().isDistributable()) {
            logger.info("Running the job in distributed mode");
            final boolean preservePartitions = true;
            final JavaRDD processedTuplesRdd = inputRowsRDD.mapPartitionsWithIndex(
                    (Function2)new RowProcessingFunction(this._sparkJobContext), preservePartitions);
            if (resultEnabled) {
                final JavaPairRDD partialNamedAnalyzerResultsRDD =
                        processedTuplesRdd.mapPartitionsToPair(new TuplesToTuplesFunction(), preservePartitions);
                // Merge the partial results produced by each partition, per key.
                namedAnalyzerResultsRDD = partialNamedAnalyzerResultsRDD.reduceByKey(
                        (Function2)new AnalyzerResultReduceFunction(this._sparkJobContext));
            } else {
                // No results wanted: count() is only called to force Spark to
                // evaluate the (lazy) RDD so that the rows are actually processed.
                processedTuplesRdd.count();
                namedAnalyzerResultsRDD = null;
            }
        } else {
            logger.warn("Running the job in non-distributed mode");
            // Process all rows in a single partition.
            final JavaRDD coalescedInputRowsRDD = inputRowsRDD.coalesce(1);
            namedAnalyzerResultsRDD = coalescedInputRowsRDD.mapPartitionsToPair(
                    (PairFlatMapFunction)new RowProcessingFunction(this._sparkJobContext));
            if (!resultEnabled) {
                // Force evaluation, see above.
                namedAnalyzerResultsRDD.count();
            }
        }

        if (!resultEnabled) {
            final List<Tuple2<String, AnalyzerResult>> results = Collections.emptyList();
            // The job has finished on this path as well, so the job-end callback
            // must be triggered to balance triggerOnJobStart() above.
            this._sparkJobContext.triggerOnJobEnd();
            return new SparkAnalysisResultFuture(results, this._sparkJobContext);
        }

        final JavaPairRDD finalAnalyzerResultsRDD =
                namedAnalyzerResultsRDD.mapValues((Function)new ExtractAnalyzerResultFunction());
        final List<Tuple2<String, AnalyzerResult>> results = finalAnalyzerResultsRDD.collect();
        logger.info("Finished! Number of AnalyzerResult objects: {}", results.size());
        for (final Tuple2<String, AnalyzerResult> analyzerResultTuple : results) {
            logger.info("AnalyzerResult ({}):\n\n{}\n", analyzerResultTuple._1, analyzerResultTuple._2);
        }
        this._sparkJobContext.triggerOnJobEnd();
        return new SparkAnalysisResultFuture(results, this._sparkJobContext);
    }

    /**
     * Opens the given datastore as an RDD of {@link InputRow}s.
     *
     * @param datastore the source datastore of the job
     * @return the parsed input rows
     * @throws UnsupportedOperationException if the datastore is neither a
     *         {@link CsvDatastore} nor a {@link JsonDatastore}
     * @throws IllegalStateException if the datastore has no resource to read
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private JavaRDD<InputRow> openSourceDatastore(Datastore datastore) {
        if (datastore instanceof CsvDatastore) {
            final CsvDatastore csvDatastore = (CsvDatastore)datastore;
            final String datastorePath = qualifiedPathOf(csvDatastore.getResource(), datastore);
            final CsvConfiguration csvConfiguration = csvDatastore.getCsvConfiguration();
            final JavaRDD parsedInput = this.readTextFile(datastorePath)
                    .map((Function)new CsvParserFunction(csvConfiguration));
            JavaPairRDD zipWithIndex = parsedInput.zipWithIndex();
            final int columnNameLineNumber = csvConfiguration.getColumnNameLineNumber();
            if (columnNameLineNumber != 0) {
                // A non-zero column name line number means the file has a header
                // that must not be turned into an input row.
                zipWithIndex = zipWithIndex.filter((Function)new SkipHeaderLineFunction(columnNameLineNumber));
            }
            return zipWithIndex.map((Function)new ValuesToInputRowFunction(this._sparkJobContext));
        }
        if (datastore instanceof JsonDatastore) {
            final JsonDatastore jsonDatastore = (JsonDatastore)datastore;
            final String datastorePath = qualifiedPathOf(jsonDatastore.getResource(), datastore);
            final JavaRDD parsedInput = this.readTextFile(datastorePath)
                    .map((Function)new JsonParserFunction(jsonDatastore));
            final JavaPairRDD zipWithIndex = parsedInput.zipWithIndex();
            return zipWithIndex.map((Function)new ValuesToInputRowFunction(this._sparkJobContext));
        }
        throw new UnsupportedOperationException("Unsupported datastore type or configuration: " + datastore);
    }

    /**
     * Reads the file at {@code path} as lines of text, passing the configured
     * minimum number of partitions to Spark when one was specified.
     */
    private JavaRDD<String> readTextFile(String path) {
        if (this._minPartitions != null) {
            return this._sparkContext.textFile(path, this._minPartitions.intValue());
        }
        return this._sparkContext.textFile(path);
    }

    /**
     * Returns the qualified path of {@code resource}, failing with a descriptive
     * exception (rather than an assertion that is disabled by default) when the
     * datastore has no resource.
     */
    private static String qualifiedPathOf(Resource resource, Datastore datastore) {
        if (resource == null) {
            throw new IllegalStateException("Datastore has no resource to read from: " + datastore);
        }
        return resource.getQualifiedPath();
    }
}

