/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.akka.ControlMessages;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaHandshakeException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcInvalidStateException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

class AkkaRpcActor<T extends RpcEndpoint>
extends AbstractActor {
    /** Logger named after the concrete runtime class of this actor. */
    protected final Logger log = LoggerFactory.getLogger(this.getClass());

    /** Endpoint on which incoming RPC messages are executed. */
    protected final T rpcEndpoint;

    /** Entered and exited around every call into the endpoint made by this actor. */
    private final MainThreadValidatorUtil mainThreadValidator;

    /** Completed (normally or exceptionally) in {@link #postStop()}. */
    private final CompletableFuture<Boolean> terminationFuture;

    /** Version that a remote handshake must present to be accepted. */
    private final int version;

    /** Upper bound, in bytes, for a serialized result that is sent to a remote sender. */
    private final long maximumFramesize;

    /** Flipped exactly once by {@code stop(...)} so that the actor is only stopped once. */
    private final AtomicBoolean rpcEndpointStopped;

    /**
     * Outcome reported through {@link #terminationFuture} when the actor stops. Starts out as a
     * failure ("has not been properly stopped") and is overwritten by {@code stop(...)} or by a
     * failing control message.
     */
    // NOTE(review): presumably volatile because stop(...) can be reached from a future callback
    // that is not guaranteed to run on the actor thread; confirm.
    private volatile RpcEndpointTerminationResult rpcEndpointTerminationResult;

    /** Current lifecycle state; replaced on every control message and in {@link #postStop()}. */
    @Nonnull
    private State state;

    /**
     * Creates an actor that executes RPC messages on the given endpoint. The actor starts in the
     * stopped state and with a pessimistic termination result, which is only replaced once the
     * endpoint is stopped through this actor.
     *
     * @param rpcEndpoint endpoint to run messages on; must not be null
     * @param terminationFuture future completed in {@link #postStop()}; must not be null
     * @param version version expected from remote handshakes
     * @param maximumFramesize maximum size of a serialized remote result; must be positive
     * @throws IllegalArgumentException if {@code maximumFramesize} is not positive
     * @throws NullPointerException if {@code rpcEndpoint} or {@code terminationFuture} is null
     */
    AkkaRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture, int version, long maximumFramesize) {
        Preconditions.checkArgument(maximumFramesize > 0L, "Maximum framesize must be positive.");
        // checkNotNull is generic and returns T; a cast to RpcEndpoint here would make the
        // assignment to the T-typed field ill-typed.
        this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint, "rpc endpoint");
        this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
        this.terminationFuture = Preconditions.checkNotNull(terminationFuture);
        this.version = version;
        this.maximumFramesize = maximumFramesize;
        this.rpcEndpointStopped = new AtomicBoolean(false);
        // Until stop(...) records a real outcome, the actor counts as not properly stopped.
        this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(
                new AkkaRpcException(
                        String.format("RpcEndpoint %s has not been properly stopped.", rpcEndpoint.getEndpointId())));
        this.state = StoppedState.STOPPED;
    }

    /**
     * Publishes the recorded termination result through the termination future and moves the
     * state machine to its final state.
     *
     * @throws Exception if the superclass hook fails
     */
    @Override
    public void postStop() throws Exception {
        super.postStop();

        if (!this.rpcEndpointTerminationResult.isSuccess()) {
            this.terminationFuture.completeExceptionally(this.rpcEndpointTerminationResult.getFailureCause());
        } else {
            this.terminationFuture.complete(null);
        }

        this.state = this.state.finishTermination();
    }

    /**
     * Builds the message dispatch: handshake messages and control messages get dedicated
     * handlers, everything else goes to {@link #handleMessage(Object)}.
     *
     * @return the receive behaviour of this actor
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
                .match(ControlMessages.class, this::handleControlMessage)
                .matchAny(this::handleMessage)
                .build();
    }

    /**
     * Entry point for every message that is neither a handshake nor a control message. While the
     * current state reports itself as running, the message is processed inside the validated main
     * thread section; otherwise it is discarded and the sender is notified.
     *
     * @param message the received message
     */
    private void handleMessage(Object message) {
        if (!this.state.isRunning()) {
            this.log.info(
                    "The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
                    this.rpcEndpoint.getClass().getName(),
                    message.getClass().getName());
            final String reason = String.format(
                    "Discard message, because the rpc endpoint %s has not been started yet.",
                    this.rpcEndpoint.getAddress());
            this.sendErrorIfSender(new AkkaRpcException(reason));
            return;
        }

        this.mainThreadValidator.enterMainThread();
        try {
            this.handleRpcMessage(message);
        } finally {
            // Always leave the main thread section, even if the handler throws.
            this.mainThreadValidator.exitMainThread();
        }
    }

    /**
     * Applies a lifecycle control message to the state machine. Any exception raised by the
     * transition is recorded as the termination result and then rethrown.
     *
     * @param controlMessage the lifecycle command to apply
     */
    private void handleControlMessage(ControlMessages controlMessage) {
        try {
            switch (controlMessage) {
                case START:
                    this.state = this.state.start(this);
                    break;
                case STOP:
                    this.state = this.state.stop();
                    break;
                case TERMINATE:
                    this.state = this.state.terminate(this);
                    break;
                default:
                    this.handleUnknownControlMessage(controlMessage);
                    break;
            }
        } catch (Exception transitionFailure) {
            // Remember why the actor failed so that postStop() can report it.
            this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(transitionFailure);
            throw transitionFailure;
        }
    }

    /**
     * Logs a warning for a control message that has no handler and reports it to the sender.
     *
     * @param controlMessage the unrecognised control message
     */
    private void handleUnknownControlMessage(ControlMessages controlMessage) {
        final String warning =
                String.format("Received unknown control message %s. Dropping this message!", controlMessage);
        this.log.warn(warning);
        this.sendErrorIfSender(new AkkaUnknownMessageException(warning));
    }

    /**
     * Dispatches an RPC-level message by its type: {@link RunAsync}, {@link CallAsync} or
     * {@link RpcInvocation}. Any other message is logged, dropped and reported to the sender.
     *
     * @param message the message to dispatch
     */
    protected void handleRpcMessage(Object message) {
        if (message instanceof RunAsync) {
            this.handleRunAsync((RunAsync) message);
            return;
        }
        if (message instanceof CallAsync) {
            this.handleCallAsync((CallAsync) message);
            return;
        }
        if (message instanceof RpcInvocation) {
            this.handleRpcInvocation((RpcInvocation) message);
            return;
        }

        this.log.warn(
                "Received message of unknown type {} with value {}. Dropping this message!",
                message.getClass().getName(),
                message);
        this.sendErrorIfSender(new AkkaUnknownMessageException(
                "Received unknown message " + message + " of type " + message.getClass().getSimpleName() + '.'));
    }

    /**
     * Answers a remote handshake. The handshake is rejected if the versions differ or if the
     * endpoint does not implement the requested gateway; otherwise a success message is returned.
     *
     * @param handshakeMessage the handshake request
     */
    private void handleHandshakeMessage(RemoteHandshakeMessage handshakeMessage) {
        if (!this.isCompatibleVersion(handshakeMessage.getVersion())) {
            this.sendErrorIfSender(new AkkaHandshakeException(String.format(
                    "Version mismatch between source (%s) and target (%s) rpc component. Please verify that all components have the same version.",
                    handshakeMessage.getVersion(),
                    this.getVersion())));
            return;
        }

        if (!this.isGatewaySupported(handshakeMessage.getRpcGateway())) {
            this.sendErrorIfSender(new AkkaHandshakeException(String.format(
                    "The rpc endpoint does not support the gateway %s.",
                    handshakeMessage.getRpcGateway().getSimpleName())));
            return;
        }

        final Status.Success acknowledgement = new Status.Success(HandshakeSuccessMessage.INSTANCE);
        this.getSender().tell(acknowledgement, this.getSelf());
    }

    /**
     * Checks whether the endpoint can be used through the given gateway type.
     *
     * @param rpcGateway the gateway type requested by the caller
     * @return true if the endpoint's class is assignable to {@code rpcGateway}
     */
    private boolean isGatewaySupported(Class<?> rpcGateway) {
        final Class<?> endpointClass = this.rpcEndpoint.getClass();
        return rpcGateway.isAssignableFrom(endpointClass);
    }

    /**
     * Checks a remote version against this actor's version; only exact matches are compatible.
     *
     * @param sourceVersion version announced by the remote side
     * @return true if both versions are equal
     */
    private boolean isCompatibleVersion(int sourceVersion) {
        final int targetVersion = this.getVersion();
        return targetVersion == sourceVersion;
    }

    /**
     * Returns the version this actor was created with.
     *
     * @return the version used for handshake checks
     */
    private int getVersion() {
        return version;
    }

    /**
     * Executes an {@link RpcInvocation} on the endpoint.
     *
     * <p>The target method is looked up first; if that fails, the failure is logged and sent to
     * the sender as an {@link RpcConnectionException}. Methods returning {@code void} are invoked
     * without sending a reply. For all other methods the result is sent back, asynchronously if
     * it is a {@link CompletableFuture}. An exception thrown by a non-void method is unwrapped
     * from its {@link InvocationTargetException}; every other failure is reported as caught.
     *
     * @param rpcInvocation the invocation to execute
     */
    private void handleRpcInvocation(RpcInvocation rpcInvocation) {
        Method targetMethod = null;
        try {
            targetMethod = this.lookupRpcMethod(rpcInvocation.getMethodName(), rpcInvocation.getParameterTypes());
        } catch (ClassNotFoundException e) {
            this.log.error("Could not load method arguments.", e);
            this.getSender().tell(
                    new Status.Failure(new RpcConnectionException("Could not load method arguments.", e)),
                    this.getSelf());
        } catch (IOException e) {
            this.log.error("Could not deserialize rpc invocation message.", e);
            this.getSender().tell(
                    new Status.Failure(new RpcConnectionException("Could not deserialize rpc invocation message.", e)),
                    this.getSelf());
        } catch (NoSuchMethodException e) {
            this.log.error("Could not find rpc method for rpc invocation.", e);
            this.getSender().tell(
                    new Status.Failure(new RpcConnectionException("Could not find rpc method for rpc invocation.", e)),
                    this.getSelf());
        }

        if (targetMethod == null) {
            // The lookup failed and the sender has already been informed.
            return;
        }

        try {
            targetMethod.setAccessible(true);

            if (Void.TYPE.equals(targetMethod.getReturnType())) {
                // Nothing to send back for void methods.
                targetMethod.invoke(this.rpcEndpoint, rpcInvocation.getArgs());
                return;
            }

            final Object invocationResult;
            try {
                invocationResult = targetMethod.invoke(this.rpcEndpoint, rpcInvocation.getArgs());
            } catch (InvocationTargetException e) {
                this.log.debug("Reporting back error thrown in remote procedure {}", targetMethod, e);
                // Report the exception thrown by the endpoint, not the reflection wrapper.
                this.getSender().tell(new Status.Failure(e.getTargetException()), this.getSelf());
                return;
            }

            final String invokedName = targetMethod.getName();
            if (invocationResult instanceof CompletableFuture) {
                this.sendAsyncResponse((CompletableFuture<?>) invocationResult, invokedName);
            } else {
                this.sendSyncResponse(invocationResult, invokedName);
            }
        } catch (Throwable e) {
            this.log.error("Error while executing remote procedure call {}.", targetMethod, e);
            this.getSender().tell(new Status.Failure(e), this.getSelf());
        }
    }

    /**
     * Sends a directly available result back to the sender. Local senders receive the object
     * itself; remote senders receive it in serialized form, or a failure if serialization fails
     * or the result is too large.
     *
     * @param response the value returned by the RPC method
     * @param methodName name of the invoked method, used in error messages
     */
    private void sendSyncResponse(Object response, String methodName) {
        if (!this.isRemoteSender(this.getSender())) {
            this.getSender().tell(new Status.Success(response), this.getSelf());
            return;
        }

        final Either<SerializedValue<?>, AkkaRpcException> outcome =
                this.serializeRemoteResultAndVerifySize(response, methodName);
        final Object reply = outcome.isLeft()
                ? new Status.Success(outcome.left())
                : new Status.Failure(outcome.right());
        this.getSender().tell(reply, this.getSelf());
    }

    /**
     * Sends the eventual result of a future back to the sender. The sender is captured up front
     * and the future's outcome is bridged into a promise that is piped to that sender. Values for
     * remote senders are serialized and size-checked first.
     *
     * @param asyncResponse future returned by the RPC method
     * @param methodName name of the invoked method, used in error messages
     */
    private void sendAsyncResponse(CompletableFuture<?> asyncResponse, String methodName) {
        // Capture the sender now; the callback below runs when the future completes.
        final ActorRef sender = this.getSender();
        final Promise.DefaultPromise<Object> promise = new Promise.DefaultPromise<>();

        asyncResponse.whenComplete((result, failure) -> {
            if (failure != null) {
                promise.failure(failure);
                return;
            }
            if (!this.isRemoteSender(sender)) {
                promise.success(result);
                return;
            }
            final Either<SerializedValue<?>, AkkaRpcException> outcome =
                    this.serializeRemoteResultAndVerifySize(result, methodName);
            if (outcome.isLeft()) {
                promise.success(outcome.left());
            } else {
                promise.failure(outcome.right());
            }
        });

        Patterns.pipe(promise.future(), this.getContext().dispatcher()).to(sender);
    }

    /**
     * Tells whether the given actor reference is remote.
     *
     * @param sender the actor reference to inspect
     * @return true if the address of the sender's path does not have local scope
     */
    private boolean isRemoteSender(ActorRef sender) {
        final boolean local = sender.path().address().hasLocalScope();
        return !local;
    }

    /**
     * Serializes an RPC result for a remote sender and checks it against the maximum frame size.
     *
     * @param result the value to serialize
     * @param methodName name of the invoked method, used in error messages
     * @return the serialized value on the left, or on the right an {@link AkkaRpcException} if
     *     serialization failed or the serialized size exceeds {@code maximumFramesize}
     */
    private Either<SerializedValue<?>, AkkaRpcException> serializeRemoteResultAndVerifySize(Object result, String methodName) {
        try {
            final SerializedValue<?> serializedResult = new SerializedValue<>(result);
            final long resultSize = serializedResult.getByteArray().length;

            if (resultSize > this.maximumFramesize) {
                // Arguments are passed with their static types so that Either's type parameters
                // are inferred from the return type; casting them to Object breaks the inference.
                return Either.Right(new AkkaRpcException(
                        "The method " + methodName + "'s result size " + resultSize
                                + " exceeds the maximum size " + this.maximumFramesize + " ."));
            }
            return Either.Left(serializedResult);
        } catch (IOException e) {
            return Either.Right(new AkkaRpcException(
                    "Failed to serialize the result for RPC call : " + methodName + '.', e));
        }
    }

    /**
     * Runs the callable carried by a {@link CallAsync} message and replies with its result, or
     * with the thrown failure.
     *
     * @param callAsync message carrying the callable to execute
     */
    private void handleCallAsync(CallAsync callAsync) {
        try {
            final Object callResult = callAsync.getCallable().call();
            final Status.Success reply = new Status.Success(callResult);
            this.getSender().tell(reply, this.getSelf());
        } catch (Throwable failure) {
            this.getSender().tell(new Status.Failure(failure), this.getSelf());
        }
    }

    /**
     * Handles a {@link RunAsync} message. A timestamp of 0, or one that is not in the future,
     * means "run now". Otherwise an equivalent message is scheduled to be sent to this actor after
     * the remaining delay.
     *
     * @param runAsync message carrying the runnable and its target time in nanoseconds
     */
    private void handleRunAsync(RunAsync runAsync) {
        final long scheduledNanos = runAsync.getTimeNanos();
        // The clock is only consulted when a target time was actually set.
        final long remainingNanos = scheduledNanos == 0L ? 0L : scheduledNanos - System.nanoTime();

        if (remainingNanos > 0L) {
            final FiniteDuration delay = new FiniteDuration(remainingNanos, TimeUnit.NANOSECONDS);
            final RunAsync delayedMessage = new RunAsync(runAsync.getRunnable(), scheduledNanos);
            final Object selfMessage = this.envelopeSelfMessage(delayedMessage);
            this.getContext().system().scheduler().scheduleOnce(
                    delay,
                    this.getSelf(),
                    selfMessage,
                    (ExecutionContext) this.getContext().dispatcher(),
                    ActorRef.noSender());
            return;
        }

        try {
            runAsync.getRunnable().run();
        } catch (Throwable t) {
            this.log.error("Caught exception while executing runnable in main thread.", t);
            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
        }
    }

    /**
     * Finds the public method of the endpoint's class that matches the given name and parameter
     * types.
     *
     * @param methodName name of the method
     * @param parameterTypes declared parameter types of the method
     * @return the matching method
     * @throws NoSuchMethodException if the endpoint's class has no such public method
     */
    private Method lookupRpcMethod(String methodName, Class<?>[] parameterTypes) throws NoSuchMethodException {
        final Class<?> endpointClass = this.rpcEndpoint.getClass();
        return endpointClass.getMethod(methodName, parameterTypes);
    }

    /**
     * Reports a failure to the sender of the current message, if there is one.
     *
     * <p>Comparing against {@link ActorRef#noSender()} alone never suppresses the reply, because
     * Akka reports the dead-letters reference as sender for messages sent without one. Both are
     * therefore treated as "no sender".
     *
     * @param throwable the failure to report
     */
    protected void sendErrorIfSender(Throwable throwable) {
        final ActorRef sender = this.getSender();
        final boolean hasSender = !sender.equals(ActorRef.noSender())
                && !sender.equals(this.getContext().system().deadLetters());

        if (hasSender) {
            sender.tell(new Status.Failure(throwable), this.getSelf());
        }
    }

    /**
     * Hook for wrapping a message that this actor sends to itself. This implementation returns
     * the message unchanged.
     *
     * @param message the message to wrap
     * @return the message to send to this actor
     */
    protected Object envelopeSelfMessage(Object message) {
        final Object enveloped = message;
        return enveloped;
    }

    /**
     * Records the termination result and stops this actor. Only the first call has an effect.
     *
     * @param rpcEndpointTerminationResult outcome to report once the actor has stopped
     */
    private void stop(RpcEndpointTerminationResult rpcEndpointTerminationResult) {
        final boolean firstCall = this.rpcEndpointStopped.compareAndSet(false, true);
        if (!firstCall) {
            return;
        }
        this.rpcEndpointTerminationResult = rpcEndpointTerminationResult;
        this.getContext().stop(this.getSelf());
    }

    /**
     * Immutable outcome of an endpoint termination: either success (no cause) or a failure with
     * its cause.
     */
    private static final class RpcEndpointTerminationResult {
        /** Shared instance for the successful outcome. */
        private static final RpcEndpointTerminationResult SUCCESS = new RpcEndpointTerminationResult(null);

        /** Cause of the failure; null means success. */
        @Nullable
        private final Throwable failureCause;

        private RpcEndpointTerminationResult(@Nullable Throwable failureCause) {
            this.failureCause = failureCause;
        }

        /** Returns true if no failure cause was recorded. */
        public boolean isSuccess() {
            return failureCause == null;
        }

        /**
         * Returns the failure cause.
         *
         * @throws IllegalStateException if this result is a success
         */
        public Throwable getFailureCause() {
            Preconditions.checkState(failureCause != null);
            return failureCause;
        }

        /** Returns the shared successful result. */
        private static RpcEndpointTerminationResult success() {
            return SUCCESS;
        }

        /** Creates a result carrying the given cause. */
        private static RpcEndpointTerminationResult failure(Throwable failureCause) {
            return new RpcEndpointTerminationResult(failureCause);
        }

        /** Maps a possibly-null throwable to a success (null) or a failure (non-null). */
        private static RpcEndpointTerminationResult of(@Nullable Throwable failureCause) {
            return failureCause == null
                    ? RpcEndpointTerminationResult.success()
                    : RpcEndpointTerminationResult.failure(failureCause);
        }
    }

    static enum TerminatedState implements State
    {
        TERMINATED;

    }

    /**
     * State held after termination has been requested and before {@code postStop()} runs. It still
     * counts as running, and a repeated terminate request is a no-op.
     */
    enum TerminatingState implements State {
        TERMINATING;

        /** Already terminating; stays in this state. */
        @Override
        public State terminate(AkkaRpcActor<?> akkaRpcActor) {
            return this;
        }

        /** Messages are still processed while terminating. */
        @Override
        public boolean isRunning() {
            return true;
        }
    }

    /**
     * State in which the endpoint does not process RPC messages ({@code isRunning()} is false). It
     * is the initial state of the actor.
     */
    enum StoppedState implements State {
        STOPPED;

        /**
         * Calls the endpoint's start hook inside the main thread section. If the hook throws, the
         * actor is asked to stop with a failure result; the returned state is the started state in
         * either case.
         */
        @Override
        public State start(AkkaRpcActor<?> akkaRpcActor) {
            akkaRpcActor.mainThreadValidator.enterMainThread();
            try {
                akkaRpcActor.rpcEndpoint.internalCallOnStart();
            } catch (Throwable startFailure) {
                final String reason =
                        String.format("Could not start RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId());
                akkaRpcActor.stop(RpcEndpointTerminationResult.failure(new AkkaRpcException(reason, startFailure)));
            } finally {
                akkaRpcActor.mainThreadValidator.exitMainThread();
            }
            return StartedState.STARTED;
        }

        /** Already stopped; stays in this state. */
        @Override
        public State stop() {
            return this;
        }

        /** Stops the actor with a successful result; the endpoint's stop hook is not called. */
        @Override
        public State terminate(AkkaRpcActor<?> akkaRpcActor) {
            akkaRpcActor.stop(RpcEndpointTerminationResult.success());
            return TerminatingState.TERMINATING;
        }
    }

    /** State in which the endpoint processes RPC messages. */
    enum StartedState implements State {
        STARTED;

        /** Already started; stays in this state. */
        @Override
        public State start(AkkaRpcActor<?> akkaRpcActor) {
            return this;
        }

        /** Returns to the stopped state. */
        @Override
        public State stop() {
            return StoppedState.STOPPED;
        }

        /**
         * Calls the endpoint's stop hook inside the main thread section and stops the actor once
         * the returned future completes, passing on the future's outcome. A hook that throws is
         * treated like a future that completed exceptionally.
         */
        @Override
        public State terminate(AkkaRpcActor<?> akkaRpcActor) {
            akkaRpcActor.mainThreadValidator.enterMainThread();

            CompletableFuture<Void> endpointStopFuture;
            try {
                endpointStopFuture = akkaRpcActor.rpcEndpoint.internalCallOnStop();
            } catch (Throwable stopFailure) {
                final String reason = String.format(
                        "Failure while stopping RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId());
                endpointStopFuture = FutureUtils.completedExceptionally(new AkkaRpcException(reason, stopFailure));
            } finally {
                akkaRpcActor.mainThreadValidator.exitMainThread();
            }

            // The actor is only stopped after the endpoint has finished its own shutdown.
            endpointStopFuture.whenComplete(
                    (ignored, failure) -> akkaRpcActor.stop(RpcEndpointTerminationResult.of(failure)));

            return TerminatingState.TERMINATING;
        }

        /** Messages are processed in this state. */
        @Override
        public boolean isRunning() {
            return true;
        }
    }

    /**
     * Lifecycle state of the actor. By default the transitions {@code start}, {@code stop} and
     * {@code terminate} are rejected with an {@link AkkaRpcInvalidStateException}, and the state
     * does not count as running; concrete states override what they support.
     */
    interface State {
        /** Handles a start request; rejected by default. */
        default State start(AkkaRpcActor<?> akkaRpcActor) {
            throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StartedState.STARTED));
        }

        /** Handles a stop request; rejected by default. */
        default State stop() {
            throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StoppedState.STOPPED));
        }

        /** Handles a terminate request; rejected by default. */
        default State terminate(AkkaRpcActor<?> akkaRpcActor) {
            throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(TerminatingState.TERMINATING));
        }

        /** Returns the terminated state. */
        default State finishTermination() {
            return TerminatedState.TERMINATED;
        }

        /** Tells whether RPC messages are processed in this state; false by default. */
        default boolean isRunning() {
            return false;
        }

        /** Builds the error text for a rejected transition from this state to {@code targetState}. */
        default String invalidStateTransitionMessage(State targetState) {
            return String.format(
                    "AkkaRpcActor is currently in state %s and cannot go into state %s.", this, targetState);
        }
    }
}

