/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.python;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.HashMap;
import net.razorvine.pickle.Pickler;
import org.apache.spark.JobArtifactSet$;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.SparkException;
import org.apache.spark.api.python.PythonFunction;
import org.apache.spark.api.python.PythonWorker;
import org.apache.spark.api.python.PythonWorkerUtils$;
import org.apache.spark.api.python.SpecialLengths$;
import org.apache.spark.internal.config.Python$;
import org.apache.spark.internal.config.package$;
import org.apache.spark.sql.execution.python.EvaluatePython$;
import org.apache.spark.sql.internal.SQLConf$;
import org.apache.spark.util.AccumulatorV2;
import org.apache.spark.util.DirectByteBufferOutputStream;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.ArrayOps$;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0005\u0005-b!\u0002\b\u0010\u0003\u0003a\u0002\u0002\u0003\u0013\u0001\u0005\u0003\u0005\u000b\u0011B\u0013\t\u000b1\u0002A\u0011A\u0017\t\u000fq\u0002!\u0019!D\t{!)\u0011\n\u0001D\t\u0015\")A\r\u0001D\tK\")1\u000e\u0001C\u0001Y\u001a!Q\u000e\u0001\u0003o\u0011!\u0011xA!A!\u0002\u0013\u0019\b\u0002\u0003<\b\u0005\u0003\u0005\u000b\u0011B<\t\u000b1:A\u0011A?\t\u0011\u0005\u0015q\u0001)A\u0005\u0003\u000fAq!a\u0005\b\t\u0003\n)\u0002C\u0004\u0002\u0014\u001d!\t%!\b\u0003'AKH\u000f[8o!2\fgN\\3s%Vtg.\u001a:\u000b\u0005A\t\u0012A\u00029zi\"|gN\u0003\u0002\u0013'\u0005IQ\r_3dkRLwN\u001c\u0006\u0003)U\t1a]9m\u0015\t1r#A\u0003ta\u0006\u00148N\u0003\u0002\u00193\u00051\u0011\r]1dQ\u0016T\u0011AG\u0001\u0004_J<7\u0001A\u000b\u0003;I\u001a\"\u0001\u0001\u0010\u0011\u0005}\u0011S\"\u0001\u0011\u000b\u0003\u0005\nQa]2bY\u0006L!a\t\u0011\u0003\r\u0005s\u0017PU3g\u0003\u00111WO\\2\u0011\u0005\u0019RS\"A\u0014\u000b\u0005AA#BA\u0015\u0016\u0003\r\t\u0007/[\u0005\u0003W\u001d\u0012a\u0002U=uQ>tg)\u001e8di&|g.\u0001\u0004=S:LGO\u0010\u000b\u0003]m\u00022a\f\u00011\u001b\u0005y\u0001CA\u00193\u0019\u0001!Qa\r\u0001C\u0002Q\u0012\u0011\u0001V\t\u0003ka\u0002\"a\b\u001c\n\u0005]\u0002#a\u0002(pi\"Lgn\u001a\t\u0003?eJ!A\u000f\u0011\u0003\u0007\u0005s\u0017\u0010C\u0003%\u0005\u0001\u0007Q%\u0001\u0007x_J\\WM]'pIVdW-F\u0001?!\tydI\u0004\u0002A\tB\u0011\u0011\tI\u0007\u0002\u0005*\u00111iG\u0001\u0007yI|w\u000e\u001e 
\n\u0005\u0015\u0003\u0013A\u0002)sK\u0012,g-\u0003\u0002H\u0011\n11\u000b\u001e:j]\u001eT!!\u0012\u0011\u0002\u001b]\u0014\u0018\u000e^3U_BKH\u000f[8o)\rYe\n\u0017\t\u0003?1K!!\u0014\u0011\u0003\tUs\u0017\u000e\u001e\u0005\u0006\u001f\u0012\u0001\r\u0001U\u0001\bI\u0006$\u0018mT;u!\t\tf+D\u0001S\u0015\t\u0019F+\u0001\u0002j_*\tQ+\u0001\u0003kCZ\f\u0017BA,S\u0005A!\u0015\r^1PkR\u0004X\u000f^*ue\u0016\fW\u000eC\u0003Z\t\u0001\u0007!,A\u0004qS\u000e\\G.\u001a:\u0011\u0005m\u0013W\"\u0001/\u000b\u0005us\u0016A\u00029jG.dWM\u0003\u0002`A\u0006I!/\u0019>peZLg.\u001a\u0006\u0002C\u0006\u0019a.\u001a;\n\u0005\rd&a\u0002)jG.dWM]\u0001\u0012e\u0016\u001cW-\u001b<f\rJ|W\u000eU=uQ>tGC\u0001\u0019g\u0011\u00159W\u00011\u0001i\u0003\u0019!\u0017\r^1J]B\u0011\u0011+[\u0005\u0003UJ\u0013q\u0002R1uC&s\u0007/\u001e;TiJ,\u0017-\\\u0001\feVt\u0017J\u001c)zi\"|g\u000eF\u00011\u0005E9vN]6fe&s\u0007/\u001e;TiJ,\u0017-\\\n\u0003\u000f=\u0004\"!\u00159\n\u0005E\u0014&aC%oaV$8\u000b\u001e:fC6\faa^8sW\u0016\u0014\bC\u0001\u0014u\u0013\t)xE\u0001\u0007QsRDwN\\,pe.,'/\u0001\u0004ck\u001a4WM\u001d\t\u0003qnl\u0011!\u001f\u0006\u0003uR\u000b1A\\5p\u0013\ta\u0018P\u0001\u0006CsR,')\u001e4gKJ$RA`A\u0001\u0003\u0007\u0001\"a`\u0004\u000e\u0003\u0001AQA\u001d\u0006A\u0002MDQA\u001e\u0006A\u0002]\fA\u0001^3naB)q$!\u0003\u0002\u000e%\u0019\u00111\u0002\u0011\u0003\u000b\u0005\u0013(/Y=\u0011\u0007}\ty!C\u0002\u0002\u0012\u0001\u0012AAQ=uK\u0006!!/Z1e)\t\t9\u0002E\u0002 \u00033I1!a\u0007!\u0005\rIe\u000e\u001e\u000b\t\u0003/\ty\"a\t\u0002(!9\u0011\u0011E\u0007A\u0002\u0005\u001d\u0011!\u00012\t\u000f\u0005\u0015R\u00021\u0001\u0002\u0018\u0005\u0019qN\u001a4\t\u000f\u0005%R\u00021\u0001\u0002\u0018\u0005\u0019A.\u001a8")
/**
 * Base class for running a one-shot request against a Python worker process.
 *
 * <p>{@link #runInPython()} obtains a worker from {@code SparkEnv}, sends it a
 * request (a common preamble followed by whatever the subclass emits in
 * {@link #writeToPython}), and decodes the reply with {@link #receiveFromPython}.
 *
 * <p>NOTE(review): this file is decompiler output of a Scala class. Constructs such
 * as {@code (.less.colon.less)} casts and the intersection-typed lambdas are kept
 * verbatim from the decompiler and are not valid hand-written Java.
 *
 * @param <T> type of the result decoded from the worker's reply
 */
