package org.apache.kyuubi.engine.spark.operation;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.scala.DefaultScalaModule$;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.InputStream;
import java.lang.ProcessBuilder;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.ws.rs.core.UriBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.kyuubi.Logging;
import org.apache.kyuubi.Utils$;
import org.apache.kyuubi.config.KyuubiConf$;
import org.apache.kyuubi.session.Session;
import org.apache.kyuubi.shade.io.vertx.core.cli.UsageMessageFormatter;
import org.apache.kyuubi.shade.org.apache.thrift.transport.TFastFramedTransport;
import org.apache.spark.SparkFiles$;
import org.apache.spark.api.python.KyuubiPythonGatewayServer$;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Iterable$;
import scala.io.Codec$;
import scala.io.Source$;
import scala.reflect.ClassTag$;
import scala.reflect.Manifest;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.sys.package$;
import scala.util.matching.Regex;

/* compiled from: ExecutePython.scala */
/* loaded from: input_file:org/apache/kyuubi/engine/spark/operation/ExecutePython$.class */
public final class ExecutePython$ implements Logging {
    /** Singleton instance of this object; assigned by the private constructor, which the static initializer runs. */
    public static ExecutePython$ MODULE$;
    /** Regex applied to individual PYTHONPATH entries to locate the py4j zip (see createSessionPythonWorker). */
    private final Regex PY4J_REGEX;
    /** Set to true once init() has started the gateway server and written the helper scripts. */
    private final AtomicBoolean isPythonGatewayStart;
    /** Temporary directory (created in the constructor) that receives the bundled Python scripts. */
    private final Path kyuubiPythonPath;
    /** Jackson mapper with the Scala module registered; backs toJson / fromJson. */
    private final ObjectMapper mapper;
    /** Backing field for the Logging trait's logger accessor pair. */
    private transient Logger org$apache$kyuubi$Logging$$log_;

    static {
        // Constructing the instance is sufficient: the constructor stores it in MODULE$.
        new ExecutePython$();
    }

    /**
     * Returns the logger name by forwarding to the {@code Logging} trait's static implementation.
     *
     * @return the name produced by {@code Logging.loggerName$}
     */
    @Override // org.apache.kyuubi.Logging
    public String loggerName() {
        final String name = Logging.loggerName$(this);
        return name;
    }

    /**
     * Returns the logger for this object by forwarding to the {@code Logging} trait's static implementation.
     *
     * @return the logger produced by {@code Logging.logger$}
     */
    @Override // org.apache.kyuubi.Logging
    public Logger logger() {
        final Logger resolved = Logging.logger$(this);
        return resolved;
    }

    /**
     * Forwards a message supplier to the {@code Logging} trait's {@code debug} implementation.
     *
     * @param function0 supplier of the message to log
     */
    @Override // org.apache.kyuubi.Logging
    public void debug(Function0<Object> function0) {
        final ExecutePython$ self = this;
        Logging.debug$(self, function0);
    }

    /**
     * Forwards a message supplier and a throwable to the {@code Logging} trait's {@code debug}
     * implementation.
     *
     * @param function0 supplier of the message to log
     * @param th throwable handed to the trait together with the message
     */
    @Override // org.apache.kyuubi.Logging
    public void debug(Function0<Object> function0, Throwable th) {
        // Delegate to the trait's static forwarder; calling debug(function0, th) here would recurse forever.
        Logging.debug$(this, function0, th);
    }

    /**
     * Forwards a message supplier to the {@code Logging} trait's {@code info} implementation.
     *
     * @param function0 supplier of the message to log
     */
    @Override // org.apache.kyuubi.Logging
    public void info(Function0<Object> function0) {
        // Delegate to the trait's static forwarder; calling info(function0) here would recurse forever.
        Logging.info$(this, function0);
    }

