/*
 * Decompiled with CFR 0.152.
 */
package io.openlineage.spark.agent.lifecycle;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import io.openlineage.spark.agent.hooks.HookUtils;
import io.openlineage.spark.agent.lifecycle.Rdds;
import io.openlineage.spark.agent.lifecycle.UnknownEntryFacetListener;
import io.openlineage.spark.agent.lifecycle.plan.column.ColumnLevelLineageUtils;
import io.openlineage.spark.agent.lifecycle.plan.column.ColumnLevelLineageVisitor;
import io.openlineage.spark.agent.util.FacetUtils;
import io.openlineage.spark.agent.util.PlanUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.api.CustomFacetBuilder;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark.api.OpenLineageEventHandlerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.JobFailed;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.Stage;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.PartialFunction;

class OpenLineageRunEventBuilder {
    private static final Logger log = LoggerFactory.getLogger(OpenLineageRunEventBuilder.class);
    @NonNull
    private final OpenLineageContext openLineageContext;
    @NonNull
    private final Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> inputDatasetBuilders;
    @NonNull
    private final Collection<PartialFunction<LogicalPlan, List<OpenLineage.InputDataset>>> inputDatasetQueryPlanVisitors;
    @NonNull
    private final Collection<PartialFunction<Object, List<OpenLineage.OutputDataset>>> outputDatasetBuilders;
    @NonNull
    private final Collection<PartialFunction<LogicalPlan, List<OpenLineage.OutputDataset>>> outputDatasetQueryPlanVisitors;
    @NonNull
    private final Collection<CustomFacetBuilder<?, ? extends OpenLineage.DatasetFacet>> datasetFacetBuilders;
    @NonNull
    private final Collection<CustomFacetBuilder<?, ? extends OpenLineage.InputDatasetFacet>> inputDatasetFacetBuilders;
    @NonNull
    private final Collection<CustomFacetBuilder<?, ? extends OpenLineage.OutputDatasetFacet>> outputDatasetFacetBuilders;
    @NonNull
    private final Collection<CustomFacetBuilder<?, ? extends OpenLineage.RunFacet>> runFacetBuilders;
    @NonNull
    private final Collection<CustomFacetBuilder<?, ? extends OpenLineage.JobFacet>> jobFacetBuilders;
    @NonNull
    private final Collection<ColumnLevelLineageVisitor> columnLineageVisitors;
    private final UnknownEntryFacetListener unknownEntryFacetListener = UnknownEntryFacetListener.getInstance();
    private final Map<Integer, ActiveJob> jobMap = new HashMap<Integer, ActiveJob>();
    private final Map<Integer, Stage> stageMap = new HashMap<Integer, Stage>();

    OpenLineageRunEventBuilder(OpenLineageContext context, OpenLineageEventHandlerFactory factory) {
        this(context, factory.createInputDatasetBuilder(context), factory.createInputDatasetQueryPlanVisitors(context), factory.createOutputDatasetBuilder(context), factory.createOutputDatasetQueryPlanVisitors(context), factory.createDatasetFacetBuilders(context), factory.createInputDatasetFacetBuilders(context), factory.createOutputDatasetFacetBuilders(context), factory.createRunFacetBuilders(context), factory.createJobFacetBuilders(context), factory.createColumnLevelLineageVisitors(context));
    }

    void registerJob(ActiveJob job) {
        this.jobMap.put(job.jobId(), job);
        this.stageMap.put(job.finalStage().id(), job.finalStage());
        job.finalStage().parents().forall(ScalaConversionUtils.toScalaFn(stage -> {
            this.stageMap.put(stage.id(), (Stage)stage);
            return true;
        }));
    }

