package com.hazelcast.jet.grpc.impl;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.grpc.GrpcProperties;
import com.hazelcast.jet.grpc.GrpcService;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.properties.HazelcastProperties;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/grpc/impl/BidirectionalStreamingService.class */
/**
 * {@link GrpcService} implementation backed by a single bidirectional
 * streaming call.
 * <p>
 * Every input passed to {@link #call} is pushed into the request stream
 * ({@code sink}) and a future for its response is appended to
 * {@code futureQueue}. Each message arriving on the response stream completes
 * the future at the head of the queue, so responses are matched to requests
 * purely by arrival order.
 * <p>
 * {@code completionLatch} is released once the response stream has terminated
 * (error, completion, or a failure while handling a response). After that,
 * {@link #call} no longer forwards items to the request stream.
 *
 * @param <T> type of the items sent on the request stream
 * @param <R> type of the items received on the response stream
 */
public final class BidirectionalStreamingService<T, R> implements GrpcService<T, R> {
    private final StreamObserver<T> sink;
    private final Queue<CompletableFuture<R>> futureQueue = new ConcurrentLinkedQueue<>();
    private final CountDownLatch completionLatch = new CountDownLatch(1);
    private final ILogger logger;
    private final ManagedChannel channel;
    // Written before the latch is released, read only after the latch is seen at zero.
    private volatile Throwable exceptionInOutputObserver;
    // Both timeouts are read with HazelcastProperties.getSeconds(), i.e. they are in seconds.
    private final long destroyTimeout;
    private final long shutdownTimeout;

    /**
     * Observer of the response stream; resolves the queued futures.
     */
    private class OutputMessageObserver implements StreamObserver<R> {
        private OutputMessageObserver() {
        }

        /**
         * Completes the oldest pending future with the received item. Any
         * failure while doing so (for instance an empty queue) is recorded
         * and the service is marked as terminated.
         */
        public void onNext(R r) {
            try {
                futureQueue.remove().complete(r);
            } catch (Throwable th) {
                exceptionInOutputObserver = th;
                completionLatch.countDown();
            }
        }

        /**
         * Records the error (as translated by
         * {@code GrpcUtil.translateGrpcException}), marks the service as
         * terminated and fails every pending future with that error.
         */
        public void onError(Throwable th) {
            try {
                Throwable translated = GrpcUtil.translateGrpcException(th);
                exceptionInOutputObserver = translated;
                // Release the latch BEFORE draining: a concurrent call() that enqueues
                // after the drain has finished is then guaranteed to observe the
                // termination in its post-enqueue check.
                completionLatch.countDown();
                failPendingCalls(translated);
            } finally {
                // No-op on the normal path; guarantees the latch is released even if
                // the translation above throws.
                completionLatch.countDown();
            }
        }

        /**
         * Marks the service as terminated and fails every future that is
         * still waiting for a response.
         */
        public void onCompleted() {
            // Same ordering as in onError: terminate first, then drain.
            completionLatch.countDown();
            CompletableFuture<R> pending;
            while ((pending = futureQueue.poll()) != null) {
                pending.completeExceptionally(
                        new JetException("Completion signaled before the future was completed"));
            }
        }

        /** Fails all currently queued futures with the given error. */
        private void failPendingCalls(Throwable failure) {
            CompletableFuture<R> pending;
            while ((pending = futureQueue.poll()) != null) {
                pending.completeExceptionally(failure);
            }
        }
    }

    /**
     * Opens the streaming call and reads the timeouts from the member
     * configuration.
     *
     * @param context        processor context; supplies the logger and the
     *                       Hazelcast configuration properties
     * @param managedChannel channel the call is made on; it is shut down in
     *                       {@link #destroy()}
     * @param functionEx     given the channel, returns a function that takes
     *                       the response observer and returns the request
     *                       observer
     */
    public BidirectionalStreamingService(@Nonnull Processor.Context context, @Nonnull ManagedChannel managedChannel, @Nonnull FunctionEx<? super ManagedChannel, ? extends FunctionEx<StreamObserver<R>, StreamObserver<T>>> functionEx) {
        this.logger = context.logger();
        this.channel = managedChannel;
        this.sink = functionEx.apply(managedChannel).apply(new OutputMessageObserver());
        HazelcastProperties hazelcastProperties = new HazelcastProperties(context.hazelcastInstance().getConfig().getProperties());
        this.destroyTimeout = hazelcastProperties.getSeconds(GrpcProperties.DESTROY_TIMEOUT);
        this.shutdownTimeout = hazelcastProperties.getSeconds(GrpcProperties.SHUTDOWN_TIMEOUT);
    }

    /**
     * Sends {@code t} on the request stream.
     *
     * @param t the request item
     * @return a future that is completed with the matching response, or
     *         completed exceptionally if the response stream terminates first
     * @throws JetException if the response stream has already terminated
     */
    @Override // com.hazelcast.jet.grpc.GrpcService
    @Nonnull
    public CompletableFuture<R> call(@Nonnull T t) {
        checkForServerError();
        CompletableFuture<R> future = new CompletableFuture<>();
        futureQueue.add(future);
        if (completionLatch.getCount() == 0) {
            // The response stream terminated between the check above and the enqueue.
            // If the observer has not already taken the future off the queue, nobody
            // else ever will, so fail it here instead of leaving it pending forever.
            if (futureQueue.remove(future)) {
                future.completeExceptionally(terminationException());
            }
            return future;
        }
        sink.onNext(t);
        return future;
    }

    /**
     * @throws JetException if the response stream has already terminated
     */
    private void checkForServerError() {
        if (completionLatch.getCount() == 0) {
            throw terminationException();
        }
    }

    /**
     * Builds the exception reported to callers once the response stream has
     * terminated. The recorded error, if any, becomes the cause; a
     * termination without a recorded error gets its own message instead of
     * the misleading "Exception in gRPC service: null".
     */
    private JetException terminationException() {
        Throwable cause = exceptionInOutputObserver;
        if (cause == null) {
            return new JetException("gRPC response stream has already terminated");
        }
        return new JetException("Exception in gRPC service: " + cause, cause);
    }

    /**
     * Completes the request stream, waits up to the destroy timeout for the
     * response stream to terminate and then shuts the channel down.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void destroy() throws InterruptedException {
        try {
            sink.onCompleted();
            if (!completionLatch.await(destroyTimeout, TimeUnit.SECONDS)) {
                logger.info("gRPC call has not completed on time");
            }
        } finally {
            // Always release the channel, even if completing the request stream
            // throws or the wait above is interrupted.
            GrpcUtil.shutdownChannel(channel, logger, shutdownTimeout);
        }
    }
}
