/*
 * Decompiled with CFR 0.152.
 */
package io.openlineage.spark.agent.lifecycle.plan;

import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.api.DatasetFactory;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark.api.QueryPlanVisitor;
import io.openlineage.spark.shaded.com.fasterxml.jackson.databind.JsonNode;
import io.openlineage.spark.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.Spliterators;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.datasources.LogicalRelation;
import org.apache.spark.sql.kafka010.KafkaRelation;
import org.apache.spark.sql.kafka010.KafkaSourceProvider;
import org.apache.spark.sql.sources.CreatableRelationProvider;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;

/**
 * Query plan visitor that turns a {@link LogicalRelation} backed by a {@link KafkaRelation} into
 * OpenLineage datasets, one per Kafka topic named in the relation's options.
 *
 * <p>The dataset namespace is {@code kafka://<server>}, where {@code <server>} is derived from the
 * first entry of the {@code kafka.bootstrap.servers} option (empty when the option is missing or
 * cannot be parsed). Topics are read from the {@code subscribe}, {@code topic} and {@code assign}
 * options; {@code subscribePattern} is not consulted.
 *
 * @param <D> the concrete OpenLineage dataset type produced by the supplied {@link DatasetFactory}
 */
public class KafkaRelationVisitor<D extends OpenLineage.Dataset>
extends QueryPlanVisitor<LogicalRelation, D> {
    private static final Logger log = LoggerFactory.getLogger(KafkaRelationVisitor.class);

    /** Class whose presence on the classpath signals that the Spark Kafka connector is available. */
    private static final String KAFKA_SOURCE_PROVIDER_CLASS = "org.apache.spark.sql.kafka010.KafkaSourceProvider";

    /** Matches a server string that already carries a {@code scheme://} prefix. */
    private static final Pattern SCHEME_PREFIX = Pattern.compile("\\w+://.*");

    /** Separator used by comma-separated option values. */
    private static final Pattern COMMA = Pattern.compile(",");

    /** Shared JSON parser for the {@code assign} option; built once instead of once per call. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final DatasetFactory<D> datasetFactory;

    /**
     * @param context the OpenLineage context handed to the {@link QueryPlanVisitor} base class
     * @param datasetFactory factory used to build one dataset per discovered topic
     */
    public KafkaRelationVisitor(OpenLineageContext context, DatasetFactory<D> datasetFactory) {
        super(context);
        this.datasetFactory = datasetFactory;
    }

    /**
     * Reports whether the Spark Kafka connector can be loaded by this class's class loader.
     *
     * @return {@code true} if {@code KafkaSourceProvider} loads; {@code false} on any failure
     */
    public static boolean hasKafkaClasses() {
        try {
            KafkaRelationVisitor.class.getClassLoader().loadClass(KAFKA_SOURCE_PROVIDER_CLASS);
            return true;
        }
        catch (Exception | LinkageError ignored) {
            // Deliberately swallowed: this is only a probe. LinkageError is included because a
            // connector jar whose own dependencies are missing fails with NoClassDefFoundError,
            // which is not an Exception.
            return false;
        }
    }

    /**
     * Reports whether the given provider is the Kafka source provider.
     *
     * @param provider the relation provider to inspect
     * @return {@code true} only if the Kafka classes are loadable and {@code provider} is a
     *     {@link KafkaSourceProvider}
     */
    public static boolean isKafkaSource(CreatableRelationProvider provider) {
        // The probe must come first: the instanceof below references KafkaSourceProvider directly.
        if (!KafkaRelationVisitor.hasKafkaClasses()) {
            return false;
        }
        return provider instanceof KafkaSourceProvider;
    }

    /**
     * Builds datasets for a Kafka write described by its options.
     *
     * @param datasetFactory factory used to build each dataset
     * @param relationProvider not used by this implementation
     * @param options Kafka options (bootstrap servers and topic selection)
     * @param mode not used by this implementation
     * @param schema schema attached to every produced dataset
     * @return one dataset per topic found in {@code options}; empty if none is found
     */
    public static <D extends OpenLineage.Dataset> List<D> createKafkaDatasets(DatasetFactory<D> datasetFactory, CreatableRelationProvider relationProvider, Map<String, String> options, SaveMode mode, StructType schema) {
        return KafkaRelationVisitor.createDatasetsFromOptions(datasetFactory, options, schema);
    }

    /**
     * @param x the plan node to test
     * @return {@code true} if {@code x} is a {@link LogicalRelation} wrapping a {@link KafkaRelation}
     */
    @Override
    public boolean isDefinedAt(LogicalPlan x) {
        return x instanceof LogicalRelation && ((LogicalRelation)x).relation() instanceof KafkaRelation;
    }

    /**
     * Produces the datasets read by a Kafka relation.
     *
     * @param x a plan node for which {@link #isDefinedAt(LogicalPlan)} holds
     * @return one dataset per topic found in the relation's source options
     */
    @Override
    public List<D> apply(LogicalPlan x) {
        KafkaRelation relation = (KafkaRelation)((LogicalRelation)x).relation();
        Map<String, String> sourceOptions = KafkaRelationVisitor.readSourceOptions(relation);
        return KafkaRelationVisitor.createDatasetsFromOptions(this.datasetFactory, sourceOptions, relation.schema());
    }

    /**
     * Reads the {@code sourceOptions} field of a Kafka relation via reflection.
     *
     * <p>The parameter is typed as {@code Object} so that no Kafka class appears in a method
     * signature of this class.
     *
     * @return the relation's options, or an empty map (after logging) if they cannot be read
     */
    @SuppressWarnings("unchecked") // the field's static type is not visible through reflection
    private static Map<String, String> readSourceOptions(Object relation) {
        try {
            Field sourceOptionsField = relation.getClass().getDeclaredField("sourceOptions");
            sourceOptionsField.setAccessible(true);
            return (Map<String, String>)sourceOptionsField.get(relation);
        }
        catch (Exception e) {
            log.error("Can't extract kafka server options", e);
            return Map$.MODULE$.empty();
        }
    }

    /** Builds one dataset per topic named in {@code sourceOptions}, all sharing one namespace. */
    private static <D extends OpenLineage.Dataset> List<D> createDatasetsFromOptions(DatasetFactory<D> datasetFactory, Map<String, String> sourceOptions, StructType schema) {
        String namespace = "kafka://" + KafkaRelationVisitor.bootstrapServer(sourceOptions);
        return KafkaRelationVisitor.topics(sourceOptions)
            .map(topic -> datasetFactory.getDataset(topic, namespace, schema))
            .collect(Collectors.toList());
    }

    /** Looks up an option and converts the Scala {@code Option} result to a Java {@code Optional}. */
    private static Optional<String> optionValue(Map<String, String> sourceOptions, String key) {
        return ScalaConversionUtils.asJavaOptional(sourceOptions.get(key));
    }

    /** Streams topic names from {@code subscribe}, then {@code topic}, then {@code assign}. */
    private static Stream<String> topics(Map<String, String> sourceOptions) {
        Stream<String> listed = Stream.of("subscribe", "topic")
            .flatMap(key -> KafkaRelationVisitor.listedTopics(sourceOptions, key));
        Stream<String> assigned = KafkaRelationVisitor.optionValue(sourceOptions, "assign")
            .map(KafkaRelationVisitor::assignedTopics)
            .orElseGet(Stream::empty);
        return Stream.concat(listed, assigned);
    }

    /**
     * Streams the topic names held by a list-valued option.
     *
     * <p>Spark documents {@code subscribe} as a comma-separated list of topics, so the value is
     * split on commas, trimmed, and empty entries are dropped. The same treatment is applied to
     * {@code topic}; a single name without commas passes through unchanged apart from trimming.
     */
    private static Stream<String> listedTopics(Map<String, String> sourceOptions, String key) {
        return KafkaRelationVisitor.optionValue(sourceOptions, key)
            .map(COMMA::splitAsStream)
            .orElseGet(Stream::empty)
            .map(String::trim)
            .filter(topic -> !topic.isEmpty());
    }

    /**
     * Streams the top-level field names of the {@code assign} option's JSON value.
     *
     * <p>Per the Spark Kafka integration guide the value looks like
     * {@code {"topicA":[0,1],"topicB":[2,4]}}, i.e. the field names are the topics.
     *
     * @return the field names, or an empty stream (after logging) if the JSON cannot be parsed
     */
    private static Stream<String> assignedTopics(String assignJson) {
        try {
            JsonNode jsonNode = OBJECT_MAPPER.readTree(assignJson);
            if (jsonNode == null) {
                // Guard against a null parse result (e.g. for empty content).
                return Stream.empty();
            }
            long fieldCount = jsonNode.size();
            return StreamSupport.stream(Spliterators.spliterator(jsonNode.fieldNames(), fieldCount, 0), false);
        }
        catch (IOException e) {
            log.warn("Unable to find topics from Kafka source configuration {}", assignJson, e);
            return Stream.empty();
        }
    }

    /**
     * Derives the namespace server part from the first {@code kafka.bootstrap.servers} entry.
     *
     * @return the formatted first server, or {@code ""} when the option is absent or blank
     */
    private static String bootstrapServer(Map<String, String> sourceOptions) {
        return KafkaRelationVisitor.optionValue(sourceOptions, "kafka.bootstrap.servers")
            // Limit 2 keeps a leading empty entry, so a value such as "," cannot yield an empty array.
            .map(servers -> servers.split(",", 2)[0].trim())
            .filter(first -> !first.isEmpty())
            .map(KafkaRelationVisitor::hostAndPort)
            .orElse("");
    }

    /**
     * Formats a single bootstrap server as {@code host:port}.
     *
     * <p>A {@code PLAINTEXT://} scheme is prepended when the entry has none so that it can be
     * parsed as a URI. When the URI parser reports no host (it does so for names it does not
     * accept as a server-based authority, such as hosts containing an underscore), the raw
     * authority is used instead of emitting {@code null:-1}.
     *
     * @return the formatted server, or {@code ""} (after logging) if the entry is not a valid URI
     */
    private static String hostAndPort(String server) {
        String withScheme = SCHEME_PREFIX.matcher(server).matches() ? server : "PLAINTEXT://" + server;
        try {
            URI uri = URI.create(withScheme);
            if (uri.getHost() != null) {
                return uri.getHost() + ":" + uri.getPort();
            }
            String authority = uri.getAuthority();
            return authority != null ? authority : "";
        }
        catch (IllegalArgumentException e) {
            log.warn("Unable to parse Kafka bootstrap server {}", server, e);
            return "";
        }
    }
}