    OpenLineage.RunEvent buildRun(Optional<OpenLineage.ParentRunFacet> parentRunFacet, OpenLineage.RunEventBuilder runEventBuilder, OpenLineage.JobBuilder jobBuilder, SparkListenerStageSubmitted event) {
        Stage stage = this.stageMap.get(event.stageInfo().stageId());
        RDD rdd = stage.rdd();
        ArrayList<Object> nodes = new ArrayList<Object>();
        nodes.addAll(Arrays.asList(event.stageInfo(), stage));
        nodes.addAll(Rdds.flattenRDDs(rdd));
        return this.populateRun(parentRunFacet, runEventBuilder, jobBuilder, nodes);
    }

    OpenLineage.RunEvent buildRun(Optional<OpenLineage.ParentRunFacet> parentRunFacet, OpenLineage.RunEventBuilder runEventBuilder, OpenLineage.JobBuilder jobBuilder, SparkListenerStageCompleted event) {
        Stage stage = this.stageMap.get(event.stageInfo().stageId());
        RDD rdd = stage.rdd();
        ArrayList<Object> nodes = new ArrayList<Object>();
        nodes.addAll(Arrays.asList(event.stageInfo(), stage));
        nodes.addAll(Rdds.flattenRDDs(rdd));
        return this.populateRun(parentRunFacet, runEventBuilder, jobBuilder, nodes);
    }

    OpenLineage.RunEvent buildRun(Optional<OpenLineage.ParentRunFacet> parentRunFacet, OpenLineage.RunEventBuilder runEventBuilder, OpenLineage.JobBuilder jobBuilder, SparkListenerSQLExecutionStart event) {
        runEventBuilder.eventType(OpenLineage.RunEvent.EventType.START);
        return this.buildRun(parentRunFacet, runEventBuilder, jobBuilder, event, Optional.empty());
    }

    OpenLineage.RunEvent buildRun(Optional<OpenLineage.ParentRunFacet> parentRunFacet, OpenLineage.RunEventBuilder runEventBuilder, OpenLineage.JobBuilder jobBuilder, SparkListenerSQLExecutionEnd event) {
        runEventBuilder.eventType(OpenLineage.RunEvent.EventType.COMPLETE);
        return this.buildRun(parentRunFacet, runEventBuilder, jobBuilder, event, Optional.empty());
    }

    OpenLineage.RunEvent buildRun(Optional<OpenLineage.ParentRunFacet> parentRunFacet, OpenLineage.RunEventBuilder runEventBuilder, OpenLineage.JobBuilder jobBuilder, SparkListenerJobStart event) {
        runEventBuilder.eventType(OpenLineage.RunEvent.EventType.START);
        return this.buildRun(parentRunFacet, runEventBuilder, jobBuilder, event, Optional.ofNullable(this.jobMap.get(event.jobId())));
    }

    OpenLineage.RunEvent buildRun(Optional<OpenLineage.ParentRunFacet> parentRunFacet, OpenLineage.RunEventBuilder runEventBuilder, OpenLineage.JobBuilder jobBuilder, SparkListenerJobEnd event) {
        runEventBuilder.eventType(event.jobResult() instanceof JobFailed ? OpenLineage.RunEvent.EventType.FAIL : OpenLineage.RunEvent.EventType.COMPLETE);
        return this.buildRun(parentRunFacet, runEventBuilder, jobBuilder, event, Optional.ofNullable(this.jobMap.get(event.jobId())));
    }

    private OpenLineage.RunEvent buildRun(Optional<OpenLineage.ParentRunFacet> parentRunFacet, OpenLineage.RunEventBuilder runEventBuilder, OpenLineage.JobBuilder jobBuilder, Object event, Optional<ActiveJob> job) {
        ArrayList<Object> nodes = new ArrayList<Object>();
        nodes.add(event);
        job.ifPresent(j -> {
            nodes.add(j);
            nodes.addAll(Rdds.flattenRDDs(j.finalStage().rdd()));
        });
        return this.populateRun(parentRunFacet, runEventBuilder, jobBuilder, nodes);
    }

