/*
 * Decompiled with CFR 0.152.
 */
package com.uber.tchannel.handlers;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.uber.tchannel.api.SubChannel;
import com.uber.tchannel.api.TChannel;
import com.uber.tchannel.api.handlers.AsyncRequestHandler;
import com.uber.tchannel.api.handlers.RequestHandler;
import com.uber.tchannel.errors.ErrorType;
import com.uber.tchannel.frames.ErrorFrame;
import com.uber.tchannel.messages.Request;
import com.uber.tchannel.messages.Response;
import com.uber.tchannel.tracing.Tracing;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.opentracing.Span;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RequestRouter
extends SimpleChannelInboundHandler<Request> {
    private static final Logger logger = LoggerFactory.getLogger(RequestRouter.class);
    @NotNull
    private final TChannel topChannel;
    @NotNull
    private final ListeningExecutorService listeningExecutorService;
    @NotNull
    private final AtomicBoolean busy = new AtomicBoolean(false);
    @NotNull
    private final ConcurrentLinkedQueue<Response> responseQueue = new ConcurrentLinkedQueue();

    public RequestRouter(@NotNull TChannel topChannel, @NotNull ExecutorService executorService) {
        this.topChannel = topChannel;
        this.listeningExecutorService = MoreExecutors.listeningDecorator((ExecutorService)executorService);
    }

    private RequestHandler getRequestHandler(String service, String endpoint) {
        SubChannel subChannel = this.topChannel.getSubChannel(service);
        RequestHandler handler = null;
        if (subChannel != null) {
            handler = subChannel.getRequestHandler(endpoint);
        }
        return handler;
    }

    protected void channelRead0(final ChannelHandlerContext ctx, final Request request) {
        ListenableFuture responseFuture;
        if (!ctx.channel().isActive()) {
            request.release();
            logger.warn("drop request when channel is inActive");
            return;
        }
        if (request.getArgScheme() == null) {
            ErrorFrame.sendError(ErrorType.BadRequest, "Expected incoming call to have \"as\" header set", request, ctx);
            return;
        }
        String service = request.getService();
        if (service == null || service.isEmpty()) {
            ErrorFrame.sendError(ErrorType.BadRequest, "Expected incoming call to have serviceName", request, ctx);
            return;
        }
        String endpoint = request.getEndpoint();
        if (endpoint == null || endpoint.isEmpty()) {
            ErrorFrame.sendError(ErrorType.BadRequest, "Expected incoming call to have endpoint", request, ctx);
            return;
        }
        RequestHandler handler = this.getRequestHandler(service, endpoint);
        if (handler == null) {
            handler = this.topChannel.getDefaultUserHandler();
        }
        if (handler == null) {
            ErrorFrame.sendError(ErrorType.BadRequest, "No handler function for service:endpoint=" + service + ':' + endpoint, request, ctx);
            return;
        }
        try {
            responseFuture = handler instanceof AsyncRequestHandler ? this.sendRequestToAsyncHandler((AsyncRequestHandler)handler, request) : this.listeningExecutorService.submit((Callable)new CallableHandler(handler, this.topChannel, request));
        }
        catch (Throwable re) {
            request.release();
            responseFuture = Futures.immediateFailedFuture((Throwable)re);
        }
        Futures.addCallback((ListenableFuture)responseFuture, (FutureCallback)new FutureCallback<Response>(){

            public void onSuccess(Response response) {
                if (ctx.channel().isActive()) {
                    RequestRouter.this.responseQueue.offer(response);
                    RequestRouter.this.sendResponse(ctx);
                } else {
                    response.release();
                }
            }

            public void onFailure(@NotNull Throwable throwable) {
                logger.error("Failed to handle the request due to exception.", throwable);
                ErrorFrame.sendError(ErrorType.UnexpectedError, "Failed to handle the request: " + throwable.getMessage(), request, ctx);
            }
        }, (Executor)this.listeningExecutorService);
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        this.sendResponse(ctx);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void sendResponse(ChannelHandlerContext ctx) {
        if (!this.busy.compareAndSet(false, true)) {
            return;
        }
        Channel channel = ctx.channel();
        try {
            Response res;
            boolean flush = false;
            while (channel.isWritable() && (res = this.responseQueue.poll()) != null) {
                channel.write((Object)res);
                flush = true;
            }
            if (flush) {
                channel.flush();
            }
        }
        finally {
            this.busy.set(false);
        }
        if (channel.isWritable() && !this.responseQueue.isEmpty()) {
            this.sendResponse(ctx);
        }
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        while (!this.responseQueue.isEmpty()) {
            this.responseQueue.poll().release();
        }
    }

    private ListenableFuture<? extends Response> sendRequestToAsyncHandler(AsyncRequestHandler asyncHandler, final Request request) {
        final Span span = this.topChannel.getTracer() == null ? null : Tracing.startInboundSpan(request, this.topChannel.getTracer(), this.topChannel.getTracingContext());
        ListenableFuture<? extends Response> responseFuture = asyncHandler.handleAsync(request);
        Futures.addCallback(responseFuture, (FutureCallback)new FutureCallback<Response>(){

            public void onSuccess(Response response) {
                this.closeRequestAndSpan();
            }

            public void onFailure(@NotNull Throwable e) {
                if (span != null) {
                    span.log((Map)ImmutableMap.of((Object)"exception", (Object)e));
                }
                this.closeRequestAndSpan();
            }

            private void closeRequestAndSpan() {
                request.release();
                if (span != null) {
                    span.finish();
                }
            }
        }, (Executor)this.listeningExecutorService);
        if (span != null) {
            this.topChannel.getTracingContext().popSpan();
        }
        return responseFuture;
    }

    private static class CallableHandler
    implements Callable<Response> {
        private final Request request;
        private final TChannel topChannel;
        private final RequestHandler handler;

        CallableHandler(RequestHandler handler, TChannel topChannel, Request request) {
            this.handler = handler;
            this.topChannel = topChannel;
            this.request = request;
        }

        @Override
        public Response call() {
            if (this.topChannel.getTracer() == null) {
                return this.callWithoutTracing();
            }
            Span span = Tracing.startInboundSpan(this.request, this.topChannel.getTracer(), this.topChannel.getTracingContext());
            try {
                Response response = this.callWithoutTracing();
                return response;
            }
            catch (Throwable t) {
                span.log((Map)ImmutableMap.of((Object)"exception", (Object)t));
                throw t;
            }
            finally {
                span.finish();
                this.topChannel.getTracingContext().clear();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private Response callWithoutTracing() {
            try {
                Response response = this.handler.handle(this.request);
                return response;
            }
            finally {
                this.request.release();
            }
        }
    }
}