    /**
     * Forwards a message supplier and a throwable to the {@code Logging} trait's {@code info}
     * implementation.
     *
     * @param function0 supplier of the message to log
     * @param th throwable handed to the trait together with the message
     */
    @Override // org.apache.kyuubi.Logging
    public void info(Function0<Object> function0, Throwable th) {
        // Delegate to the trait's static forwarder; calling info(function0, th) here would recurse forever.
        Logging.info$(this, function0, th);
    }

    /**
     * Forwards a message supplier to the {@code Logging} trait's {@code warn} implementation.
     *
     * @param function0 supplier of the message to log
     */
    @Override // org.apache.kyuubi.Logging
    public void warn(Function0<Object> function0) {
        // Delegate to the trait's static forwarder; calling warn(function0) here would recurse forever.
        Logging.warn$(this, function0);
    }

    /**
     * Forwards a message supplier and a throwable to the {@code Logging} trait's {@code warn}
     * implementation.
     *
     * @param function0 supplier of the message to log
     * @param th throwable handed to the trait together with the message
     */
    @Override // org.apache.kyuubi.Logging
    public void warn(Function0<Object> function0, Throwable th) {
        // Delegate to the trait's static forwarder; calling warn(function0, th) here would recurse forever.
        Logging.warn$(this, function0, th);
    }

    /**
     * Forwards a message supplier and a throwable to the {@code Logging} trait's {@code error}
     * implementation.
     *
     * @param function0 supplier of the message to log
     * @param th throwable handed to the trait together with the message
     */
    @Override // org.apache.kyuubi.Logging
    public void error(Function0<Object> function0, Throwable th) {
        // Delegate to the trait's static forwarder; calling error(function0, th) here would recurse forever.
        Logging.error$(this, function0, th);
    }

    /**
     * Forwards a message supplier to the {@code Logging} trait's {@code error} implementation.
     *
     * @param function0 supplier of the message to log
     */
    @Override // org.apache.kyuubi.Logging
    public void error(Function0<Object> function0) {
        // Delegate to the trait's static forwarder; calling error(function0) here would recurse forever.
        Logging.error$(this, function0);
    }

    /**
     * Forwards to the {@code Logging} trait's {@code initializeLoggerIfNecessary} implementation.
     *
     * @param z flag passed through unchanged to the trait implementation
     */
    @Override // org.apache.kyuubi.Logging
    public void initializeLoggerIfNecessary(boolean z) {
        // Delegate to the trait's static forwarder; calling initializeLoggerIfNecessary(z) here would recurse forever.
        Logging.initializeLoggerIfNecessary$(this, z);
    }

    /**
     * Accessor for the logger field that backs the {@code Logging} trait.
     *
     * @return the currently stored logger, possibly {@code null} if none has been set
     */
    @Override // org.apache.kyuubi.Logging
    public Logger org$apache$kyuubi$Logging$$log_() {
        return org$apache$kyuubi$Logging$$log_;
    }

    /**
     * Mutator for the logger field that backs the {@code Logging} trait.
     *
     * @param logger the logger to store
     */
    @Override // org.apache.kyuubi.Logging
    public void org$apache$kyuubi$Logging$$log__$eq(Logger logger) {
        org$apache$kyuubi$Logging$$log_ = logger;
    }

    /**
     * URI fragment applied to the Spark Python home archive when its URI carries none.
     *
     * @return the constant {@code "__kyuubi_spark_python_home__"}
     */
    public final String DEFAULT_SPARK_PYTHON_HOME_ARCHIVE_FRAGMENT() {
        final String fragment = "__kyuubi_spark_python_home__";
        return fragment;
    }

    /**
     * URI fragment applied to the Python environment archive when its URI carries none.
     *
     * @return the constant {@code "__kyuubi_spark_python_env__"}
     */
    public final String DEFAULT_SPARK_PYTHON_ENV_ARCHIVE_FRAGMENT() {
        final String fragment = "__kyuubi_spark_python_env__";
        return fragment;
    }

