/*
 * Decompiled with CFR 0.152.
 */
package io.openlineage.spark.agent;

import io.openlineage.client.Environment;
import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.ArgumentParser;
import io.openlineage.spark.agent.EventEmitter;
import io.openlineage.spark.agent.JobMetricsHolder;
import io.openlineage.spark.agent.Versions;
import io.openlineage.spark.agent.lifecycle.ContextFactory;
import io.openlineage.spark.agent.lifecycle.ExecutionContext;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URISyntaxException;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.stream.Collectors;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.package$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function1;
import scala.Option;

/**
 * Spark listener that forwards job and SQL-execution lifecycle events to
 * {@link ExecutionContext} instances created by a {@link ContextFactory}.
 *
 * <p>Execution contexts are kept in two static registries: one keyed by SQL execution id and
 * one keyed by Spark job id. A job that belongs to a SQL execution is registered under both
 * keys and shares the same context instance.
 *
 * <p>All callbacks are no-ops when the {@code OPENLINEAGE_DISABLED} environment variable
 * parses as {@code true}.
 */
public class OpenLineageSparkListener extends SparkListener {
    private static final Logger log = LoggerFactory.getLogger(OpenLineageSparkListener.class);

    /** Job property that is read to find the SQL execution id a job belongs to. */
    private static final String SQL_EXECUTION_ID_PROPERTY = "spark.sql.execution.id";

    /** Contexts of SQL executions, keyed by execution id. */
    private static final Map<Long, ExecutionContext> sparkSqlExecutionRegistry =
        Collections.synchronizedMap(new HashMap<>());

    /** Contexts of Spark jobs, keyed by job id (may alias entries of the SQL registry). */
    private static final Map<Integer, ExecutionContext> rddExecutionRegistry =
        Collections.synchronizedMap(new HashMap<>());

    /** Hadoop configurations looked up by RDD; nothing in this class populates the map. */
    private static final WeakHashMap<RDD<?>, Configuration> outputs = new WeakHashMap<>();

    /** Lazily created on the first event unless supplied through {@link #init(ContextFactory)}. */
    private static ContextFactory contextFactory;

    private static JobMetricsHolder jobMetrics = JobMetricsHolder.getInstance();

    private static final boolean isDisabled = OpenLineageSparkListener.checkIfDisabled();

    private final Function1<SparkSession, SparkContext> sparkContextFromSession =
        ScalaConversionUtils.toScalaFn(SparkSession::sparkContext);
    private final Function0<Option<SparkContext>> activeSparkContext =
        ScalaConversionUtils.toScalaFn(() -> SparkContext$.MODULE$.getActive());

    String sparkVersion = package$.MODULE$.SPARK_VERSION();

    /**
     * Installs the factory used to create execution contexts and drops all registered contexts.
     *
     * @param contextFactory factory to use for every subsequently created context
     */
    public static void init(ContextFactory contextFactory) {
        OpenLineageSparkListener.contextFactory = contextFactory;
        OpenLineageSparkListener.clear();
    }

    /**
     * Dispatches SQL execution start/end events; every other event type is ignored.
     *
     * @param event the event delivered by Spark
     */
    @Override
    public void onOtherEvent(SparkListenerEvent event) {
        if (isDisabled) {
            return;
        }
        this.initializeContextFactoryIfNotInitialized();
        if (contextFactory == null) {
            // Initialization failed and was already logged; there is nothing to report to.
            return;
        }
        if (event instanceof SparkListenerSQLExecutionStart) {
            OpenLineageSparkListener.sparkSQLExecStart((SparkListenerSQLExecutionStart) event);
        } else if (event instanceof SparkListenerSQLExecutionEnd) {
            OpenLineageSparkListener.sparkSQLExecEnd((SparkListenerSQLExecutionEnd) event);
        }
    }

    /** Starts the (possibly newly created) context of the SQL execution. */
    private static void sparkSQLExecStart(SparkListenerSQLExecutionStart startEvent) {
        OpenLineageSparkListener.getSparkSQLExecutionContext(startEvent.executionId())
            .ifPresent(context -> context.start(startEvent));
    }

    /**
     * Ends and unregisters the context of the SQL execution. When no context is registered,
     * one is created from the end event itself so the end can still be reported.
     */
    private static void sparkSQLExecEnd(SparkListenerSQLExecutionEnd endEvent) {
        ExecutionContext context = sparkSqlExecutionRegistry.remove(endEvent.executionId());
        if (context != null) {
            context.end(endEvent);
        } else {
            contextFactory.createSparkSQLExecutionContext(endEvent).ifPresent(c -> c.end(endEvent));
        }
    }