public abstract class PythonPlannerRunner<T> {
    /** Python function description: supplies env vars, executable, version, includes, broadcasts and accumulator. */
    private final PythonFunction func;

    /**
     * @return the worker module name passed to {@code SparkEnv} when creating,
     *         releasing or destroying the Python worker
     */
    public abstract String workerModule();

    /**
     * Writes the subclass-specific part of the request. Called after the Python
     * version, Spark files and broadcasts have been written, and before the
     * END_OF_STREAM marker.
     *
     * @param var1 stream the request is buffered into
     * @param var2 pickler created by {@link #runInPython()} for this request
     */
    public abstract void writeToPython(DataOutputStream var1, Pickler var2);

    /**
     * Decodes the worker's reply. Called before accumulator updates and the
     * trailing marker are read from the same stream.
     *
     * @param var1 stream carrying the worker's reply
     * @return the decoded result, returned as-is by {@link #runInPython()}
     */
    public abstract T receiveFromPython(DataInputStream var1);

    /**
     * Runs the request in a Python worker and returns the decoded result.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Read buffer size, auth socket timeout, worker reuse, simplified traceback and
     *       planner memory settings, and fold them into the worker's environment variables.</li>
     *   <li>Create the worker through {@code SparkEnv.createPythonWorker}.</li>
     *   <li>Buffer the complete request into {@code bufferStream}.</li>
     *   <li>Read the reply through a {@link WorkerInputStream}, which also pushes the buffered
     *       request to the worker while waiting for data.</li>
     *   <li>Release the worker for reuse if it replied with END_OF_STREAM and reuse is enabled;
     *       otherwise destroy it.</li>
     * </ol>
     *
     * <p>On any failure the buffer is closed and, unless the worker was already released or
     * destroyed, the worker is destroyed.
     *
     * @return the value produced by {@link #receiveFromPython}
     * @throws SparkException if an {@link EOFException} is raised while talking to the worker
     * @throws MatchError if {@code createPythonWorker} returns {@code null} or a pair whose
     *         first element is {@code null}
     */
    public T runInPython() {
        PythonWorker worker;
        SparkEnv env = SparkEnv$.MODULE$.get();
        int bufferSize = BoxesRunTime.unboxToInt((Object)env.conf().get(package$.MODULE$.BUFFER_SIZE()));
        long authSocketTimeout = BoxesRunTime.unboxToLong((Object)env.conf().get(Python$.MODULE$.PYTHON_AUTH_SOCKET_TIMEOUT()));
        boolean reuseWorker = BoxesRunTime.unboxToBoolean((Object)env.conf().get(Python$.MODULE$.PYTHON_WORKER_REUSE()));
        // Comma-separated paths of the disk block manager's local directories.
        String localdir = Predef$.MODULE$.wrapRefArray((Object[])ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps((Object[])env.blockManager().diskBlockManager().localDirs()), (Function1 & Serializable)f -> f.getPath(), ClassTag$.MODULE$.apply(String.class))).mkString(",");
        boolean simplifiedTraceback = SQLConf$.MODULE$.get().pysparkSimplifiedTraceback();
        Option workerMemoryMb = SQLConf$.MODULE$.get().pythonPlannerExecMemory();
        Option jobArtifactUUID = JobArtifactSet$.MODULE$.getCurrentJobArtifactState().map((Function1 & Serializable)x$1 -> x$1.uuid());
        // Copy the function's env vars so the additions below do not touch the function's own map.
        HashMap<String, Object> envVars = new HashMap<String, Object>(this.func.envVars());
        String pythonExec = this.func.pythonExec();
        String pythonVer = this.func.pythonVer();
        Set pythonIncludes = CollectionConverters$.MODULE$.ListHasAsScala(this.func.pythonIncludes()).asScala().toSet();
        Seq broadcastVars = CollectionConverters$.MODULE$.ListHasAsScala(this.func.broadcastVars()).asScala().toSeq();
        // Updates are received into a reset copy and merged into the real accumulator afterwards.
        Option maybeAccumulator = Option$.MODULE$.apply((Object)this.func.accumulator()).map((Function1 & Serializable)x$2 -> x$2.copyAndReset());

