package org.apache.flink.streaming.api.operators.python.embedded;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.env.AbstractPythonEnvironmentManager;
import org.apache.flink.python.env.PythonDependencyInfo;
import org.apache.flink.python.env.embedded.EmbeddedPythonEnvironment;
import org.apache.flink.python.env.embedded.EmbeddedPythonEnvironmentManager;
import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
import pemja.core.PythonInterpreter;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/python/embedded/AbstractEmbeddedPythonFunctionOperator.class */
public abstract class AbstractEmbeddedPythonFunctionOperator<OUT> extends AbstractPythonFunctionOperator<OUT> {
    private static final long serialVersionUID = 1;
    private static final ReentrantLock lock = new ReentrantLock();
    private static final Map<JobID, Tuple2<String, Integer>> workingDirectories = new HashMap();
    private transient EmbeddedPythonEnvironmentManager pythonEnvironmentManager;
    protected transient PythonInterpreter interpreter;

    public AbstractEmbeddedPythonFunctionOperator(Configuration configuration) {
        super(configuration);
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        Tuple2<String, Integer> of;
        super.open();
        this.pythonEnvironmentManager = createPythonEnvironmentManager();
        this.pythonEnvironmentManager.open();
        EmbeddedPythonEnvironment embeddedPythonEnvironment = (EmbeddedPythonEnvironment) this.pythonEnvironmentManager.createEnvironment();
        this.interpreter = new PythonInterpreter(embeddedPythonEnvironment.getConfig());
        Map<String, String> env = embeddedPythonEnvironment.getEnv();
        if (env.containsKey(AbstractPythonEnvironmentManager.PYTHON_WORKING_DIR)) {
            lock.lockInterruptibly();
            try {
                JobID jobId = getRuntimeContext().getJobInfo().getJobId();
                if (workingDirectories.containsKey(jobId)) {
                    of = workingDirectories.get(jobId);
                } else {
                    of = Tuple2.of((Object) null, 0);
                    workingDirectories.put(jobId, of);
                }
                Tuple2<String, Integer> tuple2 = of;
                tuple2.f1 = Integer.valueOf(((Integer) tuple2.f1).intValue() + 1);
                if (of.f0 == null) {
                    this.interpreter.exec("import os;cwd = os.getcwd();");
                    of.f0 = this.interpreter.get("cwd", String.class);
                    this.interpreter.exec(String.format("import os;os.chdir('%s')", env.get(AbstractPythonEnvironmentManager.PYTHON_WORKING_DIR)));
                }
                lock.unlock();
            } catch (Throwable th) {
                lock.unlock();
                throw th;
            }
        }
        openPythonInterpreter();
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void close() throws Exception {
        try {
            JobID jobId = getRuntimeContext().getJobInfo().getJobId();
            if (workingDirectories.containsKey(jobId)) {
                lock.lockInterruptibly();
                try {
                    Tuple2<String, Integer> tuple2 = workingDirectories.get(jobId);
                    tuple2.f1 = Integer.valueOf(((Integer) tuple2.f1).intValue() - 1);
                    if (((Integer) tuple2.f1).intValue() == 0) {
                        this.interpreter.exec(String.format("import os;os.chdir('%s')", tuple2.f0));
                    }
                    lock.unlock();
                } catch (Throwable th) {
                    lock.unlock();
                    throw th;
                }
            }
            if (this.interpreter != null) {
                this.interpreter.close();
            }
            if (this.pythonEnvironmentManager != null) {
                this.pythonEnvironmentManager.close();
            }
        } finally {
            super.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public EmbeddedPythonEnvironmentManager createPythonEnvironmentManager() {
        return new EmbeddedPythonEnvironmentManager(PythonDependencyInfo.create(this.config, getRuntimeContext().getDistributedCache()), getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(), this.systemEnvEnabled ? new HashMap(System.getenv()) : new HashMap(), getRuntimeContext().getJobInfo().getJobId());
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    protected void invokeFinishBundle() throws Exception {
    }

    public abstract void openPythonInterpreter();
}
