package org.apache.flink.client.python;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.python.shaded.py4j.GatewayServer;
import org.apache.flink.api.python.shaded.py4j.Py4JJavaServer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.functions.python.PythonFunction;

/* loaded from: input_file:org/apache/flink/client/python/PythonFunctionFactory.class */
/**
 * Factory that resolves {@link PythonFunction} instances by name.
 *
 * <p>The static helpers on this interface obtain a factory instance from the entry point of a
 * Py4J gateway server. If no gateway server is registered in {@code PythonEnvUtils}, one is
 * started together with a Python process running the {@code pyflink.pyflink_callback_server}
 * module. The resolved factory is cached in {@link #PYTHON_FUNCTION_FACTORY_REF}.
 */
public interface PythonFunctionFactory {
    /** Pause, in milliseconds, between two checks for the factory in the gateway entry point. */
    public static final long CHECK_INTERVAL = 100;

    /**
     * Upper bound, in milliseconds, used both while waiting for the factory to appear in the
     * gateway entry point and while waiting for the Python process to exit on shutdown.
     */
    public static final long TIMEOUT_MILLIS = 10000;

    /** Cache for the factory resolved by {@link #getPythonFunctionFactory(ReadableConfig)}. */
    public static final AtomicReference<PythonFunctionFactory> PYTHON_FUNCTION_FACTORY_REF = new AtomicReference<>();

    /** JVM shutdown hook that stops the Python process it was created with. */
    public static class PythonProcessShutdownHook extends Thread {
        private final Process process;

        /**
         * @param process the process to stop when the hook runs
         */
        public PythonProcessShutdownHook(Process process) {
            this.process = process;
        }