        envVars.put("SPARK_LOCAL_DIRS", localdir);
        if (reuseWorker) {
            envVars.put("SPARK_REUSE_WORKER", "1");
        }
        if (simplifiedTraceback) {
            envVars.put("SPARK_SIMPLIFIED_TRACEBACK", "1");
        }
        workerMemoryMb.foreach((Function1 & Serializable)memoryMb -> envVars.put("PYSPARK_PLANNER_MEMORY_MB", Long.toString(BoxesRunTime.unboxToLong((Object)memoryMb))));
        envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", Long.toString(authSocketTimeout));
        envVars.put("SPARK_BUFFER_SIZE", Integer.toString(bufferSize));
        envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse((Function0 & Serializable)() -> "default"));

        EvaluatePython$.MODULE$.registerPicklers();
        Pickler pickler = new Pickler(true, false);

        // Only the first element of the returned pair (the worker) is used here.
        Tuple2 tuple2 = env.createPythonWorker(pythonExec, this.workerModule(), CollectionConverters$.MODULE$.MapHasAsScala(envVars).asScala().toMap((.less.colon.less)$less$colon$less$.MODULE$.refl()));
        if (tuple2 == null || (worker = (PythonWorker)tuple2._1()) == null) {
            throw new MatchError((Object)tuple2);
        }

        // Set once the worker has been handed back to (or destroyed by) SparkEnv on the
        // success path, so the cleanup below does not destroy it a second time.
        boolean releasedOrClosed = false;
        DirectByteBufferOutputStream bufferStream = new DirectByteBufferOutputStream();
        try {
            // The whole request is buffered first; WorkerInputStream sends it to the worker.
            DataOutputStream dataOut = new DataOutputStream(new BufferedOutputStream((OutputStream)bufferStream, bufferSize));
            PythonWorkerUtils$.MODULE$.writePythonVersion(pythonVer, dataOut);
            PythonWorkerUtils$.MODULE$.writeSparkFiles(jobArtifactUUID, pythonIncludes, dataOut);
            PythonWorkerUtils$.MODULE$.writeBroadcasts(broadcastVars, worker, env, dataOut);
            this.writeToPython(dataOut, pickler);
            dataOut.writeInt(SpecialLengths$.MODULE$.END_OF_STREAM());
            dataOut.flush();

            DataInputStream dataIn = new DataInputStream(new BufferedInputStream(new WorkerInputStream(this, worker, bufferStream.toByteBuffer()), bufferSize));
            T res = this.receiveFromPython(dataIn);
            PythonWorkerUtils$.MODULE$.receiveAccumulatorUpdates(maybeAccumulator, dataIn);
            Option$.MODULE$.apply((Object)this.func.accumulator()).foreach((Function1 & Serializable)x$3 -> {
                x$3.merge((AccumulatorV2)maybeAccumulator.get());
                return BoxedUnit.UNIT;
            });

            // The worker is returned for reuse only if it ended its reply with END_OF_STREAM
            // and reuse is enabled; in every other case it is destroyed.
            int n = dataIn.readInt();
            if (SpecialLengths$.MODULE$.END_OF_STREAM() == n && reuseWorker) {
                env.releasePythonWorker(pythonExec, this.workerModule(), CollectionConverters$.MODULE$.MapHasAsScala(envVars).asScala().toMap((.less.colon.less)$less$colon$less$.MODULE$.refl()), worker);
            } else {
                env.destroyPythonWorker(pythonExec, this.workerModule(), CollectionConverters$.MODULE$.MapHasAsScala(envVars).asScala().toMap((.less.colon.less)$less$colon$less$.MODULE$.refl()), worker);
            }
            releasedOrClosed = true;
            return res;
        }
        catch (EOFException eof) {
            throw new SparkException("Python worker exited unexpectedly (crashed)", (Throwable)eof);
        }
        finally {
            try {
                bufferStream.close();
            }
            finally {
                if (!releasedOrClosed) {
                    // An error happened before the worker was released or destroyed: destroy it now.
                    env.destroyPythonWorker(pythonExec, this.workerModule(), CollectionConverters$.MODULE$.MapHasAsScala(envVars).asScala().toMap((.less.colon.less)$less$colon$less$.MODULE$.refl()), worker);
                }
            }
        }
    }

    /**
     * @param func Python function whose settings (env vars, executable, version, includes,
     *             broadcasts, accumulator) are used by {@link #runInPython()}
     */
    public PythonPlannerRunner(PythonFunction func) {
        this.func = func;
    }

    /**
     * Input stream over the worker's channel that also writes the pending request.
     *
     * <p>Each bulk read waits on the worker's selector; when the channel is readable it reads
     * into the caller's array, and when it is writable it writes as much of {@code buffer}
     * as the channel accepts.
     */
    private class WorkerInputStream
    extends InputStream {
        private final PythonWorker worker;
        /** Request bytes still to be written to the worker. */
        private final ByteBuffer buffer;
        /** One-byte scratch array backing {@link #read()}. */
        private final byte[] temp;
        public final /* synthetic */ PythonPlannerRunner $outer;

        /**
         * Reads one byte by delegating to the bulk read.
         *
         * @return the byte as an unsigned value, or {@code -1} if the bulk read returned
         *         no bytes ({@code n <= 0})
         */
        @Override
        public int read() {
            int n = this.read(this.temp);
            if (n <= 0) {
                return -1;
            }
            return this.temp[0] & 0xFF;
        }

        /**
         * Blocks until the channel read returns a non-zero count, writing pending request
         * bytes to the worker whenever the channel is writable in the meantime.
         *
         * @param b   destination array
         * @param off offset in {@code b} at which to store bytes
         * @param len maximum number of bytes to read
         * @return the non-zero result of the channel read (a negative value signals end of
         *         stream), or {@code 0} when {@code len} is zero
         */
        @Override
        public int read(byte[] b, int off, int len) {
            // Wrap first so that null / out-of-range arguments fail exactly as before.
            ByteBuffer buf = ByteBuffer.wrap(b, off, len);
            if (len == 0) {
                // InputStream contract: a zero-length read returns 0. Without this guard the
                // loop below could never finish, because a read into a full buffer yields 0.
                return 0;
            }
            int n = 0;
            while (n == 0) {
                this.worker.selector().select();
                if (this.worker.selectionKey().isReadable()) {
                    n = this.worker.channel().read(buf);
                }
                if (this.worker.selectionKey().isWritable()) {
                    // Keep writing until the request is fully sent or the channel stops accepting bytes.
                    boolean acceptsInput = true;
                    while (acceptsInput && this.buffer.hasRemaining()) {
                        acceptsInput = this.worker.channel().write(this.buffer) > 0;
                    }
                    if (!this.buffer.hasRemaining()) {
                        // Request fully sent: from now on only wait for readability.
                        // 1 == java.nio.channels.SelectionKey.OP_READ
                        this.worker.selectionKey().interestOps(1);
                    }
                }
            }
            return n;
        }

        public /* synthetic */ PythonPlannerRunner org$apache$spark$sql$execution$python$PythonPlannerRunner$WorkerInputStream$$$outer() {
            return this.$outer;
        }

        /**
         * @param $outer enclosing runner; must not be {@code null}
         * @param worker worker whose selector, selection key and channel are used for I/O
         * @param buffer request bytes to send to the worker
         */
        public WorkerInputStream(PythonPlannerRunner $outer, PythonWorker worker, ByteBuffer buffer) {
            this.worker = worker;
            this.buffer = buffer;
            if ($outer == null) {
                throw null;
            }
            this.$outer = $outer;
            this.temp = new byte[1];
        }
    }
}

