/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.transport;

import alluxio.grpc.MessagingRequestHeader;
import alluxio.grpc.MessagingResponseHeader;
import alluxio.grpc.TransportMessage;
import alluxio.master.transport.GrpcMessagingContext;
import alluxio.master.transport.Listener;
import alluxio.master.transport.Listeners;
import alluxio.resource.LockResource;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.protobuf.UnsafeByteOperations;
import io.atomix.catalyst.serializer.SerializationException;
import io.grpc.stub.StreamObserver;
import java.net.ConnectException;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.http.concurrent.Cancellable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class GrpcMessagingConnection
implements StreamObserver<TransportMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(GrpcMessagingConnection.class);
    private static AtomicLong sConnectionIdCounter = new AtomicLong(0L);
    private final Listeners<Throwable> mExceptionListeners;
    private final Listeners<GrpcMessagingConnection> mCloseListeners;
    private boolean mClosed;
    private boolean mStreamCompleted;
    private Throwable mLastFailure;
    private final AtomicLong mRequestCounter;
    private final Map<Class, HandlerHolder> mHandlers;
    private final Map<Long, ContextualFuture> mResponseFutures;
    private final GrpcMessagingContext mContext;
    private final ConnectionOwner mConnectionOwner;
    private final String mConnectionId;
    private StreamObserver<TransportMessage> mTargetObserver;
    private final long mRequestTimeoutMs;
    private final Cancellable mTimeoutScheduler;
    private final ReadWriteLock mStateLock;
    private final ExecutorService mExecutor;

    public GrpcMessagingConnection(ConnectionOwner connectionOwner, String transportId, GrpcMessagingContext context, ExecutorService executor, long requestTimeoutMs) {
        this.mConnectionOwner = connectionOwner;
        this.mConnectionId = MoreObjects.toStringHelper((Object)this).add("ConnectionOwner", (Object)this.mConnectionOwner).add("ConnectionId", sConnectionIdCounter.incrementAndGet()).add("TransportId", (Object)transportId).toString();
        this.mContext = context;
        this.mExecutor = executor;
        this.mRequestTimeoutMs = requestTimeoutMs;
        this.mStateLock = new ReentrantReadWriteLock();
        this.mClosed = false;
        this.mStreamCompleted = false;
        this.mRequestCounter = new AtomicLong(0L);
        this.mTimeoutScheduler = context.schedule(Duration.ofMillis(this.mRequestTimeoutMs), Duration.ofMillis(this.mRequestTimeoutMs / 2L), this::timeoutPendingRequests);
        this.mExceptionListeners = new Listeners();
        this.mCloseListeners = new Listeners();
        this.mHandlers = new ConcurrentHashMap<Class, HandlerHolder>();
        this.mResponseFutures = new ConcurrentSkipListMap<Long, ContextualFuture>();
    }

    public void setTargetObserver(StreamObserver<TransportMessage> targetObserver) {
        this.mTargetObserver = targetObserver;
    }

    public CompletableFuture<Void> send(Object request) {
        return this.sendAndReceiveInternal(request, true);
    }

    public <T, U> CompletableFuture<U> sendAndReceive(T request) {
        return this.sendAndReceiveInternal(request, false);
    }

    private <T, U> CompletableFuture<U> sendAndReceiveInternal(T request, boolean fireAndForget) {
        try (LockResource lock = new LockResource(this.mStateLock.readLock());){
            Preconditions.checkNotNull(request, (Object)"request should not be null");
            ContextualFuture future = new ContextualFuture(System.currentTimeMillis(), GrpcMessagingContext.currentContextOrThrow());
            if (this.mClosed) {
                future.completeExceptionally(new IllegalStateException("GrpcMessagingConnection closed"));
                ContextualFuture contextualFuture = future;
                return contextualFuture;
            }
            long requestId = this.mRequestCounter.incrementAndGet();
            this.mResponseFutures.put(requestId, future);
            try {
                this.mTargetObserver.onNext((Object)TransportMessage.newBuilder().setRequestHeader(MessagingRequestHeader.newBuilder().setRequestId(requestId)).setMessage(UnsafeByteOperations.unsafeWrap((byte[])future.getContext().serializer().writeObject(request).array())).build());
            }
            catch (Exception e) {
                future.completeExceptionally(e);
                ContextualFuture contextualFuture = future;
                if (lock != null) {
                    if (var4_4 != null) {
                        try {
                            lock.close();
                        }
                        catch (Throwable throwable) {
                            var4_4.addSuppressed(throwable);
                        }
                    } else {
                        lock.close();
                    }
                }
                return contextualFuture;
            }
            if (fireAndForget) {
                future.complete(null);
            }
            LOG.debug("Submitted request({}) of type: {}. Connection: {} FireAndForget: {}", new Object[]{requestId, request.getClass().getName(), this.mConnectionId, fireAndForget});
            ContextualFuture contextualFuture = future;
            return contextualFuture;
        }
    }

    public <T, U> GrpcMessagingConnection handler(Class<T> type, Consumer<T> handler) {
        return this.handler(type, (T r) -> {
            handler.accept(r);
            return null;
        });
    }

    public <T, U> GrpcMessagingConnection handler(Class<T> type, Function<T, CompletableFuture<U>> handler) {
        Preconditions.checkNotNull(type, (Object)"type should not be null");
        try (LockResource lock = new LockResource(this.mStateLock.readLock());){
            if (this.mClosed) {
                throw new IllegalStateException("Connection closed");
            }
            this.mHandlers.put(type, new HandlerHolder(handler, GrpcMessagingContext.currentContextOrThrow()));
            GrpcMessagingConnection grpcMessagingConnection = null;
            return grpcMessagingConnection;
        }
    }

    private void handleRequestMessage(TransportMessage requestMessage) {
        long requestId = requestMessage.getRequestHeader().getRequestId();
        try {
            Object request = this.mContext.serializer().readObject(requestMessage.getMessage().newInput());
            LOG.debug("Handling request({}) of type: {}. GrpcMessagingConnection: {}", new Object[]{requestId, request.getClass().getName(), this.mConnectionId});
            HandlerHolder handler = this.mHandlers.get(request.getClass());
            if (handler != null) {
                handler.getContext().executor().execute(() -> this.handleRequest(requestId, request, handler));
            } else {
                this.sendResponse(requestId, this.mContext, (Object)new SerializationException("Unknown message type: " + request.getClass()));
            }
        }
        catch (SerializationException e) {
            this.sendResponse(requestId, this.mContext, (Object)e);
        }
    }

    private void handleRequest(long requestId, Object requestObject, HandlerHolder handler) {
        CompletableFuture responseFuture = (CompletableFuture)handler.getHandler().apply(requestObject);
        if (responseFuture != null) {
            responseFuture.whenComplete((response, error) -> {
                Runnable responseAction = () -> {
                    if (error == null) {
                        this.sendResponse(requestId, this.mContext, response);
                    } else {
                        this.sendResponse(requestId, this.mContext, error);
                    }
                };
                if (GrpcMessagingContext.currentContext() != null) {
                    responseAction.run();
                } else {
                    this.mContext.executor().execute(responseAction);
                }
            });
        }
    }

    private void sendResponse(long requestId, GrpcMessagingContext context, Object responseObject) {
        LOG.debug("Sending response of type: {} for request({}). GrpcMessagingConnection: {}", new Object[]{this.responseObjectType(responseObject), requestId, this.mConnectionOwner});
        TransportMessage.Builder messageBuilder = TransportMessage.newBuilder().setResponseHeader(MessagingResponseHeader.newBuilder().setRequestId(requestId).setIsThrowable(responseObject instanceof Throwable));
        if (responseObject != null) {
            messageBuilder.setMessage(UnsafeByteOperations.unsafeWrap((byte[])context.serializer().writeObject(responseObject).array()));
        }
        this.mTargetObserver.onNext((Object)messageBuilder.build());
    }

    protected void handleResponseMessage(TransportMessage response) {
        ContextualFuture future = this.mResponseFutures.remove(response.getResponseHeader().getRequestId());
        if (future == null) {
            LOG.debug("Received a response for nonexistent request({}).Connection is closed or the request has been timed out. Connection: {}", (Object)response.getResponseHeader().getRequestId(), (Object)this.mConnectionId);
            return;
        }
        try {
            if (response.getResponseHeader().getIsThrowable()) {
                Throwable error = (Throwable)this.mContext.serializer().readObject(response.getMessage().newInput());
                LOG.debug("Received an exception for request({}). Connection: {}", new Object[]{response.getResponseHeader().getRequestId(), this.mConnectionId, error});
                future.getContext().executor().execute(() -> future.completeExceptionally(error));
            } else {
                AtomicReference<Object> responseObjectRef = new AtomicReference<Object>(null);
                if (response.hasMessage()) {
                    responseObjectRef.set(this.mContext.serializer().readObject(response.getMessage().newInput()));
                }
                LOG.debug("Received response of type: {} for request({}). Connection: {}", new Object[]{this.responseObjectType(responseObjectRef.get()), response.getResponseHeader().getRequestId(), this.mConnectionId});
                future.getContext().executor().execute(() -> future.complete(responseObjectRef.get()));
            }
        }
        catch (SerializationException e) {
            future.getContext().executor().execute(() -> future.completeExceptionally(e));
        }
    }

    private String responseObjectType(Object responseObject) {
        return responseObject != null ? responseObject.getClass().getName() : "<NULL>";
    }

    public Listener<Throwable> onException(Consumer<Throwable> listener) {
        if (this.mLastFailure != null) {
            listener.accept(this.mLastFailure);
        }
        return this.mExceptionListeners.add((Consumer)Preconditions.checkNotNull(listener, (Object)"listener should not be null"));
    }

    public Listener<GrpcMessagingConnection> onClose(Consumer<GrpcMessagingConnection> listener) {
        if (this.mClosed) {
            listener.accept(this);
        }
        return this.mCloseListeners.add((Consumer)Preconditions.checkNotNull(listener, (Object)"listener should not be null"));
    }

    public CompletableFuture<Void> close() {
        if (this.mClosed) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.runAsync(() -> {
            LOG.debug("Closing connection: {}", (Object)this.mConnectionId);
            Throwable throwable = null;
            try (LockResource lock = new LockResource(this.mStateLock.writeLock());){
                this.mClosed = true;
            }
            catch (Throwable throwable2) {
                Throwable throwable3 = throwable2;
                throw throwable2;
            }
            this.mTimeoutScheduler.cancel();
            if (!this.mStreamCompleted) {
                try {
                    this.mTargetObserver.onCompleted();
                }
                catch (Exception e) {
                    LOG.debug("Completing underlying gRPC stream failed.", (Throwable)e);
                }
            }
            this.failPendingRequests(new ConnectException("Connection closed."));
            for (Listener<GrpcMessagingConnection> listener : this.mCloseListeners) {
                listener.accept(this);
            }
        }, this.mExecutor);
    }

    public void onNext(TransportMessage message) {
        LOG.debug("Received a new message. GrpcMessagingConnection: {}, RequestHeader: {}, ResponseHeader: {}", new Object[]{this.mConnectionId, message.getRequestHeader(), message.getResponseHeader()});
        if (message.hasRequestHeader()) {
            this.handleRequestMessage(message);
        } else if (message.hasResponseHeader()) {
            this.handleResponseMessage(message);
        } else {
            throw new RuntimeException("Message should contain a request/response header.");
        }
    }

    public void onError(Throwable t) {
        LOG.debug("Connection failed: {}", (Object)this.mConnectionId, (Object)t);
        Throwable throwable = null;
        try (LockResource lock = new LockResource(this.mStateLock.writeLock());){
            this.mClosed = true;
        }
        catch (Throwable throwable2) {
            Throwable throwable3 = throwable2;
            throw throwable2;
        }
        this.mLastFailure = t;
        this.failPendingRequests(t);
        for (Listener<Throwable> listener : this.mExceptionListeners) {
            listener.accept(t);
        }
        for (Listener<Object> listener : this.mCloseListeners) {
            listener.accept(this);
        }
    }

    public void onCompleted() {
        LOG.debug("Connection completed: {}", (Object)this.mConnectionId);
        this.mStreamCompleted = true;
        if (this.mConnectionOwner == ConnectionOwner.SERVER) {
            this.mTargetObserver.onCompleted();
        }
        this.close();
    }

    public String toString() {
        return this.mConnectionId;
    }

    private void timeoutPendingRequests() {
        Map.Entry<Long, ContextualFuture> requestEntry;
        ContextualFuture future;
        long currentTimeMillis = System.currentTimeMillis();
        Iterator<Map.Entry<Long, ContextualFuture>> responseIterator = this.mResponseFutures.entrySet().iterator();
        while (responseIterator.hasNext() && (future = (requestEntry = responseIterator.next()).getValue()).getCreationTime() + this.mRequestTimeoutMs < currentTimeMillis) {
            LOG.debug("Timing out request({}). GrpcMessagingConnection: {}", (Object)requestEntry.getKey(), (Object)this.mConnectionId);
            responseIterator.remove();
            future.getContext().executor().execute(() -> future.completeExceptionally(new TimeoutException("Request timed out.")));
        }
    }

    private void failPendingRequests(Throwable error) {
        for (Map.Entry<Long, ContextualFuture> responseEntry : this.mResponseFutures.entrySet()) {
            LOG.debug("Closing request({}) with error: {}. GrpcMessagingConnection: {}", new Object[]{responseEntry.getKey(), error.getClass().getName(), this.mConnectionId});
            ContextualFuture responseFuture = responseEntry.getValue();
            responseFuture.getContext().executor().execute(() -> responseFuture.completeExceptionally(error));
        }
    }

    protected static class ContextualFuture<T>
    extends CompletableFuture<T> {
        private final long mCreationTime;
        private final GrpcMessagingContext mContext;

        private ContextualFuture(long creationTime, GrpcMessagingContext context) {
            this.mCreationTime = creationTime;
            this.mContext = context;
        }

        private GrpcMessagingContext getContext() {
            return this.mContext;
        }

        private long getCreationTime() {
            return this.mCreationTime;
        }
    }

    protected static class HandlerHolder {
        private final Function<Object, CompletableFuture<Object>> mHandler;
        private final GrpcMessagingContext mContext;

        private HandlerHolder(Function handler, GrpcMessagingContext context) {
            this.mHandler = handler;
            this.mContext = context;
        }

        private GrpcMessagingContext getContext() {
            return this.mContext;
        }

        private Function<Object, CompletableFuture<Object>> getHandler() {
            return this.mHandler;
        }
    }

    protected static enum ConnectionOwner {
        CLIENT,
        SERVER;

    }
}

