/*
 * Decompiled with CFR 0.152.
 */
package org.datacleaner.spark.functions;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import org.apache.metamodel.csv.CsvConfiguration;
import org.apache.metamodel.util.FileResource;
import org.apache.metamodel.util.HdfsResource;
import org.apache.metamodel.util.Resource;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.datacleaner.api.AnalyzerResult;
import org.datacleaner.api.AnalyzerResultFuture;
import org.datacleaner.api.HasAnalyzerResult;
import org.datacleaner.api.InputRow;
import org.datacleaner.configuration.DataCleanerConfiguration;
import org.datacleaner.connection.CsvDatastore;
import org.datacleaner.connection.JsonDatastore;
import org.datacleaner.connection.ResourceDatastore;
import org.datacleaner.connection.UpdateableDatastore;
import org.datacleaner.descriptors.ConfiguredPropertyDescriptor;
import org.datacleaner.extension.output.CreateCsvFileAnalyzer;
import org.datacleaner.job.AnalysisJob;
import org.datacleaner.job.builder.AnalysisJobBuilder;
import org.datacleaner.job.builder.ComponentBuilder;
import org.datacleaner.job.runner.ActiveOutputDataStream;
import org.datacleaner.job.runner.ConsumeRowHandler;
import org.datacleaner.job.runner.RowProcessingConsumer;
import org.datacleaner.lifecycle.LifeCycleHelper;
import org.datacleaner.spark.NamedAnalyzerResult;
import org.datacleaner.spark.SparkJobContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark function that runs the analysis job of a {@link SparkJobContext} over
 * the rows of a single partition and returns the analyzer results keyed by
 * component key.
 *
 * <p>Two entry points are provided: {@link #call(Iterator)} executes the job as
 * configured in the context, while {@link #call(Integer, Iterator)} first
 * rewrites file/HDFS based output targets of the job so that they point to a
 * partition specific {@code part-NNNNN} resource.
 */
public final class RowProcessingFunction
implements Function2<Integer, Iterator<InputRow>, Iterator<Tuple2<String, NamedAnalyzerResult>>>,
PairFlatMapFunction<Iterator<InputRow>, String, NamedAnalyzerResult> {
    private static final Logger logger = LoggerFactory.getLogger(RowProcessingFunction.class);
    private static final long serialVersionUID = 1L;
    private final SparkJobContext _sparkJobContext;

    /**
     * @param sparkJobContext the context supplying the job, its builder and the configuration
     */
    public RowProcessingFunction(SparkJobContext sparkJobContext) {
        this._sparkJobContext = sparkJobContext;
    }

    /**
     * Processes all rows of the iterator using the context's analysis job as-is.
     *
     * @param inputRowIterator the rows to consume
     * @return one tuple (component key, named result) per result producing component
     */
    @Override
    public Iterable<Tuple2<String, NamedAnalyzerResult>> call(Iterator<InputRow> inputRowIterator) throws Exception {
        logger.info("call(Iterator) invoked");
        AnalysisJob analysisJob = this._sparkJobContext.getAnalysisJob();
        List<Tuple2<String, NamedAnalyzerResult>> analyzerResults = this.executePartition(inputRowIterator, analysisJob);
        logger.info("call(Iterator) finished, returning {} results", analyzerResults.size());
        return analyzerResults;
    }

    /**
     * Processes all rows of the iterator after re-targeting the job's output
     * resources to partition specific ones.
     *
     * @param partitionNumber the number used to derive the {@code part-NNNNN} resource names
     * @param inputRowIterator the rows to consume
     * @return an iterator over (component key, named result) tuples
     */
    @Override
    public Iterator<Tuple2<String, NamedAnalyzerResult>> call(Integer partitionNumber, Iterator<InputRow> inputRowIterator) throws Exception {
        logger.info("call({}, Iterator) invoked", partitionNumber);
        AnalysisJobBuilder jobBuilder = this._sparkJobContext.getAnalysisJobBuilder();
        this.configureComponentsBeforeBuilding(jobBuilder, partitionNumber);
        AnalysisJob analysisJob = jobBuilder.toAnalysisJob();
        List<Tuple2<String, NamedAnalyzerResult>> analyzerResults = this.executePartition(inputRowIterator, analysisJob);
        logger.info("call({}, Iterator) finished, returning {} results", partitionNumber, analyzerResults.size());
        return analyzerResults.iterator();
    }

    /**
     * Replaces datastore and resource properties of every component in the job
     * builder (and, recursively, in the builders of consumed output data
     * streams) with partition specific replacements where one can be created.
     * For {@link CreateCsvFileAnalyzer} components the "Include header"
     * property is switched off on every partition but partition 0.
     */
    private void configureComponentsBeforeBuilding(AnalysisJobBuilder jobBuilder, int partitionNumber) {
        for (ComponentBuilder cb : jobBuilder.getComponentBuilders()) {
            Set<ConfiguredPropertyDescriptor> targetDatastoreProperties = cb.getDescriptor().getConfiguredPropertiesByType(UpdateableDatastore.class, false);
            for (ConfiguredPropertyDescriptor targetDatastoreProperty : targetDatastoreProperties) {
                Object datastoreObject = cb.getConfiguredProperty(targetDatastoreProperty);
                if (!(datastoreObject instanceof ResourceDatastore)) {
                    // only resource backed datastores can be re-targeted
                    continue;
                }
                ResourceDatastore resourceDatastore = (ResourceDatastore) datastoreObject;
                Resource replacementResource = this.createReplacementResource(resourceDatastore.getResource(), partitionNumber);
                if (replacementResource == null) {
                    continue;
                }
                ResourceDatastore replacementDatastore = this.createReplacementDatastore(cb, resourceDatastore, replacementResource);
                if (replacementDatastore == null) {
                    continue;
                }
                cb.setConfiguredProperty(targetDatastoreProperty, replacementDatastore);
            }

            Set<ConfiguredPropertyDescriptor> resourceProperties = cb.getDescriptor().getConfiguredPropertiesByType(Resource.class, false);
            for (ConfiguredPropertyDescriptor resourceProperty : resourceProperties) {
                Resource resource = (Resource) cb.getConfiguredProperty(resourceProperty);
                Resource replacementResource = this.createReplacementResource(resource, partitionNumber);
                if (replacementResource != null) {
                    cb.setConfiguredProperty(resourceProperty, replacementResource);
                }
            }

            if (cb.getComponentInstance() instanceof CreateCsvFileAnalyzer && partitionNumber > 0) {
                // only partition 0 keeps the header
                cb.setConfiguredProperty("Include header", false);
            }
        }

        // apply the same re-targeting to the jobs consuming output data streams
        List<AnalysisJobBuilder> children = jobBuilder.getConsumedOutputDataStreamsJobBuilders();
        for (AnalysisJobBuilder childJobBuilder : children) {
            this.configureComponentsBeforeBuilding(childJobBuilder, partitionNumber);
        }
    }

    /**
     * Creates the partition specific replacement for a resource.
     *
     * @return for an {@link HdfsResource} a new resource at
     *         {@code <qualified path>/part-NNNNN}; for a {@link FileResource}
     *         pointing to an existing regular file the resource itself,
     *         otherwise a new file resource at {@code <qualified path>/part-NNNNN}
     *         (the directory is created when missing); {@code null} for any
     *         other (or {@code null}) resource
     */
    private Resource createReplacementResource(Resource resource, int partitionNumber) {
        String formattedPartitionNumber = String.format("%05d", partitionNumber);
        if (resource instanceof HdfsResource) {
            HdfsResource hdfsResource = (HdfsResource) resource;
            return new HdfsResource(hdfsResource.getQualifiedPath() + "/part-" + formattedPartitionNumber);
        }
        if (resource instanceof FileResource) {
            File file = ((FileResource) resource).getFile();
            if (file.exists() && file.isFile()) {
                // an existing regular file is used as-is
                return resource;
            }
            // mkdirs() also returns false when the directory was created concurrently,
            // hence the additional isDirectory() check before complaining
            if (!file.exists() && !file.mkdirs() && !file.isDirectory()) {
                logger.warn("Could not create directory '{}' for partitioned output", file);
            }
            return new FileResource(resource.getQualifiedPath() + "/part-" + formattedPartitionNumber);
        }
        return null;
    }

    /**
     * Creates a copy of the datastore that is backed by the replacement
     * resource. Only CSV and JSON datastores are supported; any other datastore
     * is returned unchanged after logging a warning.
     */
    private ResourceDatastore createReplacementDatastore(ComponentBuilder cb, ResourceDatastore datastore, Resource replacementResource) {
        String name = datastore.getName();
        if (datastore instanceof CsvDatastore) {
            CsvConfiguration csvConfiguration = ((CsvDatastore) datastore).getCsvConfiguration();
            return new CsvDatastore(name, replacementResource, csvConfiguration);
        }
        if (datastore instanceof JsonDatastore) {
            return new JsonDatastore(name, replacementResource, ((JsonDatastore) datastore).getSchemaBuilder());
        }
        logger.warn("Could not replace datastore '{}' because it is of an unsupported type: {}", name, datastore.getClass().getSimpleName());
        return datastore;
    }

    /**
     * Feeds every row into a {@link ConsumeRowHandler} built for the job,
     * collects the analyzer results (awaiting any result futures) and closes
     * the components afterwards.
     *
     * <p>The components are closed in a {@code finally} block so that they are
     * released even if row processing or result collection fails; the
     * {@code success} flag passed to the close call reflects the outcome.
     */
    private List<Tuple2<String, NamedAnalyzerResult>> executePartition(Iterator<InputRow> inputRowIterator, AnalysisJob analysisJob) {
        this._sparkJobContext.triggerOnPartitionProcessingStart();
        DataCleanerConfiguration configuration = this._sparkJobContext.getConfiguration();
        ConsumeRowHandler.Configuration handlerConfiguration = new ConsumeRowHandler.Configuration();
        handlerConfiguration.includeAnalyzers = true;
        handlerConfiguration.includeNonDistributedTasks = false;
        ConsumeRowHandler consumeRowHandler = new ConsumeRowHandler(analysisJob, configuration, handlerConfiguration);

        List<Tuple2<String, NamedAnalyzerResult>> analyzerResults;
        boolean success = false;
        try {
            while (inputRowIterator.hasNext()) {
                InputRow inputRow = inputRowIterator.next();
                consumeRowHandler.consumeRow(inputRow);
                logger.debug("Consumed row no. {}", inputRow.getId());
            }
            logger.info("Row processing complete - continuing to fetching results");
            analyzerResults = this.getAnalyzerResults(consumeRowHandler.getConsumers());
            this.awaitResultFutures(analyzerResults);
            success = true;
        } finally {
            LifeCycleHelper lifeCycleHelper = new LifeCycleHelper(configuration, analysisJob, false);
            for (RowProcessingConsumer consumer : consumeRowHandler.getConsumers()) {
                lifeCycleHelper.close(consumer.getComponentJob().getDescriptor(), consumer.getComponent(), success);
            }
        }
        this._sparkJobContext.triggerOnPartitionProcessingEnd();
        return analyzerResults;
    }

    /**
     * Replaces, in place, every tuple whose result is an
     * {@link AnalyzerResultFuture} with a tuple holding the awaited result.
     */
    private void awaitResultFutures(List<Tuple2<String, NamedAnalyzerResult>> analyzerResults) {
        ListIterator<Tuple2<String, NamedAnalyzerResult>> it = analyzerResults.listIterator();
        while (it.hasNext()) {
            Tuple2<String, NamedAnalyzerResult> tuple = it.next();
            NamedAnalyzerResult namedAnalyzerResult = tuple._2;
            AnalyzerResult analyzerResult = namedAnalyzerResult.getAnalyzerResult();
            if (analyzerResult instanceof AnalyzerResultFuture) {
                AnalyzerResult awaitedResult = ((AnalyzerResultFuture<?>) analyzerResult).get();
                NamedAnalyzerResult awaitedResultTuple = new NamedAnalyzerResult(namedAnalyzerResult.getName(), awaitedResult);
                it.set(new Tuple2<String, NamedAnalyzerResult>(tuple._1, awaitedResultTuple));
            }
        }
    }

    /**
     * Collects the results of all result producing consumers, descending
     * recursively into the consumers of their active output data streams.
     */
    private List<Tuple2<String, NamedAnalyzerResult>> getAnalyzerResults(Collection<RowProcessingConsumer> rowProcessingConsumers) {
        List<Tuple2<String, NamedAnalyzerResult>> analyzerResults = new ArrayList<Tuple2<String, NamedAnalyzerResult>>();
        for (RowProcessingConsumer consumer : rowProcessingConsumers) {
            if (consumer.isResultProducer()) {
                HasAnalyzerResult<?> resultProducer = (HasAnalyzerResult<?>) consumer.getComponent();
                AnalyzerResult analyzerResult = resultProducer.getResult();
                String key = this._sparkJobContext.getComponentKey(consumer.getComponentJob());
                NamedAnalyzerResult namedAnalyzerResult = new NamedAnalyzerResult(key, analyzerResult);
                analyzerResults.add(new Tuple2<String, NamedAnalyzerResult>(key, namedAnalyzerResult));
            }
            for (ActiveOutputDataStream activeOutputDataStream : consumer.getActiveOutputDataStreams()) {
                List<RowProcessingConsumer> outputDataStreamConsumers = activeOutputDataStream.getPublisher().getConsumers();
                analyzerResults.addAll(this.getAnalyzerResults(outputDataStreamConsumers));
            }
        }
        return analyzerResults;
    }
}

