package org.apache.flink.streaming.api.operators.python.process;

import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.env.PythonDependencyInfo;
import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
import org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner;
import org.apache.flink.table.functions.python.PythonEnv;

/**
 * Base class for Python function operators that delegate execution to an external
 * {@link PythonFunctionRunner}.
 *
 * <p>The operator owns two resources that are created in {@link #open()} and released in
 * {@link #close()}: the runner returned by {@link #createPythonFunctionRunner()} and a
 * single-threaded executor used to flush the runner while the calling thread collects results.
 *
 * @param <OUT> the output type, passed through to {@link AbstractPythonFunctionOperator}
 */
@Internal
public abstract class AbstractExternalPythonFunctionOperator<OUT> extends AbstractPythonFunctionOperator<OUT> {
    /** Runner created in {@link #open()}; reset to {@code null} once it has been closed successfully. */
    protected transient PythonFunctionRunner pythonFunctionRunner;

    /** Single-threaded executor on which {@link #invokeFinishBundle()} runs the runner's flush. */
    private transient ExecutorService flushThreadPool;

    /**
     * Creates the operator.
     *
     * @param configuration configuration handed to the super class
     */
    public AbstractExternalPythonFunctionOperator(Configuration configuration) {
        super(configuration);
    }

    /**
     * Opens the super class, then creates and opens the Python function runner, and finally
     * creates the flush executor.
     *
     * @throws Exception if the super class or the runner fails to open
     */
    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        super.open();
        this.pythonFunctionRunner = createPythonFunctionRunner();
        this.pythonFunctionRunner.open(this.config);
        this.flushThreadPool = Executors.newSingleThreadExecutor();
    }

    /**
     * Closes the runner, shuts down the flush executor and closes the super class.
     *
     * <p>Each step runs even if an earlier one throws, so a failing runner cannot leave the
     * flush thread alive or skip the super class cleanup.
     *
     * @throws Exception if closing the runner or the super class fails
     */
    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void close() throws Exception {
        try {
            if (this.pythonFunctionRunner != null) {
                this.pythonFunctionRunner.close();
                // Cleared only after a successful close, as before.
                this.pythonFunctionRunner = null;
            }
        } finally {
            try {
                if (this.flushThreadPool != null) {
                    this.flushThreadPool.shutdown();
                    this.flushThreadPool = null;
                }
            } finally {
                super.close();
            }
        }
    }

    /**
     * Flushes the current bundle and emits every result the runner produces for it.
     *
     * <p>Does nothing when {@code elementCount} is not positive. Otherwise the flush runs on the
     * flush executor while the calling thread keeps taking and emitting results until the flush
     * task reports completion. A failure of the flush is rethrown on the calling thread wrapped
     * in a {@link RuntimeException}. On success the element counter is reset, the bundle
     * timestamp is updated and a pending {@code bundleFinishedCallback} is run once.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally {@code protected};
     * the modifier is kept as it appears in this source.
     *
     * @throws Exception if emitting a result fails or the flush failed
     */
    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void invokeFinishBundle() throws Exception {
        if (this.elementCount <= 0) {
            return;
        }

        final AtomicBoolean flushFinished = new AtomicBoolean(false);
        final AtomicReference<Throwable> flushFailure = new AtomicReference<>();
        this.flushThreadPool.submit(() -> {
            try {
                this.pythonFunctionRunner.flush();
            } catch (Throwable t) {
                // Handed over to the calling thread, which rethrows it after draining results.
                flushFailure.set(t);
            } finally {
                flushFinished.set(true);
                // Presumably wakes up a takeResult() call that is waiting on the calling
                // thread so the loop below can observe the flag — confirm against
                // BeamPythonFunctionRunner.
                ((BeamPythonFunctionRunner) this.pythonFunctionRunner).notifyNoMoreResults();
            }
        });

        while (!flushFinished.get()) {
            Tuple3<String, byte[], Integer> result = this.pythonFunctionRunner.takeResult();
            // Results whose third field is 0 are not emitted.
            if (result.f2 != 0) {
                emitResult(result);
                emitResults();
            }
        }
        // Pick up anything that arrived between the last take and the end of the flush.
        emitResults();

        final Throwable failure = flushFailure.get();
        if (failure != null) {
            throw new RuntimeException("Error while waiting for BeamPythonFunctionRunner flush", failure);
        }

        this.elementCount = 0;
        this.lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
        // The callback is one-shot: it is cleared right after it has run.
        if (this.bundleFinishedCallback != null) {
            this.bundleFinishedCallback.run();
            this.bundleFinishedCallback = null;
        }
    }

    /**
     * Creates the environment manager for a process-based Python environment.
     *
     * <p>The manager receives the Python dependency info built from the operator configuration
     * and the distributed cache, the task manager's temporary directories, a copy of the system
     * environment (or an empty map when {@code systemEnvEnabled} is {@code false}) and the job id.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally {@code protected};
     * the modifier is kept as it appears in this source.
     *
     * @return a new {@link ProcessPythonEnvironmentManager}
     * @throws UnsupportedOperationException if the execution type of {@link #getPythonEnv()} is
     *     not {@link PythonEnv.ExecType#PROCESS}
     */
    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public ProcessPythonEnvironmentManager createPythonEnvironmentManager() {
        PythonDependencyInfo dependencyInfo =
                PythonDependencyInfo.create(this.config, getRuntimeContext().getDistributedCache());
        PythonEnv pythonEnv = getPythonEnv();
        if (pythonEnv.getExecType() == PythonEnv.ExecType.PROCESS) {
            return new ProcessPythonEnvironmentManager(
                    dependencyInfo,
                    getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
                    this.systemEnvEnabled
                            ? new HashMap<String, String>(System.getenv())
                            : new HashMap<String, String>(),
                    getRuntimeContext().getJobInfo().getJobId());
        }
        throw new UnsupportedOperationException(
                String.format("Execution type '%s' is not supported.", pythonEnv.getExecType()));
    }

    /** Delegates to {@link PythonFunctionRunner#drainUnregisteredTimers()} of the current runner. */
    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    protected void drainUnregisteredTimers() {
        this.pythonFunctionRunner.drainUnregisteredTimers();
    }

    /**
     * Emits results polled from the runner, one at a time, without waiting for new ones.
     *
     * <p>Stops as soon as the runner returns {@code null} or a result whose third field is 0;
     * that result is not emitted.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally {@code protected};
     * the modifier is kept as it appears in this source.
     *
     * @throws Exception if {@link #emitResult(Tuple3)} fails
     */
    public void emitResults() throws Exception {
        Tuple3<String, byte[], Integer> result;
        while ((result = this.pythonFunctionRunner.pollResult()) != null && result.f2 != 0) {
            emitResult(result);
        }
    }

    /**
     * Returns the Python environment whose execution type decides whether
     * {@link #createPythonEnvironmentManager()} can build a manager.
     *
     * @return the Python environment of this operator
     */
    public abstract PythonEnv getPythonEnv();

    /**
     * Emits a single result obtained from the Python function runner.
     *
     * @param tuple3 the result as returned by the runner's {@code takeResult()} or
     *     {@code pollResult()}
     * @throws Exception if the result cannot be emitted
     */
    public abstract void emitResult(Tuple3<String, byte[], Integer> tuple3) throws Exception;

    /**
     * Creates the runner that {@link #open()} stores in {@link #pythonFunctionRunner} and opens.
     *
     * @return a new, not yet opened runner
     * @throws Exception if the runner cannot be created
     */
    public abstract PythonFunctionRunner createPythonFunctionRunner() throws Exception;
}
