/*
 * Decompiled with CFR 0.152.
 */
package io.openlineage.spark.agent.lifecycle;

import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.EventEmitter;
import io.openlineage.spark.agent.OpenLineageSparkListener;
import io.openlineage.spark.agent.Versions;
import io.openlineage.spark.agent.facets.ErrorFacet;
import io.openlineage.spark.agent.facets.SparkVersionFacet;
import io.openlineage.spark.agent.facets.builder.SparkProcessingEngineRunFacetBuilderDelegate;
import io.openlineage.spark.agent.lifecycle.DatasetParser;
import io.openlineage.spark.agent.lifecycle.ExecutionContext;
import io.openlineage.spark.agent.lifecycle.Rdds;
import io.openlineage.spark.agent.util.PlanUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.Strings;
import org.apache.spark.Dependency;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.TaskContext;
import org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil;
import org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil;
import org.apache.spark.rdd.HadoopRDD;
import org.apache.spark.rdd.MapPartitionsRDD;
import org.apache.spark.rdd.NewHadoopRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.JobFailed;
import org.apache.spark.scheduler.JobResult;
import org.apache.spark.scheduler.ResultStage;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.apache.spark.util.SerializableConfiguration;
import org.apache.spark.util.SerializableJobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function2;
import scala.collection.Iterable;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.runtime.AbstractFunction0;

