/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.transport;

import alluxio.grpc.MessagingServiceGrpc;
import alluxio.grpc.TransportMessage;
import alluxio.master.transport.GrpcMessagingConnection;
import alluxio.master.transport.GrpcMessagingContext;
import alluxio.master.transport.GrpcMessagingServerConnection;
import alluxio.security.authentication.ClientIpAddressInjector;
import com.google.common.base.MoreObjects;
import io.grpc.stub.StreamObserver;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GrpcMessagingServiceClientHandler
extends MessagingServiceGrpc.MessagingServiceImplBase {
    private static final Logger LOG = LoggerFactory.getLogger(GrpcMessagingServiceClientHandler.class);
    private final Consumer<GrpcMessagingConnection> mListener;
    private final GrpcMessagingContext mContext;
    private final long mRequestTimeoutMs;
    private final ExecutorService mExecutor;
    private final InetSocketAddress mServerAddress;

    public GrpcMessagingServiceClientHandler(InetSocketAddress serverAddress, Consumer<GrpcMessagingConnection> listener, GrpcMessagingContext context, ExecutorService executor, long requestTimeoutMs) {
        this.mServerAddress = serverAddress;
        this.mListener = listener;
        this.mContext = context;
        this.mExecutor = executor;
        this.mRequestTimeoutMs = requestTimeoutMs;
    }

    public StreamObserver<TransportMessage> connect(StreamObserver<TransportMessage> responseObserver) {
        String transportId = MoreObjects.toStringHelper((Object)((Object)this)).add("ServerAddress", (Object)this.mServerAddress).add("ClientAddress", (Object)ClientIpAddressInjector.getIpAddress()).toString();
        LOG.debug("Creating a messaging server connection: {}", (Object)transportId);
        GrpcMessagingServerConnection serverConnection = new GrpcMessagingServerConnection(transportId, this.mContext, this.mExecutor, this.mRequestTimeoutMs);
        serverConnection.setTargetObserver(responseObserver);
        LOG.debug("Created a messaging server connection: {}", (Object)serverConnection);
        try {
            this.mContext.execute(() -> this.mListener.accept(serverConnection)).get();
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for server to register new connection.");
        }
        catch (ExecutionException ee) {
            throw new RuntimeException("Failed to register new connection with server", ee.getCause());
        }
        return serverConnection;
    }
}