    private OpenLineage.RunEvent populateRun(Optional<OpenLineage.ParentRunFacet> parentRunFacet, OpenLineage.RunEventBuilder runEventBuilder, OpenLineage.JobBuilder jobBuilder, List<Object> nodes) {
        OpenLineage openLineage = this.openLineageContext.getOpenLineage();
        OpenLineage.RunFacetsBuilder runFacetsBuilder = openLineage.newRunFacetsBuilder();
        OpenLineage.JobFacetsBuilder jobFacetsBuilder = this.openLineageContext.getOpenLineage().newJobFacetsBuilder();
        parentRunFacet.ifPresent(runFacetsBuilder::parent);
        OpenLineage.JobFacets jobFacets = this.buildJobFacets(nodes, this.jobFacetBuilders, jobFacetsBuilder);
        List<OpenLineage.InputDataset> inputDatasets = this.buildInputDatasets(nodes);
        List<OpenLineage.OutputDataset> outputDatasets = this.buildOutputDatasets(nodes);
        this.openLineageContext.getQueryExecution().filter(qe -> !FacetUtils.isFacetDisabled(this.openLineageContext, "spark_unknown")).flatMap(qe -> this.unknownEntryFacetListener.build(qe.optimizedPlan())).ifPresent(facet -> runFacetsBuilder.put("spark_unknown", (OpenLineage.RunFacet)facet));
        OpenLineage.RunFacets runFacets = this.buildRunFacets(nodes, this.runFacetBuilders, runFacetsBuilder);
        OpenLineage.RunBuilder runBuilder = openLineage.newRunBuilder().runId(this.openLineageContext.getRunUuid()).facets(runFacets);
        runEventBuilder.run(runBuilder.build()).job(jobBuilder.facets(jobFacets).build()).inputs(inputDatasets).outputs(outputDatasets);
        HookUtils.preBuild(this.openLineageContext, runEventBuilder);
        return runEventBuilder.build();
    }

    private List<OpenLineage.InputDataset> buildInputDatasets(List<Object> nodes) {
        this.openLineageContext.getQueryExecution().ifPresent(qe -> {
            if (log.isDebugEnabled()) {
                log.debug("Traversing optimized plan {}", (Object)qe.optimizedPlan().toJSON());
                log.debug("Physical plan executed {}", (Object)qe.executedPlan().toJSON());
            }
        });
        log.debug("Visiting query plan {} with input dataset builders {}", this.openLineageContext.getQueryExecution(), this.inputDatasetBuilders);
        Function1<LogicalPlan, Collection<OpenLineage.InputDataset>> inputVisitor = this.visitLogicalPlan(PlanUtils.merge(this.inputDatasetQueryPlanVisitors));
        List<OpenLineage.InputDataset> datasets = Stream.concat(this.buildDatasets(nodes, this.inputDatasetBuilders), this.openLineageContext.getQueryExecution().map(qe -> ScalaConversionUtils.fromSeq(qe.optimizedPlan().map(inputVisitor)).stream().flatMap(Collection::stream).map(OpenLineage.InputDataset.class::cast)).orElse(Stream.empty())).collect(Collectors.toList());
        OpenLineage openLineage = this.openLineageContext.getOpenLineage();
        if (!datasets.isEmpty()) {
            HashMap inputFacetsMap = new HashMap();
            nodes.forEach(event -> this.inputDatasetFacetBuilders.forEach(fn -> fn.accept(event, inputFacetsMap::put)));
            HashMap datasetFacetsMap = new HashMap();
            nodes.forEach(event -> this.inputDatasetFacetBuilders.forEach(fn -> fn.accept(event, inputFacetsMap::put)));
            return datasets.stream().map(ds -> openLineage.newInputDatasetBuilder().name(ds.getName()).namespace(ds.getNamespace()).inputFacets(OpenLineageClientUtils.mergeFacets(inputFacetsMap, ds.getInputFacets(), OpenLineage.InputDatasetInputFacets.class)).facets(OpenLineageClientUtils.mergeFacets(datasetFacetsMap, ds.getFacets(), OpenLineage.DatasetFacets.class)).build()).collect(Collectors.toList());
        }
        return datasets;
    }