class RddExecutionContext
implements ExecutionContext {
    private static final Logger log = LoggerFactory.getLogger(RddExecutionContext.class);
    private final EventEmitter eventEmitter;
    private final Optional<SparkContext> sparkContextOption;
    private final UUID runId = UUID.randomUUID();
    private List<URI> inputs = Collections.emptyList();
    private List<URI> outputs = Collections.emptyList();
    private String jobSuffix;

    public RddExecutionContext(EventEmitter eventEmitter) {
        this.eventEmitter = eventEmitter;
        this.sparkContextOption = Optional.ofNullable(SparkContext$.MODULE$.getActive().getOrElse((Function0)new AbstractFunction0<SparkContext>(){

            public SparkContext apply() {
                return null;
            }
        }));
    }

    @Override
    public void start(SparkListenerStageSubmitted stageSubmitted) {
    }

    @Override
    public void end(SparkListenerStageCompleted stageCompleted) {
    }

    @Override
    public void setActiveJob(ActiveJob activeJob) {
        RDD finalRDD = activeJob.finalStage().rdd();
        this.jobSuffix = RddExecutionContext.nameRDD(finalRDD);
        Set<RDD<?>> rdds = Rdds.flattenRDDs(finalRDD);
        this.inputs = this.findInputs(rdds);
        JobConf jc = new JobConf();
        if (activeJob.finalStage() instanceof ResultStage) {
            Function2 fn = ((ResultStage)activeJob.finalStage()).func();
            try {
                Field f = this.getConfigField(fn);
                f.setAccessible(true);
                Object conf = f.get(fn);
                if (conf instanceof HadoopMapRedWriteConfigUtil) {
                    Field confField = HadoopMapRedWriteConfigUtil.class.getDeclaredField("conf");
                    confField.setAccessible(true);
                    SerializableJobConf serializableJobConf = (SerializableJobConf)confField.get(conf);
                    jc = serializableJobConf.value();
                } else if (conf instanceof HadoopMapReduceWriteConfigUtil) {
                    Field confField = HadoopMapReduceWriteConfigUtil.class.getDeclaredField("conf");
                    confField.setAccessible(true);
                    SerializableJobConf serializableJobConf = (SerializableJobConf)confField.get(conf);
                    jc = serializableJobConf.value();
                } else {
                    log.info("Config field is not HadoopMapRedWriteConfigUtil or HadoopMapReduceWriteConfigUtil, it's {}", (Object)conf.getClass().getCanonicalName());
                }
            }
            catch (IllegalAccessException | NoSuchFieldException nfe) {
                log.warn("Unable to access job conf from RDD", (Throwable)nfe);
            }
            log.info("Found job conf from RDD {}", (Object)jc);
        } else {
            jc = OpenLineageSparkListener.getConfigForRDD(finalRDD);
        }
        this.outputs = this.findOutputs(finalRDD, (Configuration)jc);
    }

    private Field getConfigField(Function2<TaskContext, Iterator<?>, ?> fn) throws NoSuchFieldException {
        try {
            return fn.getClass().getDeclaredField("config$1");
        }
        catch (NoSuchFieldException e) {
            return fn.getClass().getDeclaredField("arg$1");
        }
    }

    static String nameRDD(RDD<?> rdd) {
        Seq deps;
        List<Dependency> dependencies;
        String rddName = rdd.name();
        if (rddName == null || rdd instanceof HadoopRDD && Arrays.stream(org.apache.hadoop.mapred.FileInputFormat.getInputPaths((JobConf)((HadoopRDD)rdd).getJobConf())).anyMatch(p -> p.toString().contains(rdd.name())) || rdd instanceof MapPartitionsRDD && rdd.name().equals(((MapPartitionsRDD)rdd).prev().name())) {
            rddName = rdd.getClass().getSimpleName().replaceAll("RDD\\d*$", "").replaceAll("[\\s\\-_]?((?<=.)[A-Z](?=[a-z\\s\\-_])|(?<=[^A-Z])[A-Z]|((?<=[\\s\\-_])[a-z\\d]))", "_$1").toLowerCase(Locale.ROOT);
        }
        if ((dependencies = ScalaConversionUtils.fromSeq(deps = rdd.dependencies())).isEmpty()) {
            return rddName;
        }
        ArrayList<String> dependencyNames = new ArrayList<String>();
        for (Dependency d : dependencies) {
            dependencyNames.add(RddExecutionContext.nameRDD(d.rdd()));
        }
        String dependencyName = Strings.join(dependencyNames, (String)"_");
        if (!dependencyName.startsWith(rddName)) {
            return rddName + "_" + dependencyName;
        }
        return dependencyName;
    }

    @Override
    public void start(SparkListenerSQLExecutionStart sqlStart) {
    }

    @Override
    public void end(SparkListenerSQLExecutionEnd sqlEnd) {
    }

    @Override
    public void start(SparkListenerJobStart jobStart) {
        if (this.inputs.isEmpty() && this.outputs.isEmpty()) {
            log.info("RDDs are empty: skipping sending OpenLineage event");
            return;
        }
        OpenLineage ol = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);
        OpenLineage.RunEvent event = ol.newRunEventBuilder().eventTime(this.toZonedTime(jobStart.time())).eventType(OpenLineage.RunEvent.EventType.START).inputs(this.buildInputs(this.inputs)).outputs(this.buildOutputs(this.outputs)).run(ol.newRunBuilder().runId(this.runId).facets(this.buildRunFacets(null, ol)).build()).job(this.buildJob(jobStart.jobId())).build();
        log.debug("Posting event for start {}: {}", (Object)jobStart, (Object)event);
        this.eventEmitter.emit(event);
    }

    @Override
    public void end(SparkListenerJobEnd jobEnd) {
        if (this.inputs.isEmpty() && this.outputs.isEmpty() && !(jobEnd.jobResult() instanceof JobFailed)) {
            log.info("RDDs are empty: skipping sending OpenLineage event");
            return;
        }
        OpenLineage ol = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);
        OpenLineage.RunEvent event = ol.newRunEventBuilder().eventTime(this.toZonedTime(jobEnd.time())).eventType(this.getEventType(jobEnd.jobResult())).inputs(this.buildInputs(this.inputs)).outputs(this.buildOutputs(this.outputs)).run(ol.newRunBuilder().runId(this.runId).facets(this.buildRunFacets(this.buildJobErrorFacet(jobEnd.jobResult()), ol)).build()).job(this.buildJob(jobEnd.jobId())).build();
        log.debug("Posting event for end {}: {}", (Object)jobEnd, (Object)event);
        this.eventEmitter.emit(event);
    }

    protected ZonedDateTime toZonedTime(long time) {
        Instant i = Instant.ofEpochMilli(time);
        return ZonedDateTime.ofInstant(i, ZoneOffset.UTC);
    }

    protected OpenLineage.RunFacets buildRunFacets(ErrorFacet jobError, OpenLineage ol) {
        OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder();
        this.buildParentFacet().ifPresent(runFacetsBuilder::parent);
        if (jobError != null) {
            runFacetsBuilder.put("spark.exception", jobError);
        }
        this.addSparkVersionFacet(runFacetsBuilder);
        this.addProcessingEventFacet(runFacetsBuilder, ol);
        return runFacetsBuilder.build();
    }

    private void addSparkVersionFacet(OpenLineage.RunFacetsBuilder b0) {
        this.sparkContextOption.ifPresent(context -> b0.put("spark_version", new SparkVersionFacet((SparkContext)context)));
    }

    private void addProcessingEventFacet(OpenLineage.RunFacetsBuilder b0, OpenLineage ol) {
        this.sparkContextOption.ifPresent(context -> {
            OpenLineage.ProcessingEngineRunFacet facet = new SparkProcessingEngineRunFacetBuilderDelegate(ol, (SparkContext)context).buildFacet();
            b0.processing_engine(facet);
        });
    }

    private Optional<OpenLineage.ParentRunFacet> buildParentFacet() {
        return this.eventEmitter.getParentRunId().map(runId -> PlanUtils.parentRunFacet(runId, this.eventEmitter.getParentJobName(), this.eventEmitter.getJobNamespace()));
    }

    protected ErrorFacet buildJobErrorFacet(JobResult jobResult) {
        if (jobResult instanceof JobFailed && ((JobFailed)jobResult).exception() != null) {
            return ErrorFacet.builder().exception(((JobFailed)jobResult).exception()).build();
        }
        return null;
    }

    protected OpenLineage.Job buildJob(int jobId) {
        String suffix = this.jobSuffix;
        if (this.jobSuffix == null) {
            suffix = String.valueOf(jobId);
        }
        String name = this.eventEmitter.getAppName().orElse(this.sparkContextOption.map(SparkContext::appName).orElse("unknown"));
        String jobName = name + "." + suffix;
        return new OpenLineage.JobBuilder().namespace(this.eventEmitter.getJobNamespace()).name(jobName.replaceAll("[\\s\\-_]?((?<=.)[A-Z](?=[a-z\\s\\-_])|(?<=[^A-Z])[A-Z]|((?<=[\\s\\-_])[a-z\\d]))", "_$1").toLowerCase(Locale.ROOT)).build();
    }

    protected List<OpenLineage.OutputDataset> buildOutputs(List<URI> outputs) {
        return outputs.stream().map(this::buildOutputDataset).collect(Collectors.toList());
    }

    protected OpenLineage.InputDataset buildInputDataset(URI uri) {
        DatasetParser.DatasetParseResult result = DatasetParser.parse(uri);
        return new OpenLineage.InputDatasetBuilder().name(result.getName()).namespace(result.getNamespace()).build();
    }

    protected OpenLineage.OutputDataset buildOutputDataset(URI uri) {
        DatasetParser.DatasetParseResult result = DatasetParser.parse(uri);
        return new OpenLineage.OutputDatasetBuilder().name(result.getName()).namespace(result.getNamespace()).build();
    }

    protected List<OpenLineage.InputDataset> buildInputs(List<URI> inputs) {
        return inputs.stream().map(this::buildInputDataset).collect(Collectors.toList());
    }

    protected List<URI> findOutputs(RDD<?> rdd, Configuration config) {
        Path outputPath = RddExecutionContext.getOutputPath(rdd, config);
        log.info("Found output path {} from RDD {}", (Object)outputPath, rdd);
        if (outputPath != null) {
            return Collections.singletonList(this.getDatasetUri(outputPath.toUri()));
        }
        return Collections.emptyList();
    }

    protected List<URI> findInputs(Set<RDD<?>> rdds) {
        ArrayList<URI> result = new ArrayList<URI>();
        for (RDD<?> rdd : rdds) {
            Path[] inputPaths = this.getInputPaths(rdd);
            if (inputPaths == null) continue;
            for (Path path : inputPaths) {
                result.add(this.getDatasetUri(path.toUri()));
            }
        }
        return result;
    }

    protected Path[] getInputPaths(RDD<?> rdd) {
        Path[] inputPaths = null;
        if (rdd instanceof HadoopRDD) {
            inputPaths = org.apache.hadoop.mapred.FileInputFormat.getInputPaths((JobConf)((HadoopRDD)rdd).getJobConf());
        } else if (rdd instanceof NewHadoopRDD) {
            try {
                inputPaths = FileInputFormat.getInputPaths((JobContext)new Job(((NewHadoopRDD)rdd).getConf()));
            }
            catch (IOException e) {
                log.error("Openlineage spark agent could not get input paths", (Throwable)e);
            }
        }
        return inputPaths;
    }

    protected URI getDatasetUri(URI pathUri) {
        return pathUri;
    }

    protected void printRDDs(String prefix, RDD<?> rdd) {
        Collection deps = JavaConversions.asJavaCollection((Iterable)rdd.dependencies());
        for (Dependency dep : deps) {
            this.printRDDs(prefix + "  ", dep.rdd());
        }
    }

    protected static Path getOutputPath(RDD<?> rdd, Configuration config) {
        if (config == null) {
            return null;
        }
        JobConf jc = config instanceof JobConf ? (JobConf)config : new JobConf(config);
        Path path = org.apache.hadoop.mapred.FileOutputFormat.getOutputPath((JobConf)jc);
        if (path == null) {
            try {
                path = FileOutputFormat.getOutputPath((JobContext)new Job((Configuration)jc));
            }
            catch (IOException exception) {
                exception.printStackTrace(System.out);
            }
        }
        return path;
    }

    protected OpenLineage.RunEvent.EventType getEventType(JobResult jobResult) {
        if (jobResult.getClass().getSimpleName().startsWith("JobSucceeded")) {
            return OpenLineage.RunEvent.EventType.COMPLETE;
        }
        return OpenLineage.RunEvent.EventType.FAIL;
    }
}

