package org.apache.shenyu.protocol.tcp;

import com.google.common.eventbus.EventBus;
import io.netty.channel.ChannelHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.protocol.tcp.connection.ActivityConnectionObserver;
import org.apache.shenyu.protocol.tcp.connection.Bridge;
import org.apache.shenyu.protocol.tcp.connection.ConnectionContext;
import org.apache.shenyu.protocol.tcp.connection.DefaultConnectionConfigProvider;
import org.apache.shenyu.protocol.tcp.connection.TcpConnectionBridge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpServer;

/* loaded from: input_file:org/apache/shenyu/protocol/tcp/TcpBootstrapServer.class */
/**
 * {@link BootstrapServer} implementation backed by a Reactor Netty {@link TcpServer}.
 *
 * <p>For every accepted inbound connection an upstream connection is requested from the
 * {@link ConnectionContext} and the two are handed to a {@link Bridge}. Upstream removal
 * notifications are distributed to per-connection observers through the shared {@link EventBus}.
 */
public class TcpBootstrapServer implements BootstrapServer {
    private static final Logger LOG = LoggerFactory.getLogger(TcpBootstrapServer.class);

    private static final String LOOP_THREAD_PREFIX = "shenyu-tcp-bootstrap-server";

    private Bridge bridge;
    private ConnectionContext connectionContext;
    private DisposableServer server;
    /** Event loops created in {@link #start}; kept so {@link #shutdown()} can release them. */
    private LoopResources loopResources;
    private final EventBus eventBus;

    /**
     * Creates a server that is not yet listening.
     *
     * @param eventBus bus on which connection observers are registered and upstream removals are posted
     */
    public TcpBootstrapServer(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    /**
     * Builds the connection context from the configuration and binds the TCP server, blocking
     * until the bind completes.
     *
     * <p>Recognised properties: {@code loadBalance} (default {@code "random"}),
     * {@code bossGroupThreadCount} (default {@code "1"}) and {@code workerGroupThreadCount}
     * (default {@code "12"}).
     *
     * @param tcpServerConfiguration port, selector name and properties for this server
     * @throws NumberFormatException if either thread-count property is not an integer
     */
    @Override // org.apache.shenyu.protocol.tcp.BootstrapServer
    public void start(TcpServerConfiguration tcpServerConfiguration) {
        String loadBalance = tcpServerConfiguration.getProps().getOrDefault("loadBalance", "random").toString();
        String bossThreads = tcpServerConfiguration.getProps().getOrDefault("bossGroupThreadCount", "1").toString();
        String workerThreads = tcpServerConfiguration.getProps().getOrDefault("workerGroupThreadCount", "12").toString();
        // Parse before allocating anything so a malformed value cannot leave resources behind.
        int selectCount = Integer.parseInt(bossThreads);
        int workerCount = Integer.parseInt(workerThreads);

        DefaultConnectionConfigProvider configProvider =
                new DefaultConnectionConfigProvider(loadBalance, tcpServerConfiguration.getPluginSelectorName());
        this.bridge = new TcpConnectionBridge();
        this.connectionContext = new ConnectionContext(configProvider);
        this.connectionContext.init(tcpServerConfiguration.getProps());

        LoopResources resources = LoopResources.create(LOOP_THREAD_PREFIX, selectCount, workerCount, true);
        try {
            this.server = TcpServer.create()
                    .doOnChannelInit((connectionObserver, channel, socketAddress) -> {
                        ChannelHandler wireLogger = new LoggingHandler(LogLevel.INFO);
                        channel.pipeline().addFirst(wireLogger);
                    })
                    .wiretap(true)
                    .observe((connection, state) -> LOG.info("connection={}|status={}", connection, state))
                    .doOnConnection(this::bridgeConnections)
                    .port(tcpServerConfiguration.getPort())
                    .runOn(resources)
                    .bindNow();
            this.loopResources = resources;
        } catch (RuntimeException bindFailure) {
            // The bind failed, so nothing else will ever release these event-loop threads.
            resources.dispose();
            throw bindFailure;
        }
    }

    /**
     * Opens an upstream connection for a newly accepted inbound connection and bridges the two.
     *
     * @param connection the accepted inbound connection
     */
    private void bridgeConnections(Connection connection) {
        LOG.info("Starting proxy client ={}", connection);
        String clientIp = getIp(connection.channel().remoteAddress());
        ActivityConnectionObserver observer = new ActivityConnectionObserver("TcpClient");
        this.eventBus.register(observer);
        // Drop the per-connection observer once the inbound side closes; otherwise the bus
        // keeps a reference to every observer ever registered.
        connection.onDispose(() -> this.eventBus.unregister(observer));
        this.connectionContext.getTcpClientConnection(clientIp, observer).subscribe(
            upstream -> this.bridge.bridge(connection, upstream),
            error -> {
                // Without an upstream the inbound connection can never be served; close it
                // instead of leaving it open with nothing attached.
                LOG.error("failed to open upstream connection for client={}", connection, error);
                connection.dispose();
            });
    }

    /**
     * Extracts the textual IP address of a remote peer.
     *
     * <p>For an {@link InetSocketAddress} the literal IP is returned (or the host string when the
     * address is unresolved). Any other address type falls back to parsing {@code toString()},
     * taking the text between the first {@code '/'} and the last {@code ':'}.
     *
     * @param socketAddress the remote address of the inbound channel
     * @return the peer IP without port
     * @throws NullPointerException if {@code socketAddress} is {@code null}
     */
    private String getIp(SocketAddress socketAddress) {
        if (socketAddress == null) {
            throw new NullPointerException("remoteAddress is null");
        }
        if (socketAddress instanceof InetSocketAddress) {
            InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
            InetAddress resolved = inetSocketAddress.getAddress();
            return resolved != null ? resolved.getHostAddress() : inetSocketAddress.getHostString();
        }
        String text = socketAddress.toString();
        // indexOf returns -1 when there is no '/', which makes the start index 0.
        int start = text.indexOf('/') + 1;
        // The port separator is the LAST colon; IPv6 literals contain earlier ones.
        int end = text.lastIndexOf(':');
        return end > start ? text.substring(start, end) : text.substring(start);
    }

    /**
     * Publishes the removed upstreams on the event bus so registered observers can react.
     *
     * @param list upstreams that were removed
     */
    @Override // org.apache.shenyu.protocol.tcp.BootstrapServer
    public void removeCommonUpstream(List<DiscoveryUpstreamData> list) {
        this.eventBus.post(list);
    }

    /**
     * Stops the server and releases its event loops. Safe to call when the server was never
     * started, and safe to call more than once.
     */
    @Override // org.apache.shenyu.protocol.tcp.BootstrapServer
    public void shutdown() {
        DisposableServer running = this.server;
        LoopResources resources = this.loopResources;
        this.server = null;
        this.loopResources = null;
        try {
            if (running != null) {
                running.disposeNow();
            }
        } finally {
            // Release the event loops even if the server did not stop cleanly.
            if (resources != null) {
                resources.dispose();
            }
        }
    }
}