    /**
     * Returns the regex used to recognise a py4j zip entry on the Python path.
     *
     * @return the regex instance built in the constructor
     */
    public final Regex PY4J_REGEX() {
        return PY4J_REGEX;
    }

    /**
     * Name of the environment variable that receives the py4j zip location for the worker.
     *
     * @return the constant {@code "PY4J_PATH"}
     */
    public final String PY4J_PATH() {
        final String envName = "PY4J_PATH";
        return envName;
    }

    /**
     * Spark configuration key that flags a Python application.
     *
     * @return the constant {@code "spark.yarn.isPython"}
     */
    public final String IS_PYTHON_APP_KEY() {
        final String key = "spark.yarn.isPython";
        return key;
    }

    /**
     * Accessor for the flag recording whether {@code init()} has completed.
     *
     * @return the shared {@link AtomicBoolean} flag
     */
    private AtomicBoolean isPythonGatewayStart() {
        return isPythonGatewayStart;
    }

    /**
     * Accessor for the temporary directory that holds the bundled Python scripts.
     *
     * @return the directory path created in the constructor
     */
    private Path kyuubiPythonPath() {
        return kyuubiPythonPath;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v15, types: [java.util.concurrent.atomic.AtomicBoolean] */
    /* JADX WARN: Type inference failed for: r0v3 */
    /* JADX WARN: Type inference failed for: r0v4, types: [java.lang.Throwable] */
    public void init() {
        if (isPythonGatewayStart().get()) {
            return;
        }
        ?? r0 = this;
        synchronized (r0) {
            if (!isPythonGatewayStart().get()) {
                KyuubiPythonGatewayServer$.MODULE$.start();
                writeTempPyFile(kyuubiPythonPath(), "execute_python.py");
                writeTempPyFile(kyuubiPythonPath(), "kyuubi_util.py");
                r0 = isPythonGatewayStart();
                r0.set(true);
            }
        }
    }

    /**
     * Launches the Python worker process for a session and starts its stderr-reader and watcher
     * threads.
     *
     * <p>The interpreter is the first non-blank value among the Spark settings
     * {@code spark.pyspark.driver.python} and {@code spark.pyspark.python}, the environment variables
     * {@code PYSPARK_DRIVER_PYTHON} and {@code PYSPARK_PYTHON}, and the executable resolved from the
     * Python environment archive, with {@code python3} as the last resort. The process runs
     * {@code execute_python.py} from the temporary Python directory.
     *
     * @param sparkSession Spark session whose configuration and context are consulted
     * @param session Kyuubi session; its handle identifier is exported to the worker
     * @return a worker wrapping the stderr-reader thread, the watcher thread and the process
     */
    public SessionPythonWorker createSessionPythonWorker(SparkSession sparkSession, Session session) {
        String uuid = session.handle().identifier().toString();
        ProcessBuilder processBuilder = new ProcessBuilder((List<String>) JavaConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon((String) StringUtils.firstNonBlank(new String[]{(String) sparkSession.conf().getOption("spark.pyspark.driver.python").orNull(Predef$.MODULE$.$conforms()), (String) sparkSession.conf().getOption("spark.pyspark.python").orNull(Predef$.MODULE$.$conforms()), System.getenv("PYSPARK_DRIVER_PYTHON"), System.getenv("PYSPARK_PYTHON"), (String) getSparkPythonExecFromArchive(sparkSession, session).getOrElse(() -> {
            return "python3";
        })}), new $colon.colon(new StringBuilder(18).append(kyuubiPythonPath()).append("/execute_python.py").toString(), Nil$.MODULE$))).asJava());
        Map<String, String> environment = processBuilder.environment();

        // Build PYTHONPATH as "<inherited entries><sep><kyuubi python dir>". The directory must be
        // appended as ONE entry: concatenating the path string onto the entry array element-wise
        // would add every character of the path as its own PYTHONPATH entry.
        String inheritedPythonPath = (String) package$.MODULE$.env().getOrElse("PYTHONPATH", () -> {
            return "";
        });
        StringBuilder pythonPathBuilder = new StringBuilder();
        for (String entry : inheritedPythonPath.split(File.pathSeparator)) {
            // Skip blank entries so an unset PYTHONPATH does not yield a leading separator.
            if (!entry.isEmpty()) {
                pythonPathBuilder.append(entry).append(File.pathSeparator);
            }
        }
        pythonPathBuilder.append(kyuubiPythonPath().toString());
        String pythonPath = pythonPathBuilder.toString();
        environment.put("PYTHONPATH", pythonPath);

        // Export the first PYTHONPATH entry that looks like a py4j zip, if there is one.
        for (String entry : pythonPath.split(File.pathSeparator)) {
            if ($anonfun$createSessionPythonWorker$3(entry)) {
                environment.put("PY4J_PATH", entry);
                break;
            }
        }

        // SPARK_HOME is only set when the application is not flagged as a Python app.
        if (!sparkSession.sparkContext().getConf().getBoolean("spark.yarn.isPython", false)) {
            environment.put("SPARK_HOME", package$.MODULE$.env().getOrElse("SPARK_HOME", () -> {
                return (String) MODULE$.getSparkPythonHomeFromArchive(sparkSession, session).getOrElse(() -> {
                    return MODULE$.defaultSparkHome();
                });
            }));
        }
        environment.put("KYUUBI_SPARK_SESSION_UUID", uuid);
        environment.put("PYTHON_GATEWAY_CONNECTION_INFO", KyuubiPythonGatewayServer$.MODULE$.CONNECTION_FILE_PATH());
        logger().info(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(86).append("\n         |launch python worker command: ").append(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(processBuilder.command()).asScala()).mkString(UsageMessageFormatter.DEFAULT_LONG_OPT_SEPARATOR)).append("\n         |environment:\n         |").append(((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(processBuilder.environment()).asScala()).map(tuple2 -> {
            return new StringBuilder(1).append((String) tuple2._1()).append("=").append(tuple2._2()).toString();
        }, Iterable$.MODULE$.canBuildFrom())).mkString("\n")).append("\n         |").toString())).stripMargin());
        // stderr is piped so the reader thread started below can forward it to the log.
        processBuilder.redirectError(ProcessBuilder.Redirect.PIPE);
        Process start = processBuilder.start();
        return new SessionPythonWorker(startStderrSteamReader(start, uuid), startWatcher(start, uuid), start);
    }

    /**
     * Resolves the Python executable shipped inside the configured Python environment archive.
     *
     * <p>The archive URI and the executable's relative path are read from the Spark session
     * configuration, falling back to the session manager's configuration. When an archive is
     * configured it is registered with the Spark context (a default fragment is added if the URI has
     * none), and the executable path is resolved under the corresponding {@code SparkFiles} location.
     *
     * @param sparkSession Spark session providing configuration and the Spark context
     * @param session Kyuubi session providing the fallback configuration
     * @return the canonical path of the executable if an archive is configured and the resolved path
     *     exists, otherwise an empty option
     */
    public Option<String> getSparkPythonExecFromArchive(SparkSession sparkSession, Session session) {
        Option archiveUri = sparkSession.conf().getOption(KyuubiConf$.MODULE$.ENGINE_SPARK_PYTHON_ENV_ARCHIVE().key()).orElse(() -> {
            return (Option) session.sessionManager().getConf().get(KyuubiConf$.MODULE$.ENGINE_SPARK_PYTHON_ENV_ARCHIVE());
        });
        String execRelativePath = (String) sparkSession.conf().getOption(KyuubiConf$.MODULE$.ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH().key()).getOrElse(() -> {
            return (String) session.sessionManager().getConf().get(KyuubiConf$.MODULE$.ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH());
        });
        Option execCandidate = archiveUri.map(rawUri -> {
            URI parsed = new URI(rawUri);
            // Without a fragment the archive gets the default unpack directory name.
            URI archive = parsed.getFragment() != null
                    ? parsed
                    : UriBuilder.fromUri(parsed).fragment("__kyuubi_spark_python_env__").build(new Object[0]);
            sparkSession.sparkContext().addArchive(archive.toString());
            return Paths.get(SparkFiles$.MODULE$.get(archive.getFragment()), execRelativePath);
        });
        return Option$.MODULE$.option2Iterable(execCandidate).find(candidate -> {
            return BoxesRunTime.boxToBoolean($anonfun$getSparkPythonExecFromArchive$4(candidate));
        }).map(existing -> {
            return existing.toAbsolutePath().toFile().getCanonicalPath();
        });
    }

    /**
     * Resolves the Spark Python home directory shipped inside the configured archive.
     *
     * <p>The archive URI is read from the Spark session configuration, falling back to the session
     * manager's configuration. When an archive is configured it is registered with the Spark context
     * (a default fragment is added if the URI has none) and looked up through {@code SparkFiles}.
     *
     * @param sparkSession Spark session providing configuration and the Spark context
     * @param session Kyuubi session providing the fallback configuration
     * @return the canonical path of the directory if an archive is configured and the resolved path
     *     exists, otherwise an empty option
     */
    public Option<String> getSparkPythonHomeFromArchive(SparkSession sparkSession, Session session) {
        Option archiveUri = sparkSession.conf().getOption(KyuubiConf$.MODULE$.ENGINE_SPARK_PYTHON_HOME_ARCHIVE().key()).orElse(() -> {
            return (Option) session.sessionManager().getConf().get(KyuubiConf$.MODULE$.ENGINE_SPARK_PYTHON_HOME_ARCHIVE());
        });
        Option homeCandidate = archiveUri.map(rawUri -> {
            URI parsed = new URI(rawUri);
            // Without a fragment the archive gets the default unpack directory name.
            URI archive = parsed.getFragment() != null
                    ? parsed
                    : UriBuilder.fromUri(parsed).fragment("__kyuubi_spark_python_home__").build(new Object[0]);
            sparkSession.sparkContext().addArchive(archive.toString());
            return Paths.get(SparkFiles$.MODULE$.get(archive.getFragment()), new String[0]);
        });
        return Option$.MODULE$.option2Iterable(homeCandidate).find(candidate -> {
            return BoxesRunTime.boxToBoolean($anonfun$getSparkPythonHomeFromArchive$3(candidate));
        }).map(existing -> {
            return existing.toAbsolutePath().toFile().getCanonicalPath();
        });
    }

    /**
     * Locates a fallback Spark home relative to the location this class was loaded from.
     *
     * <p>The code-source path of this class is split on {@code "kyuubi-spark-sql-engine"}; for each
     * resulting piece, {@code <piece>/kyuubi-download/target} is listed and the first accepted entry
     * is kept. The first kept path that exists is returned in canonical form.
     *
     * @return canonical path of the first matching directory entry that exists
     * @throws IllegalStateException with message {@code "SPARK_HOME not found!"} when no candidate exists
     */
    public String defaultSparkHome() {
        // Accepts names containing "spark-" but not "-engine". Note that isDirectory() is evaluated
        // on the filter's first argument, which FilenameFilter defines as the directory being
        // listed, not on the entry itself.
        FilenameFilter filenameFilter = (file, str) -> {
            return file.isDirectory() && str.contains("spark-") && !str.contains("-engine");
        };
        // listFiles returns null when the directory cannot be listed; such pieces contribute no candidate.
        return (String) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(new File(getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getPath().split("kyuubi-spark-sql-engine"))).flatMap(str2 -> {
            File[] listFiles = Paths.get(str2, "kyuubi-download", "target").toFile().listFiles(filenameFilter);
            return listFiles == null ? Option$.MODULE$.option2Iterable(None$.MODULE$) : Option$.MODULE$.option2Iterable(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(listFiles)).map(file2 -> {
                return file2.toPath();
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Path.class))))).headOption());
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Path.class))))).find(path -> {
            return BoxesRunTime.boxToBoolean($anonfun$defaultSparkHome$4(path));
        }).map(path2 -> {
            return path2.toAbsolutePath().toFile().getCanonicalPath();
        }).getOrElse(() -> {
            throw new IllegalStateException("SPARK_HOME not found!");
        });
    }

    /**
     * Starts a daemon thread that reads the process's standard error line by line and logs every
     * non-blank line at error level through this object's logger.
     *
     * @param process the process whose error stream is consumed
     * @param str session identifier, used only to build the thread name
     * @return the started daemon thread
     */
    private Thread startStderrSteamReader(final Process process, final String str) {
        final String threadName = new StringBuilder(31).append("session[").append(str).append("] process stderr thread").toString();
        // The thread name goes to the Thread(String) constructor; the process is captured by the
        // anonymous class instead of being passed as a constructor argument.
        Thread thread = new Thread(threadName) { // from class: org.apache.kyuubi.engine.spark.operation.ExecutePython$$anon$2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                // Lines that are empty after trimming are dropped.
                Iterator filter = Source$.MODULE$.fromInputStream(process.getErrorStream(), Codec$.MODULE$.fallbackSystemCodec()).getLines().filter(line -> {
                    return BoxesRunTime.boxToBoolean(!line.trim().isEmpty());
                });
                Logger logger = ExecutePython$.MODULE$.logger();
                filter.foreach(str3 -> {
                    logger.error(str3);
                    return BoxedUnit.UNIT;
                });
            }
        };
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    /**
     * Starts a daemon thread that waits for the process to exit and logs an error when the exit
     * code is non-zero.
     *
     * @param process the process to wait on
     * @param str session identifier, used only to build the thread name
     * @return the started daemon thread
     */
    public Thread startWatcher(final Process process, final String str) {
        final String threadName = new StringBuilder(32).append("session[").append(str).append("] process watcher thread").toString();
        // The thread name goes to the Thread(String) constructor; the process is captured by the
        // anonymous class instead of being passed as a constructor argument.
        Thread thread = new Thread(threadName) { // from class: org.apache.kyuubi.engine.spark.operation.ExecutePython$$anon$3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    int exitCode = process.waitFor();
                    if (exitCode != 0) {
                        ExecutePython$.MODULE$.logger().error(String.format("Process has died with %s", exitCode));
                    }
                } catch (InterruptedException interrupted) {
                    ExecutePython$.MODULE$.logger().warn("Process has been interrupted");
                    // Restore the interrupt status so it is not silently lost.
                    Thread.currentThread().interrupt();
                }
            }
        };
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    /**
     * Copies the classpath resource {@code python/<str>} into a file named {@code str} inside the
     * given directory and registers that file for deletion on JVM exit.
     *
     * @param path directory that receives the file
     * @param str resource file name, also used as the target file name
     * @return the file that was written
     * @throws NullPointerException if the resource is not found on the classpath
     */
    private File writeTempPyFile(Path path, String str) {
        String resourceName = new StringBuilder(7).append("python/").append(str).toString();
        InputStream resourceAsStream = getClass().getClassLoader().getResourceAsStream(resourceName);
        if (resourceAsStream == null) {
            // Fail before creating the target file; the exception type matches what the first
            // read() on a null stream would have thrown, but now with a useful message.
            throw new NullPointerException("Classpath resource not found: " + resourceName);
        }
        File file = new File(path.toFile(), str);
        file.deleteOnExit();
        // try-with-resources closes both streams even when a read or write fails.
        try (InputStream source = resourceAsStream;
                FileOutputStream sink = new FileOutputStream(file)) {
            byte[] buffer = new byte[TFastFramedTransport.DEFAULT_BUF_CAPACITY];
            int count = source.read(buffer);
            while (count > 0) {
                sink.write(buffer, 0, count);
                count = source.read(buffer);
            }
        }
        return file;
    }

    /**
     * Returns the shared Jackson mapper used for JSON conversion.
     *
     * @return the mapper created in the constructor
     */
    public ObjectMapper mapper() {
        return mapper;
    }

    /**
     * Serializes a value to a JSON string with the shared mapper.
     *
     * @param t the value to serialize
     * @param <T> type of the value
     * @return the JSON text produced by the mapper
     */
    public <T> String toJson(T t) {
        final ObjectMapper json = mapper();
        return json.writeValueAsString(t);
    }

    /**
     * Deserializes a JSON string into an instance of the given class with the shared mapper.
     *
     * @param str the JSON text
     * @param cls the target class
     * @param <T> the target type
     * @return the deserialized value
     */
    public <T> T fromJson(String str, Class<T> cls) {
        final ObjectMapper json = mapper();
        final T parsed = json.readValue(str, cls);
        return parsed;
    }

    /**
     * Deserializes a JSON string into the runtime class described by a Scala manifest.
     *
     * @param str the JSON text
     * @param manifest manifest whose runtime class is the deserialization target
     * @param <T> the target type
     * @return the deserialized value
     */
    public <T> T fromJson(String str, Manifest<T> manifest) {
        final ObjectMapper json = mapper();
        final Class<?> target = manifest.runtimeClass();
        return (T) json.readValue(str, target);
    }

    /**
     * Lambda body: tells whether a path entry matches the py4j regex.
     *
     * @param str a single path entry
     * @return {@code true} when the regex finds a match in the entry
     */
    public static final /* synthetic */ boolean $anonfun$createSessionPythonWorker$3(String str) {
        final Regex py4jPattern = MODULE$.PY4J_REGEX();
        return py4jPattern.findFirstMatchIn(str).nonEmpty();
    }

    /**
     * Lambda body: tells whether the candidate executable path exists (symbolic links are followed).
     *
     * @param path the path to probe
     * @return {@code true} when the path exists
     */
    public static final /* synthetic */ boolean $anonfun$getSparkPythonExecFromArchive$4(Path path) {
        // No LinkOption is supplied, which is equivalent to passing an empty option array.
        return Files.exists(path);
    }

    /**
     * Lambda body: tells whether the candidate Python home path exists (symbolic links are followed).
     *
     * @param path the path to probe
     * @return {@code true} when the path exists
     */
    public static final /* synthetic */ boolean $anonfun$getSparkPythonHomeFromArchive$3(Path path) {
        // No LinkOption is supplied, which is equivalent to passing an empty option array.
        return Files.exists(path);
    }

    /**
     * Lambda body: tells whether the candidate Spark home path exists (symbolic links are followed).
     *
     * @param path the path to probe
     * @return {@code true} when the path exists
     */
    public static final /* synthetic */ boolean $anonfun$defaultSparkHome$4(Path path) {
        // No LinkOption is supplied, which is equivalent to passing an empty option array.
        return Files.exists(path);
    }

    /**
     * Builds the singleton: publishes the instance through {@code MODULE$}, runs the Logging trait
     * initializer, then initializes the regex, the init flag, the temporary Python directory and the
     * JSON mapper. Only the static initializer calls this constructor.
     */
    private ExecutePython$() {
        // MODULE$ is published first, before the remaining fields are assigned.
        MODULE$ = this;
        Logging.$init$(this);
        // NOTE(review): the '.' before "zip" is unescaped, so it matches any character - confirm whether "\\." was intended.
        this.PY4J_REGEX = new StringOps(Predef$.MODULE$.augmentString("py4j-[\\S]*.zip$")).r();
        this.isPythonGatewayStart = new AtomicBoolean(false);
        // Temp directory created with createTempDir's default arguments.
        this.kyuubiPythonPath = Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), Utils$.MODULE$.createTempDir$default$2());
        this.mapper = new ObjectMapper().registerModule(DefaultScalaModule$.MODULE$);
    }
}