    /**
     * Registers the job's stages for metrics collection and starts the job's execution context.
     *
     * <p>The SQL execution id is taken from the job-start properties, falling back to the
     * properties of the scheduler's active job. With an id, the SQL execution context is used
     * (and also registered under the job id); without one, an RDD context is used.
     *
     * @param jobStart the job-start event delivered by Spark
     */
    @Override
    public void onJobStart(SparkListenerJobStart jobStart) {
        if (isDisabled) {
            return;
        }
        this.initializeContextFactoryIfNotInitialized();
        int jobId = jobStart.jobId();
        Optional<ActiveJob> activeJob = this.findActiveJob(jobId);
        Set<Integer> stages = ScalaConversionUtils.fromSeq(jobStart.stageIds()).stream()
            .map(Integer.class::cast)
            .collect(Collectors.toSet());
        if (this.collectsJobMetrics()) {
            jobMetrics.addJobStages(jobId, stages);
        }
        if (contextFactory == null) {
            // Initialization failed and was already logged; no context can be created.
            return;
        }

        Optional<String> sqlExecutionId =
            Optional.ofNullable(this.getSqlExecutionId(jobStart.properties()));
        if (!sqlExecutionId.isPresent()) {
            sqlExecutionId = activeJob.map(job -> this.getSqlExecutionId(job.properties()));
        }

        // An id that yields no SQL context deliberately does NOT fall back to an RDD context:
        // the fallback only applies when there is no SQL execution id at all.
        Optional<ExecutionContext> executionContext = sqlExecutionId
            .map(Long::parseLong)
            .map(id -> OpenLineageSparkListener.getExecutionContext(jobId, id))
            .orElseGet(() -> OpenLineageSparkListener.getExecutionContext(jobId));

        executionContext.ifPresent(context -> {
            activeJob.ifPresent(context::setActiveJob);
            context.start(jobStart);
        });
    }

    /**
     * Looks up the scheduler's active job for the given id, using the default session's Spark
     * context or, failing that, the active Spark context.
     *
     * @param jobId Spark job id
     * @return the active job, or empty when no context, scheduler or job is available
     */
    private Optional<ActiveJob> findActiveJob(int jobId) {
        return ScalaConversionUtils.asJavaOptional(
                SparkSession.getDefaultSession()
                    .map(this.sparkContextFromSession)
                    .orElse(this.activeSparkContext))
            .flatMap(ctx -> Optional.ofNullable(ctx.dagScheduler())
                .map(ds -> ds.jobIdToActiveJob().get((Object) jobId)))
            .flatMap(ScalaConversionUtils::asJavaOptional);
    }

    /**
     * Reads the SQL execution id from job properties.
     *
     * @param properties job properties; may be {@code null}
     * @return the property value, or {@code null} when absent or when there are no properties
     */
    private String getSqlExecutionId(Properties properties) {
        return properties == null ? null : properties.getProperty(SQL_EXECUTION_ID_PROPERTY);
    }

    /**
     * Whether task metrics are tracked. A single predicate is used by job start, task end and
     * job end so that metrics are never recorded without the matching registration/clean-up.
     */
    private boolean collectsJobMetrics() {
        return this.sparkVersion.startsWith("3");
    }

    /**
     * Ends and unregisters the job's execution context and releases the job's metrics.
     *
     * @param jobEnd the job-end event delivered by Spark
     */
    @Override
    public void onJobEnd(SparkListenerJobEnd jobEnd) {
        if (isDisabled) {
            return;
        }
        ExecutionContext context = rddExecutionRegistry.remove(jobEnd.jobId());
        if (context != null) {
            context.end(jobEnd);
        }
        if (this.collectsJobMetrics()) {
            jobMetrics.cleanUp(jobEnd.jobId());
        }
    }