    private <D> Function1<LogicalPlan, Collection<D>> visitLogicalPlan(PartialFunction<LogicalPlan, Collection<D>> inputVisitor) {
        return ScalaConversionUtils.toScalaFn(node -> (List)inputVisitor.andThen(ScalaConversionUtils.toScalaFn(ds -> {
            this.unknownEntryFacetListener.accept((LogicalPlan)node);
            return ds;
        })).applyOrElse(node, ScalaConversionUtils.toScalaFn(n -> Collections.emptyList())));
    }

    private List<OpenLineage.OutputDataset> buildOutputDatasets(List<Object> nodes) {
        log.debug("Visiting query plan {} with output dataset builders {}", this.openLineageContext.getQueryExecution(), this.outputDatasetBuilders);
        Function1<LogicalPlan, Collection<OpenLineage.OutputDataset>> visitor = this.visitLogicalPlan(PlanUtils.merge(this.outputDatasetQueryPlanVisitors));
        List<OpenLineage.OutputDataset> datasets = Stream.concat(this.buildDatasets(nodes, this.outputDatasetBuilders), this.openLineageContext.getQueryExecution().map(qe -> (Collection)visitor.apply((Object)qe.optimizedPlan())).map(Collection::stream).orElse(Stream.empty())).collect(Collectors.toList());
        OpenLineage openLineage = this.openLineageContext.getOpenLineage();
        if (!datasets.isEmpty()) {
            HashMap outputFacetsMap = new HashMap();
            nodes.forEach(event -> this.outputDatasetFacetBuilders.forEach(fn -> fn.accept(event, outputFacetsMap::put)));
            HashMap datasetFacetsMap = new HashMap();
            nodes.forEach(event -> this.datasetFacetBuilders.forEach(fn -> fn.accept(event, datasetFacetsMap::put)));
            return datasets.stream().map(ds -> {
                HashMap dsFacetsMap = new HashMap(datasetFacetsMap);
                ColumnLevelLineageUtils.buildColumnLineageDatasetFacet(this.openLineageContext, ds.getFacets().getSchema()).ifPresent(facet -> {
                    OpenLineage.DatasetFacet cfr_ignored_0 = dsFacetsMap.put("columnLineage", facet);
                });
                return openLineage.newOutputDatasetBuilder().name(ds.getName()).namespace(ds.getNamespace()).outputFacets(OpenLineageClientUtils.mergeFacets(outputFacetsMap, ds.getOutputFacets(), OpenLineage.OutputDatasetOutputFacets.class)).facets(OpenLineageClientUtils.mergeFacets(dsFacetsMap, ds.getFacets(), OpenLineage.DatasetFacets.class)).build();
            }).collect(Collectors.toList());
        }
        return datasets;
    }

    private <T extends OpenLineage.Dataset> Stream<T> buildDatasets(List<Object> nodes, Collection<PartialFunction<Object, List<T>>> builders) {
        return nodes.stream().flatMap(event -> builders.stream().filter(pfn -> PlanUtils.safeIsDefinedAt(pfn, event)).map(pfn -> PlanUtils.safeApply(pfn, event)).flatMap(Collection::stream));
    }

    private OpenLineage.JobFacets buildJobFacets(List<Object> events, Collection<CustomFacetBuilder<?, ? extends OpenLineage.JobFacet>> builders, OpenLineage.JobFacetsBuilder jobFacetsBuilder) {
        events.forEach(event -> builders.forEach(fn -> fn.accept(event, jobFacetsBuilder::put)));
        return jobFacetsBuilder.build();
    }

