package cf.dropsonde.firehose;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
import java.io.Closeable;
import java.net.URI;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManagerFactory;
import org.cloudfoundry.dropsonde.events.Envelope;
import rx.Observable;
import rx.Subscriber;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cf/dropsonde/firehose/NettyFirehoseOnSubscribe.class */
public class NettyFirehoseOnSubscribe implements Observable.OnSubscribe<Envelope>, Closeable {
    public static final String HANDLER_NAME = "handler";
    private Channel channel;
    private final EventLoopGroup eventLoopGroup;
    private final Bootstrap bootstrap;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cf/dropsonde/firehose/NettyFirehoseOnSubscribe$WebSocketClientHandler.class */
    public static class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
        private final WebSocketClientHandshaker handshaker;
        private Subscriber<? super Envelope> subscriber;

        public WebSocketClientHandler(WebSocketClientHandshaker webSocketClientHandshaker) {
            this.handshaker = webSocketClientHandshaker;
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
            this.handshaker.handshake(channelHandlerContext.channel());
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
            this.subscriber.onCompleted();
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
            channelHandlerContext.close();
            this.subscriber.onError(th);
        }

        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            Channel channel = channelHandlerContext.channel();
            if (!this.handshaker.isHandshakeComplete()) {
                this.handshaker.finishHandshake(channel, (FullHttpResponse) obj);
                channel.pipeline().addBefore(NettyFirehoseOnSubscribe.HANDLER_NAME, "websocket-frame-aggregator", new WebSocketFrameAggregator(65536));
                this.subscriber.onStart();
            } else {
                if (obj instanceof FullHttpResponse) {
                    FullHttpResponse fullHttpResponse = (FullHttpResponse) obj;
                    throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + fullHttpResponse.getStatus() + ", content=" + fullHttpResponse.content().toString(CharsetUtil.UTF_8) + ')');
                }
                PingWebSocketFrame pingWebSocketFrame = (WebSocketFrame) obj;
                if (pingWebSocketFrame instanceof PingWebSocketFrame) {
                    channelHandlerContext.writeAndFlush(new PongWebSocketFrame(pingWebSocketFrame.retain().content()));
                } else if (pingWebSocketFrame instanceof BinaryWebSocketFrame) {
                    this.subscriber.onNext((Envelope) Envelope.ADAPTER.decode(new ByteBufInputStream(((BinaryWebSocketFrame) obj).content())));
                }
            }
        }

        public void setSubscriber(Subscriber<? super Envelope> subscriber) {
            this.subscriber = subscriber;
        }
    }

    public NettyFirehoseOnSubscribe(URI uri, final String str, String str2, boolean z, EventLoopGroup eventLoopGroup, Class<? extends SocketChannel> cls) {
        SslContext sslContext;
        try {
            final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
            String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
            final int port = getPort(scheme, uri.getPort());
            final URI resolve = uri.resolve("/firehose/" + str2);
            if ("wss".equalsIgnoreCase(scheme)) {
                SslContextBuilder forClient = SslContextBuilder.forClient();
                if (z) {
                    forClient.trustManager(InsecureTrustManagerFactory.INSTANCE);
                } else {
                    TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
                    trustManagerFactory.init((KeyStore) null);
                    forClient.trustManager(trustManagerFactory);
                }
                sslContext = forClient.build();
            } else {
                sslContext = null;
            }
            this.bootstrap = new Bootstrap();
            if (eventLoopGroup == null) {
                this.eventLoopGroup = new NioEventLoopGroup();
                this.bootstrap.group(this.eventLoopGroup);
            } else {
                this.eventLoopGroup = null;
                this.bootstrap.group(eventLoopGroup);
            }
            final SslContext sslContext2 = sslContext;
            this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 15000).channel(cls == null ? NioSocketChannel.class : cls).remoteAddress(host, port).handler(new ChannelInitializer<SocketChannel>() { // from class: cf.dropsonde.firehose.NettyFirehoseOnSubscribe.1
                /* JADX INFO: Access modifiers changed from: protected */
                public void initChannel(SocketChannel socketChannel) throws Exception {
                    DefaultHttpHeaders defaultHttpHeaders = new DefaultHttpHeaders();
                    defaultHttpHeaders.add("Authorization", str);
                    WebSocketClientHandler webSocketClientHandler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(resolve, WebSocketVersion.V13, (String) null, false, defaultHttpHeaders));
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    if (sslContext2 != null) {
                        pipeline.addLast(new ChannelHandler[]{sslContext2.newHandler(socketChannel.alloc(), host, port)});
                    }
                    pipeline.addLast(new ChannelHandler[]{new ReadTimeoutHandler(30)});
                    pipeline.addLast(new ChannelHandler[]{new HttpClientCodec(), new HttpObjectAggregator(8192)});
                    pipeline.addLast(NettyFirehoseOnSubscribe.HANDLER_NAME, webSocketClientHandler);
                    NettyFirehoseOnSubscribe.this.channel = socketChannel;
                }
            });
        } catch (KeyStoreException | NoSuchAlgorithmException | SSLException e) {
            throw new RuntimeException(e);
        }
    }

    public void call(Subscriber<? super Envelope> subscriber) {
        this.bootstrap.connect().addListener(channelFuture -> {
            if (channelFuture.isSuccess()) {
                channelFuture.channel().pipeline().get(WebSocketClientHandler.class).setSubscriber(subscriber);
            } else {
                subscriber.onError(channelFuture.cause());
            }
        });
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.channel != null) {
            this.channel.close();
        }
        if (this.eventLoopGroup != null) {
            this.eventLoopGroup.shutdownGracefully();
        }
    }

    private int getPort(String str, int i) {
        if (i != -1) {
            return i;
        }
        if ("ws".equalsIgnoreCase(str)) {
            return 80;
        }
        return "wss".equalsIgnoreCase(str) ? 443 : -1;
    }

    public boolean isConnected() {
        return this.channel != null && this.channel.isActive();
    }
}
