package com.hazelcast.jet.python;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.grpc.impl.GrpcUtil;
import com.hazelcast.jet.pipeline.ServiceFactory;
import com.hazelcast.jet.python.impl.grpc.InputMessage;
import com.hazelcast.jet.python.impl.grpc.JetToPythonGrpc;
import com.hazelcast.jet.python.impl.grpc.OutputMessage;
import com.hazelcast.logging.ILogger;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/hazelcast/jet/python/PythonService.class */
public final class PythonService {
    private static final int CREATE_CONTEXT_RETRY_COUNT = 2;
    private static final int CREATE_CONTEXT_RETRY_SLEEP_TIME_MILLIS = 1000;
    private static final String JET_TO_PYTHON_PREFIX = "jet_to_python_";
    static final String MAIN_SHELL_SCRIPT = "jet_to_python_main.sh";
    private final ILogger logger;
    private final JetToPythonServer server;
    private final ManagedChannel chan;
    private final StreamObserver<InputMessage> sink;
    private final Queue<CompletableFuture<List<String>>> futureQueue = new ConcurrentLinkedQueue();
    private final CountDownLatch completionLatch = new CountDownLatch(1);
    private volatile Throwable exceptionInOutputObserver;

    /* loaded from: input_file:com/hazelcast/jet/python/PythonService$OutputMessageObserver.class */
    private class OutputMessageObserver implements StreamObserver<OutputMessage> {
        private OutputMessageObserver() {
        }

        public void onNext(OutputMessage outputMessage) {
            try {
                ((CompletableFuture) PythonService.this.futureQueue.remove()).complete(outputMessage.mo64getOutputValueList());
            } catch (Throwable th) {
                PythonService.this.exceptionInOutputObserver = th;
                PythonService.this.completionLatch.countDown();
            }
        }

        public void onError(Throwable th) {
            try {
                Throwable translateGrpcException = GrpcUtil.translateGrpcException(th);
                PythonService.this.exceptionInOutputObserver = translateGrpcException;
                while (true) {
                    CompletableFuture completableFuture = (CompletableFuture) PythonService.this.futureQueue.poll();
                    if (completableFuture == null) {
                        return;
                    } else {
                        completableFuture.completeExceptionally(translateGrpcException);
                    }
                }
            } finally {
                PythonService.this.completionLatch.countDown();
            }
        }

        public void onCompleted() {
            while (true) {
                CompletableFuture completableFuture = (CompletableFuture) PythonService.this.futureQueue.poll();
                if (completableFuture == null) {
                    PythonService.this.completionLatch.countDown();
                    return;
                }
                completableFuture.completeExceptionally(new JetException("Completion signaled before the future was completed"));
            }
        }
    }

    PythonService(PythonServiceContext pythonServiceContext) {
        this.logger = pythonServiceContext.logger();
        this.server = new JetToPythonServer(pythonServiceContext.runtimeBaseDir(), this.logger);
        try {
            this.chan = ((ManagedChannelBuilder) pythonServiceContext.channelFn().apply("127.0.0.1", Integer.valueOf(this.server.start()))).usePlaintext().build();
            this.sink = JetToPythonGrpc.newStub(this.chan).streamingCall(new OutputMessageObserver());
        } catch (Throwable th) {
            this.server.stop();
            throw new JetException("PythonService initialization failed", th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nonnull
    public static ServiceFactory<?, PythonService> factory(@Nonnull PythonServiceConfig pythonServiceConfig) {
        pythonServiceConfig.validate();
        ServiceFactory withDestroyServiceFn = ServiceFactory.withCreateContextFn(context -> {
            return createContextWithRetry(context, pythonServiceConfig);
        }).withDestroyContextFn((v0) -> {
            v0.destroy();
        }).withCreateServiceFn((context2, pythonServiceContext) -> {
            return new PythonService(pythonServiceContext);
        }).withDestroyServiceFn((v0) -> {
            v0.destroy();
        });
        if (pythonServiceConfig.baseDir() != null) {
            File file = (File) Objects.requireNonNull(pythonServiceConfig.baseDir());
            return withDestroyServiceFn.withAttachedDirectory(file.toString(), file);
        }
        File file2 = (File) Objects.requireNonNull(pythonServiceConfig.handlerFile());
        return withDestroyServiceFn.withAttachedFile(file2.toString(), file2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static PythonServiceContext createContextWithRetry(ProcessorSupplier.Context context, PythonServiceConfig pythonServiceConfig) {
        JetException jetException = null;
        int i = CREATE_CONTEXT_RETRY_COUNT;
        while (i >= 0) {
            try {
                return new PythonServiceContext(context, pythonServiceConfig);
            } catch (JetException e) {
                jetException = e;
                context.logger().warning("PythonService context creation failed, " + (i > 0 ? "will retry" : "giving up"), e);
                try {
                    Thread.sleep(1000L);
                    i--;
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    throw new JetException(e2);
                }
            }
        }
        throw jetException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<List<String>> sendRequest(List<String> list) {
        checkForServerError();
        InputMessage.Builder newBuilder = InputMessage.newBuilder();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            newBuilder.addInputValue(it.next());
        }
        CompletableFuture<List<String>> completableFuture = new CompletableFuture<>();
        this.futureQueue.add(completableFuture);
        this.sink.onNext(newBuilder.m42build());
        return completableFuture;
    }

    private void checkForServerError() {
        if (this.completionLatch.getCount() == 0) {
            throw new JetException("PythonService broke down: " + this.exceptionInOutputObserver, this.exceptionInOutputObserver);
        }
    }

    @SuppressFBWarnings({"RV_RETURN_VALUE_IGNORED"})
    void destroy() {
        boolean interrupted = Thread.interrupted();
        try {
            try {
                this.sink.onCompleted();
                if (!this.completionLatch.await(1L, TimeUnit.SECONDS)) {
                    this.logger.info("gRPC call has not completed on time");
                }
                GrpcUtil.shutdownChannel(this.chan, this.logger, 1L);
                this.server.stop();
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            } catch (Exception e) {
                throw new JetException("PythonService.destroy() failed: " + e, e);
            }
        } catch (Throwable th) {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
            throw th;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 113605521:
                if (implMethodName.equals("lambda$factory$9a0f8f23$1")) {
                    z = false;
                    break;
                }
                break;
            case 356921353:
                if (implMethodName.equals("lambda$factory$f8fa7483$1")) {
                    z = CREATE_CONTEXT_RETRY_COUNT;
                    break;
                }
                break;
            case 1557372922:
                if (implMethodName.equals("destroy")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/python/PythonService") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/python/PythonServiceConfig;Lcom/hazelcast/jet/core/ProcessorSupplier$Context;)Lcom/hazelcast/jet/python/PythonServiceContext;")) {
                    PythonServiceConfig pythonServiceConfig = (PythonServiceConfig) serializedLambda.getCapturedArg(0);
                    return context -> {
                        return createContextWithRetry(context, pythonServiceConfig);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ConsumerEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("acceptEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/python/PythonServiceContext") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return (v0) -> {
                        v0.destroy();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ConsumerEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("acceptEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/python/PythonService") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return (v0) -> {
                        v0.destroy();
                    };
                }
                break;
            case CREATE_CONTEXT_RETRY_COUNT /* 2 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/python/PythonService") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/core/Processor$Context;Lcom/hazelcast/jet/python/PythonServiceContext;)Lcom/hazelcast/jet/python/PythonService;")) {
                    return (context2, pythonServiceContext) -> {
                        return new PythonService(pythonServiceContext);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
