/*
 * Decompiled with CFR 0.152.
 */
package io.openlineage.spark.agent.lifecycle;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import io.openlineage.spark.agent.EventEmitter;
import io.openlineage.spark.agent.Versions;
import io.openlineage.spark.agent.filters.EventFilterUtils;
import io.openlineage.spark.agent.lifecycle.ExecutionContext;
import io.openlineage.spark.agent.lifecycle.OpenLineageRunEventBuilder;
import io.openlineage.spark.agent.util.PlanUtils;
import io.openlineage.spark.api.OpenLineageContext;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.spark.SparkContext;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.WholeStageCodegenExec;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SparkSQLExecutionContext
implements ExecutionContext {
    private static final Logger log = LoggerFactory.getLogger(SparkSQLExecutionContext.class);
    private static final String NO_EXECUTION_INFO = "No execution info {}";
    private final long executionId;
    private final OpenLineageContext olContext;
    private final EventEmitter eventEmitter;
    private final OpenLineageRunEventBuilder runEventBuilder;
    private final OpenLineage openLineage = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);
    private AtomicBoolean finished = new AtomicBoolean(false);

    public SparkSQLExecutionContext(long executionId, EventEmitter eventEmitter, OpenLineageContext olContext, OpenLineageRunEventBuilder runEventBuilder) {
        this.executionId = executionId;
        this.eventEmitter = eventEmitter;
        this.olContext = olContext;
        this.runEventBuilder = runEventBuilder;
    }

    @Override
    public void start(SparkListenerSQLExecutionStart startEvent) {
        log.debug("SparkListenerSQLExecutionStart - executionId: {}", (Object)startEvent.executionId());
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info(NO_EXECUTION_INFO, (Object)this.olContext);
            return;
        }
        if (EventFilterUtils.isDisabled(this.olContext, (SparkListenerEvent)startEvent)) {
            log.info("OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionStart");
            return;
        }
        OpenLineage.RunEvent event = this.runEventBuilder.buildRun(this.buildParentFacet(), this.openLineage.newRunEventBuilder().eventTime(this.toZonedTime(startEvent.time())), this.buildJob(this.olContext.getQueryExecution().get()), startEvent);
        log.debug("Posting event for start {}: {}", (Object)this.executionId, (Object)event);
        this.eventEmitter.emit(event);
    }

    @Override
    public void end(SparkListenerSQLExecutionEnd endEvent) {
        log.debug("SparkListenerSQLExecutionEnd - executionId: {}", (Object)endEvent.executionId());
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info(NO_EXECUTION_INFO, (Object)this.olContext);
            return;
        }
        if (EventFilterUtils.isDisabled(this.olContext, (SparkListenerEvent)endEvent)) {
            log.info("OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionEnd");
            return;
        }
        OpenLineage.RunEvent event = this.runEventBuilder.buildRun(this.buildParentFacet(), this.openLineage.newRunEventBuilder().eventTime(this.toZonedTime(endEvent.time())), this.buildJob(this.olContext.getQueryExecution().get()), endEvent);
        log.debug("Posting event for end {}: {}", (Object)this.executionId, (Object)OpenLineageClientUtils.toJson(event));
        this.eventEmitter.emit(event);
    }

    @Override
    public void start(SparkListenerStageSubmitted stageSubmitted) {
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info(NO_EXECUTION_INFO, (Object)this.olContext);
            return;
        }
        if (EventFilterUtils.isDisabled(this.olContext, (SparkListenerEvent)stageSubmitted)) {
            log.info("OpenLineage received Spark event that is configured to be skipped: SparkListenerStageSubmitted");
            return;
        }
        OpenLineage.RunEvent event = this.runEventBuilder.buildRun(this.buildParentFacet(), this.openLineage.newRunEventBuilder().eventTime(ZonedDateTime.now(ZoneOffset.UTC)), this.buildJob(this.olContext.getQueryExecution().get()), stageSubmitted);
        log.debug("Posting event for stage submitted {}: {}", (Object)this.executionId, (Object)event);
        this.eventEmitter.emit(event);
    }

    @Override
    public void end(SparkListenerStageCompleted stageCompleted) {
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info(NO_EXECUTION_INFO, (Object)this.olContext);
            return;
        }
        if (EventFilterUtils.isDisabled(this.olContext, (SparkListenerEvent)stageCompleted)) {
            log.info("OpenLineage received Spark event that is configured to be skipped: SparkListenerStageCompleted");
            return;
        }
        OpenLineage.RunEvent event = this.runEventBuilder.buildRun(this.buildParentFacet(), this.openLineage.newRunEventBuilder().eventTime(ZonedDateTime.now(ZoneOffset.UTC)), this.buildJob(this.olContext.getQueryExecution().get()), stageCompleted);
        log.debug("Posting event for stage completed {}: {}", (Object)this.executionId, (Object)event);
        this.eventEmitter.emit(event);
    }

    @Override
    public void setActiveJob(ActiveJob activeJob) {
        this.runEventBuilder.registerJob(activeJob);
    }

    @Override
    public void start(SparkListenerJobStart jobStart) {
        log.debug("SparkListenerJobStart - executionId: " + this.executionId);
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info(NO_EXECUTION_INFO, (Object)this.olContext);
            return;
        }
        if (EventFilterUtils.isDisabled(this.olContext, (SparkListenerEvent)jobStart)) {
            log.info("OpenLineage received Spark event that is configured to be skipped: SparkListenerJobStart");
            return;
        }
        OpenLineage.RunEvent event = this.runEventBuilder.buildRun(this.buildParentFacet(), this.openLineage.newRunEventBuilder().eventTime(this.toZonedTime(jobStart.time())), this.buildJob(this.olContext.getQueryExecution().get()), jobStart);
        log.debug("Posting event for start {}: {}", (Object)this.executionId, (Object)event);
        this.eventEmitter.emit(event);
    }

    @Override
    public void end(SparkListenerJobEnd jobEnd) {
        log.debug("SparkListenerJobEnd - executionId: " + this.executionId);
        if (!this.finished.compareAndSet(false, true)) {
            log.debug("Event already finished, returning");
            return;
        }
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info(NO_EXECUTION_INFO, (Object)this.olContext);
            return;
        }
        if (EventFilterUtils.isDisabled(this.olContext, (SparkListenerEvent)jobEnd)) {
            log.info("OpenLineage received Spark event that is configured to be skipped: SparkListenerJobEnd");
            return;
        }
        OpenLineage.RunEvent event = this.runEventBuilder.buildRun(this.buildParentFacet(), this.openLineage.newRunEventBuilder().eventTime(this.toZonedTime(jobEnd.time())), this.buildJob(this.olContext.getQueryExecution().get()), jobEnd);
        log.debug("Posting event for end {}: {}", (Object)this.executionId, (Object)event);
        this.eventEmitter.emit(event);
    }

    private Optional<OpenLineage.ParentRunFacet> buildParentFacet() {
        return this.eventEmitter.getParentRunId().map(runId -> PlanUtils.parentRunFacet(runId, this.eventEmitter.getParentJobName(), this.eventEmitter.getJobNamespace()));
    }

    protected ZonedDateTime toZonedTime(long time) {
        Instant i = Instant.ofEpochMilli(time);
        return ZonedDateTime.ofInstant(i, ZoneOffset.UTC);
    }

    protected OpenLineage.JobBuilder buildJob(QueryExecution queryExecution) {
        SparkContext sparkContext = queryExecution.executedPlan().sparkContext();
        SparkPlan node = queryExecution.executedPlan();
        if (node instanceof WholeStageCodegenExec) {
            node = ((WholeStageCodegenExec)node).child();
        }
        String name = this.eventEmitter.getAppName().orElse(sparkContext.appName());
        return this.openLineage.newJobBuilder().namespace(this.eventEmitter.getJobNamespace()).name(SparkSQLExecutionContext.normalizeName(name) + "." + SparkSQLExecutionContext.normalizeName(node.nodeName()));
    }

    private static String normalizeName(String name) {
        return name.replaceAll("[\\s\\-_]?((?<=.)[A-Z](?=[a-z\\s\\-_])|(?<=[^A-Z])[A-Z]|((?<=[\\s\\-_])[a-z\\d]))", "_$1").toLowerCase(Locale.ROOT);
    }
}

