/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.protocol.http.server;

import io.netty.handler.codec.http.HttpVersion;
import io.reactivex.netty.channel.ConnectionHandler;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.protocol.http.server.DefaultErrorResponseGenerator;
import io.reactivex.netty.protocol.http.server.ErrorResponseGenerator;
import io.reactivex.netty.protocol.http.server.HttpServerMetricsEvent;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import rx.Observable;
import rx.Subscriber;

class HttpConnectionHandler<I, O>
implements ConnectionHandler<HttpServerRequest<I>, HttpServerResponse<O>> {
    private ErrorResponseGenerator<O> responseGenerator = new DefaultErrorResponseGenerator();
    private final RequestHandler<I, O> requestHandler;
    private final boolean send10ResponseFor10Request;
    private MetricEventsSubject eventsSubject;

    public HttpConnectionHandler(RequestHandler<I, O> requestHandler) {
        this(requestHandler, false);
    }

    public HttpConnectionHandler(RequestHandler<I, O> requestHandler, boolean send10ResponseFor10Request) {
        this.requestHandler = requestHandler;
        this.send10ResponseFor10Request = send10ResponseFor10Request;
    }

    void setResponseGenerator(ErrorResponseGenerator<O> responseGenerator) {
        this.responseGenerator = responseGenerator;
    }

    void useMetricEventsSubject(MetricEventsSubject<?> eventsSubject) {
        this.eventsSubject = eventsSubject;
    }

    @Override
    public Observable<Void> handle(final ObservableConnection<HttpServerRequest<I>, HttpServerResponse<O>> newConnection) {
        return newConnection.getInput().lift(new Observable.Operator<Void, HttpServerRequest<I>>(){

            public Subscriber<? super HttpServerRequest<I>> call(final Subscriber<? super Void> child) {
                return new Subscriber<HttpServerRequest<I>>(){

                    public void onCompleted() {
                        child.onCompleted();
                    }

                    public void onError(Throwable e) {
                        child.onError(e);
                    }

                    public void onNext(HttpServerRequest<I> newRequest) {
                        Observable requestHandlingResult;
                        final long startTimeMillis = Clock.newStartTimeMillis();
                        HttpConnectionHandler.this.eventsSubject.onEvent(HttpServerMetricsEvent.NEW_REQUEST_RECEIVED);
                        final HttpServerResponse response = new HttpServerResponse(newConnection.getChannel(), HttpConnectionHandler.this.send10ResponseFor10Request ? newRequest.getHttpVersion() : HttpVersion.HTTP_1_1, HttpConnectionHandler.this.eventsSubject);
                        if (newRequest.getHeaders().isKeepAlive()) {
                            if (!newRequest.getHttpVersion().isKeepAliveDefault()) {
                                response.getHeaders().set("Connection", (Object)"keep-alive");
                            }
                        } else {
                            response.getHeaders().set("Connection", (Object)"close");
                        }
                        try {
                            HttpConnectionHandler.this.eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_HANDLING_START, Clock.onEndMillis(startTimeMillis));
                            requestHandlingResult = HttpConnectionHandler.this.requestHandler.handle(newRequest, response);
                            if (null == requestHandlingResult) {
                                requestHandlingResult = Observable.empty();
                            }
                        }
                        catch (Throwable throwable) {
                            requestHandlingResult = Observable.error((Throwable)throwable);
                        }
                        requestHandlingResult.subscribe((Subscriber)new Subscriber<Void>(){

                            public void onCompleted() {
                                HttpConnectionHandler.this.eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_HANDLING_SUCCESS, Clock.onEndMillis(startTimeMillis));
                                response.close(false);
                            }

                            public void onError(Throwable throwable) {
                                HttpConnectionHandler.this.eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_HANDLING_FAILED, Clock.onEndMillis(startTimeMillis), throwable);
                                if (!response.isHeaderWritten()) {
                                    HttpConnectionHandler.this.responseGenerator.updateResponse(response, throwable);
                                }
                                response.close(true);
                            }

                            public void onNext(Void aVoid) {
                            }
                        });
                    }
                };
            }
        });
    }
}