        /** Stops the process, waiting at most {@link PythonFunctionFactory#TIMEOUT_MILLIS}. */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            PythonFunctionFactory.shutdownPythonProcess(this.process, PythonFunctionFactory.TIMEOUT_MILLIS);
        }
    }

    /**
     * Returns the Python function identified by the two given name parts.
     *
     * <p>The static overload {@link #getPythonFunction(String, ReadableConfig)} passes the part
     * of the fully qualified name before the last dot as {@code str} and the part after it as
     * {@code str2}.
     *
     * @param str the qualifier part of the name (everything before the last dot)
     * @param str2 the simple name (everything after the last dot)
     * @return the resolved Python function
     */
    PythonFunction getPythonFunction(String str, String str2);

    /**
     * Resolves a Python function from its fully qualified name.
     *
     * <p>The name is split at its last dot. The given configuration is layered on top of a copy
     * of the configuration of the current {@link ExecutionEnvironment} before the factory is
     * looked up.
     *
     * @param str fully qualified name; must contain a dot that is not its first character
     * @param readableConfig configuration to merge in; it is cast to {@link Configuration}, so
     *     any other implementation fails with a {@link ClassCastException}
     * @return the resolved Python function
     * @throws IllegalArgumentException if {@code str} has no dot or starts with its only dot
     * @throws IOException declared by the factory lookup
     * @throws ExecutionException declared by the factory lookup
     * @throws InterruptedException declared by the factory lookup
     */
    static PythonFunction getPythonFunction(String str, ReadableConfig readableConfig) throws IOException, ExecutionException, InterruptedException {
        int splitIndex = str.lastIndexOf(".");
        if (splitIndex <= 0) {
            throw new IllegalArgumentException(String.format("The fully qualified name is invalid: '%s'", str));
        }
        String qualifier = str.substring(0, splitIndex);
        String simpleName = str.substring(splitIndex + 1);
        Configuration mergedConfig = new Configuration(ExecutionEnvironment.getExecutionEnvironment().getConfiguration());
        mergedConfig.addAll((Configuration) readableConfig);
        return getPythonFunctionFactory(mergedConfig).getPythonFunction(qualifier, simpleName);
    }

    /**
     * Returns the cached factory, resolving it first if necessary.
     *
     * <p>Resolution runs while holding the monitor of {@code PythonFunctionFactory.class}. If
     * {@code PythonEnvUtils} has no gateway server, a new one is started and registered, the
     * Python callback server module is launched, and the gateway entry point is polled every
     * {@link #CHECK_INTERVAL} milliseconds until it contains the {@code "PythonFunctionFactory"}
     * key. A shutdown hook is then registered for the launched process. Otherwise the entry point
     * of the already registered gateway server is used directly.
     *
     * <p>If anything fails during start-up, the gateway server registration is cleared, the
     * gateway server is shut down, the launched process (if any) is stopped, and the original
     * failure is rethrown.
     *
     * @param readableConfig configuration handed to the Python client launcher
     * @return the factory stored under {@code "PythonFunctionFactory"} in the entry point
     * @throws RuntimeException if the Python process dies, the factory does not appear within
     *     the polling budget, or the waiting thread is interrupted
     * @throws ExecutionException propagated from start-up
     * @throws InterruptedException propagated from start-up
     * @throws IOException propagated from start-up
     */
    static PythonFunctionFactory getPythonFunctionFactory(ReadableConfig readableConfig) throws ExecutionException, InterruptedException, IOException {
        synchronized (PythonFunctionFactory.class) {
            PythonFunctionFactory cached = PYTHON_FUNCTION_FACTORY_REF.get();
            if (cached != null) {
                return cached;
            }
            Map<?, ?> entryPoint;
            if (PythonEnvUtils.getGatewayServer() == null) {
                // Declared outside the try block so the failure path can release them.
                GatewayServer gatewayServer = null;
                Process pythonProcess = null;
                try {
                    gatewayServer = PythonEnvUtils.startGatewayServer();
                    PythonEnvUtils.setGatewayServer(gatewayServer);
                    ArrayList<String> commands = new ArrayList<>();
                    commands.add("-m");
                    commands.add("pyflink.pyflink_callback_server");
                    String tmpDir = System.getProperty("java.io.tmpdir") + File.separator + "pyflink" + File.separator + UUID.randomUUID();
                    pythonProcess = PythonEnvUtils.launchPy4jPythonClient(gatewayServer, readableConfig, commands, null, tmpDir, false);
                    entryPoint = (Map<?, ?>) gatewayServer.getGateway().getEntryPoint();
                    long maxChecks = TIMEOUT_MILLIS / CHECK_INTERVAL;
                    int checks = 0;
                    while (!entryPoint.containsKey("PythonFunctionFactory")) {
                        if (!pythonProcess.isAlive()) {
                            throw new RuntimeException("Python callback server start failed!");
                        }
                        try {
                            Thread.sleep(CHECK_INTERVAL);
                        } catch (InterruptedException e) {
                            // Keep the interrupt visible to callers further up the stack.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException("Interrupted while waiting for the python process to start.", e);
                        }
                        checks++;
                        if (checks > maxChecks) {
                            throw new RuntimeException("Python callback server start failed!");
                        }
                    }
                    Runtime.getRuntime().addShutdownHook(new PythonProcessShutdownHook(pythonProcess));
                } catch (Throwable failure) {
                    // Best-effort cleanup; it must never replace the original failure.
                    try {
                        PythonEnvUtils.setGatewayServer(null);
                        if (gatewayServer != null) {
                            gatewayServer.shutdown();
                        }
                    } catch (Throwable ignored) {
                        // Intentionally ignored so the original failure is the one reported.
                    }
                    if (pythonProcess != null) {
                        try {
                            shutdownPythonProcess(pythonProcess, TIMEOUT_MILLIS);
                        } catch (Throwable ignored) {
                            // Intentionally ignored so the original failure is the one reported.
                        }
                    }
                    throw failure;
                }
            } else {
                entryPoint = (Map<?, ?>) PythonEnvUtils.getGatewayServer().getGateway().getEntryPoint();
            }
            PythonFunctionFactory factory = (PythonFunctionFactory) entryPoint.get("PythonFunctionFactory");
            PYTHON_FUNCTION_FACTORY_REF.set(factory);
            return factory;
        }
    }

    /**
     * Stops the given process, escalating to a forcible kill if it does not exit in time.
     *
     * <p>{@link Process#destroy()} is requested first and the process is given up to {@code j}
     * milliseconds to exit. If it is still alive afterwards, including when the wait was cut
     * short by an interrupt, {@link Process#destroyForcibly()} is called.
     *
     * @param process the process to stop
     * @param j maximum time to wait for the process to exit, in milliseconds
     * @throws RuntimeException if the calling thread is interrupted while waiting; the thread's
     *     interrupt flag is restored before the exception is thrown
     */
    static void shutdownPythonProcess(Process process, long j) {
        process.destroy();
        try {
            process.waitFor(j, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupt while waiting for the python process to stop.", e);
        } finally {
            // Runs on timeout and on interrupt alike, so the process is never left behind.
            if (process.isAlive()) {
                process.destroyForcibly();
            }
        }
    }
}