    private OpenLineage.RunFacets buildRunFacets(List<Object> events, Collection<CustomFacetBuilder<?, ? extends OpenLineage.RunFacet>> builders, OpenLineage.RunFacetsBuilder runFacetsBuilder) {
        events.forEach(event -> builders.forEach(fn -> fn.accept(event, runFacetsBuilder::put)));
        return runFacetsBuilder.build();
    }

    public OpenLineageRunEventBuilder(@NonNull OpenLineageContext openLineageContext, @NonNull Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> inputDatasetBuilders, @NonNull Collection<PartialFunction<LogicalPlan, List<OpenLineage.InputDataset>>> inputDatasetQueryPlanVisitors, @NonNull Collection<PartialFunction<Object, List<OpenLineage.OutputDataset>>> outputDatasetBuilders, @NonNull Collection<PartialFunction<LogicalPlan, List<OpenLineage.OutputDataset>>> outputDatasetQueryPlanVisitors, @NonNull Collection<CustomFacetBuilder<?, ? extends OpenLineage.DatasetFacet>> datasetFacetBuilders, @NonNull Collection<CustomFacetBuilder<?, ? extends OpenLineage.InputDatasetFacet>> inputDatasetFacetBuilders, @NonNull Collection<CustomFacetBuilder<?, ? extends OpenLineage.OutputDatasetFacet>> outputDatasetFacetBuilders, @NonNull Collection<CustomFacetBuilder<?, ? extends OpenLineage.RunFacet>> runFacetBuilders, @NonNull Collection<CustomFacetBuilder<?, ? extends OpenLineage.JobFacet>> jobFacetBuilders, @NonNull Collection<ColumnLevelLineageVisitor> columnLineageVisitors) {
        if (openLineageContext == null) {
            throw new NullPointerException("openLineageContext is marked non-null but is null");
        }
        if (inputDatasetBuilders == null) {
            throw new NullPointerException("inputDatasetBuilders is marked non-null but is null");
        }
        if (inputDatasetQueryPlanVisitors == null) {
            throw new NullPointerException("inputDatasetQueryPlanVisitors is marked non-null but is null");
        }
        if (outputDatasetBuilders == null) {
            throw new NullPointerException("outputDatasetBuilders is marked non-null but is null");
        }
        if (outputDatasetQueryPlanVisitors == null) {
            throw new NullPointerException("outputDatasetQueryPlanVisitors is marked non-null but is null");
        }
        if (datasetFacetBuilders == null) {
            throw new NullPointerException("datasetFacetBuilders is marked non-null but is null");
        }
        if (inputDatasetFacetBuilders == null) {
            throw new NullPointerException("inputDatasetFacetBuilders is marked non-null but is null");
        }
        if (outputDatasetFacetBuilders == null) {
            throw new NullPointerException("outputDatasetFacetBuilders is marked non-null but is null");
        }
        if (runFacetBuilders == null) {
            throw new NullPointerException("runFacetBuilders is marked non-null but is null");
        }
        if (jobFacetBuilders == null) {
            throw new NullPointerException("jobFacetBuilders is marked non-null but is null");
        }
        if (columnLineageVisitors == null) {
            throw new NullPointerException("columnLineageVisitors is marked non-null but is null");
        }
        this.openLineageContext = openLineageContext;
        this.inputDatasetBuilders = inputDatasetBuilders;
        this.inputDatasetQueryPlanVisitors = inputDatasetQueryPlanVisitors;
        this.outputDatasetBuilders = outputDatasetBuilders;
        this.outputDatasetQueryPlanVisitors = outputDatasetQueryPlanVisitors;
        this.datasetFacetBuilders = datasetFacetBuilders;
        this.inputDatasetFacetBuilders = inputDatasetFacetBuilders;
        this.outputDatasetFacetBuilders = outputDatasetFacetBuilders;
        this.runFacetBuilders = runFacetBuilders;
        this.jobFacetBuilders = jobFacetBuilders;
        this.columnLineageVisitors = columnLineageVisitors;
    }
}