    /**
     * Records the finished task's metrics under its stage id.
     *
     * @param taskEnd the task-end event delivered by Spark
     */
    @Override
    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        if (isDisabled || !this.collectsJobMetrics()) {
            return;
        }
        jobMetrics.addMetrics(taskEnd.stageId(), taskEnd.taskMetrics());
    }

    /**
     * Returns the context registered for the SQL execution, creating and registering it first
     * if necessary.
     *
     * @param executionId SQL execution id
     * @return the context, or empty when the factory could not create one
     */
    public static Optional<ExecutionContext> getSparkSQLExecutionContext(long executionId) {
        return Optional.ofNullable(sparkSqlExecutionRegistry.computeIfAbsent(
            executionId,
            e -> contextFactory.createSparkSQLExecutionContext(executionId).orElse(null)));
    }

    /**
     * Returns the context registered for the job, creating and registering an RDD context
     * first if necessary.
     *
     * @param jobId Spark job id
     * @return the context, or empty when the factory returned {@code null}
     */
    public static Optional<ExecutionContext> getExecutionContext(int jobId) {
        return Optional.ofNullable(rddExecutionRegistry.computeIfAbsent(
            jobId, e -> contextFactory.createRddExecutionContext(jobId)));
    }

    /**
     * Returns the SQL execution's context and, when present, also registers it under the job id
     * (replacing any context previously registered for that job).
     *
     * @param jobId Spark job id
     * @param executionId SQL execution id the job belongs to
     * @return the SQL execution context, or empty when none could be obtained
     */
    public static Optional<ExecutionContext> getExecutionContext(int jobId, long executionId) {
        Optional<ExecutionContext> executionContext =
            OpenLineageSparkListener.getSparkSQLExecutionContext(executionId);
        executionContext.ifPresent(context -> rddExecutionRegistry.put(jobId, context));
        return executionContext;
    }

    /**
     * @param rdd the RDD to look up
     * @return the configuration stored for the RDD, or {@code null} when there is none
     */
    public static Configuration getConfigForRDD(RDD<?> rdd) {
        return outputs.get(rdd);
    }

    /**
     * Emits a run event carrying the stack trace of {@code e}. Never throws: a failure to emit
     * is logged together with the error that was being reported.
     *
     * @param e the error to report
     */
    public static void emitError(Exception e) {
        OpenLineage ol = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);
        try {
            OpenLineageSparkListener.contextFactory.openLineageEventEmitter.emit(
                OpenLineageSparkListener.buildErrorLineageEvent(
                    ol, OpenLineageSparkListener.errorRunFacet(e, ol)));
        } catch (Exception ex) {
            // Log the emit failure itself; keep the error being reported attached to it so
            // neither stack trace is lost. addSuppressed rejects self-suppression.
            if (ex != e) {
                ex.addSuppressed(e);
            }
            log.error("Could not emit open lineage on error", ex);
        }
    }

    /** Builds run facets with a single {@code lineage.error} facet holding the stack trace. */
    private static OpenLineage.RunFacets errorRunFacet(Exception e, OpenLineage ol) {
        OpenLineage.RunFacet errorFacet = ol.newRunFacet();
        // Character-based capture: no byte/charset round trip that could mangle the text.
        StringWriter stackTrace = new StringWriter();
        e.printStackTrace(new PrintWriter(stackTrace, true));
        errorFacet.getAdditionalProperties().put("exception", stackTrace.toString());
        OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder();
        runFacetsBuilder.put("lineage.error", errorFacet);
        return runFacetsBuilder.build();
    }

    /**
     * Builds a run event stamped with the current time that uses the emitter's parent run id
     * (or {@code null} when absent), job namespace and parent job name.
     *
     * @param ol factory for OpenLineage model objects
     * @param runFacets facets to attach to the run
     * @return the assembled run event
     */
    public static OpenLineage.RunEvent buildErrorLineageEvent(
            OpenLineage ol, OpenLineage.RunFacets runFacets) {
        EventEmitter emitter = OpenLineageSparkListener.contextFactory.openLineageEventEmitter;
        return ol.newRunEventBuilder()
            .eventTime(ZonedDateTime.now())
            .run(ol.newRun(emitter.getParentRunId().orElse(null), runFacets))
            .job(ol.newJobBuilder()
                .namespace(emitter.getJobNamespace())
                .name(emitter.getParentJobName())
                .build())
            .build();
    }

    /** Empties both context registries and the RDD configuration map. */
    private static void clear() {
        sparkSqlExecutionRegistry.clear();
        rddExecutionRegistry.clear();
        outputs.clear();
    }

    /**
     * Drops all registered state when the application ends.
     *
     * @param applicationEnd the application-end event delivered by Spark
     */
    @Override
    public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
        OpenLineageSparkListener.close();
        super.onApplicationEnd(applicationEnd);
    }

    /** Drops all registered contexts and stored RDD configurations. */
    public static void close() {
        OpenLineageSparkListener.clear();
    }

    /**
     * Attempts to create the context factory as soon as the application starts.
     *
     * @param applicationStart the application-start event delivered by Spark
     */
    @Override
    public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
        this.initializeContextFactoryIfNotInitialized();
    }

    /**
     * Creates the context factory from the current {@link SparkEnv} configuration unless a
     * factory already exists or the listener is disabled. On failure the factory stays
     * {@code null} and the problem is logged.
     */
    private void initializeContextFactoryIfNotInitialized() {
        if (contextFactory != null || isDisabled) {
            return;
        }
        SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
        if (sparkEnv == null) {
            log.warn("Open lineage listener instantiated, but no configuration could be found. Lineage events will not be collected");
            return;
        }
        try {
            ArgumentParser args = ArgumentParser.parse(sparkEnv.conf());
            contextFactory = new ContextFactory(new EventEmitter(args));
        } catch (URISyntaxException e) {
            log.error("Unable to parse open lineage endpoint. Lineage events will not be collected", (Throwable) e);
        }
    }

    /**
     * @return {@code true} when the {@code OPENLINEAGE_DISABLED} environment variable is set to
     *     {@code "true"} (case-insensitive); {@code false} otherwise, including when unset
     */
    private static boolean checkIfDisabled() {
        String disabledFlag = Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED");
        return Boolean.parseBoolean(disabledFlag);
    }
}

