package org.apache.flink.runtime.rest;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.RouterHandler;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rest/RestServerEndpoint.class */
public abstract class RestServerEndpoint {
    public static final int MAX_REQUEST_SIZE_BYTES = 10485760;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private final Object lock = new Object();
    private final String configuredAddress;
    private final int configuredPort;
    private final SSLEngine sslEngine;
    private ServerBootstrap bootstrap;
    private Channel serverChannel;
    private String restAddress;
    private volatile boolean started;

    public RestServerEndpoint(RestServerEndpointConfiguration restServerEndpointConfiguration) {
        Preconditions.checkNotNull(restServerEndpointConfiguration);
        this.configuredAddress = restServerEndpointConfiguration.getEndpointBindAddress();
        this.configuredPort = restServerEndpointConfiguration.getEndpointBindPort();
        this.sslEngine = restServerEndpointConfiguration.getSslEngine();
        this.restAddress = null;
        this.started = false;
    }

    protected abstract Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> completableFuture);

    public void start() throws Exception {
        synchronized (this.lock) {
            if (this.started) {
                return;
            }
            this.log.info("Starting rest endpoint.");
            final Router router = new Router();
            CompletableFuture<String> completableFuture = new CompletableFuture<>();
            initializeHandlers(completableFuture).forEach(tuple2 -> {
                registerHandler(router, tuple2);
            });
            ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() { // from class: org.apache.flink.runtime.rest.RestServerEndpoint.1
                /* JADX INFO: Access modifiers changed from: protected */
                public void initChannel(SocketChannel socketChannel) {
                    RouterHandler routerHandler = new RouterHandler(router);
                    if (RestServerEndpoint.this.sslEngine != null) {
                        socketChannel.pipeline().addLast("ssl", new SslHandler(RestServerEndpoint.this.sslEngine));
                    }
                    socketChannel.pipeline().addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(RestServerEndpoint.MAX_REQUEST_SIZE_BYTES)}).addLast(routerHandler.name(), routerHandler).addLast(new ChannelHandler[]{new PipelineErrorHandler(RestServerEndpoint.this.log)});
                }
            };
            NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("flink-rest-server-netty-boss"));
            NioEventLoopGroup nioEventLoopGroup2 = new NioEventLoopGroup(0, new DefaultThreadFactory("flink-rest-server-netty-worker"));
            this.bootstrap = new ServerBootstrap();
            this.bootstrap.group(nioEventLoopGroup, nioEventLoopGroup2).channel(NioServerSocketChannel.class).childHandler(channelInitializer);
            this.serverChannel = (this.configuredAddress == null ? this.bootstrap.bind(this.configuredPort) : this.bootstrap.bind(this.configuredAddress, this.configuredPort)).syncUninterruptibly().channel();
            InetSocketAddress inetSocketAddress = (InetSocketAddress) this.serverChannel.localAddress();
            String hostAddress = inetSocketAddress.getAddress().getHostAddress();
            int port = inetSocketAddress.getPort();
            this.log.info("Rest endpoint listening at {}:{}", hostAddress, Integer.valueOf(port));
            this.restAddress = (this.sslEngine != null ? "https://" : "http://") + hostAddress + ':' + port;
            completableFuture.complete(this.restAddress);
            this.started = true;
        }
    }

    public InetSocketAddress getServerAddress() {
        Preconditions.checkState(this.started, "The RestServerEndpoint has not been started yet.");
        Channel channel = this.serverChannel;
        if (channel == null) {
            return null;
        }
        try {
            return (InetSocketAddress) channel.localAddress();
        } catch (Exception e) {
            this.log.error("Cannot access local server address", e);
            return null;
        }
    }

    public String getRestAddress() {
        Preconditions.checkState(this.started, "The RestServerEndpoint has not been started yet.");
        return this.restAddress;
    }

    public void shutdown(Time time) {
        synchronized (this.lock) {
            if (this.started) {
                this.log.info("Shutting down rest endpoint.");
                CompletableFuture completableFuture = new CompletableFuture();
                if (this.serverChannel != null) {
                    this.serverChannel.close().addListener(future -> {
                        completableFuture.complete(null);
                    });
                    this.serverChannel = null;
                }
                CompletableFuture completableFuture2 = new CompletableFuture();
                CompletableFuture completableFuture3 = new CompletableFuture();
                completableFuture.thenRun(() -> {
                    if (this.bootstrap == null) {
                        completableFuture2.complete(null);
                        completableFuture3.complete(null);
                        return;
                    }
                    if (this.bootstrap.group() != null) {
                        this.bootstrap.group().shutdownGracefully(0L, time.toMilliseconds(), TimeUnit.MILLISECONDS).addListener(future2 -> {
                            completableFuture2.complete(null);
                        });
                    }
                    if (this.bootstrap.childGroup() != null) {
                        this.bootstrap.childGroup().shutdownGracefully(0L, time.toMilliseconds(), TimeUnit.MILLISECONDS).addListener(future3 -> {
                            completableFuture3.complete(null);
                        });
                    }
                    this.bootstrap = null;
                });
                try {
                    CompletableFuture.allOf(completableFuture2, completableFuture3).get(time.toMilliseconds(), TimeUnit.MILLISECONDS);
                    this.log.info("Rest endpoint shutdown complete.");
                } catch (Exception e) {
                    this.log.warn("Rest endpoint shutdown failed.", e);
                }
                this.restAddress = null;
                this.started = false;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> tuple2) {
        switch (((RestHandlerSpecification) tuple2.f0).getHttpMethod()) {
            case GET:
                router.GET(((RestHandlerSpecification) tuple2.f0).getTargetRestEndpointURL(), tuple2.f1);
                return;
            case POST:
                router.POST(((RestHandlerSpecification) tuple2.f0).getTargetRestEndpointURL(), tuple2.f1);
                return;
            case DELETE:
                router.DELETE(((RestHandlerSpecification) tuple2.f0).getTargetRestEndpointURL(), tuple2.f1);
                return;
            case PATCH:
                router.PATCH(((RestHandlerSpecification) tuple2.f0).getTargetRestEndpointURL(), tuple2.f1);
                return;
            default:
                throw new RuntimeException("Unsupported http method: " + ((RestHandlerSpecification) tuple2.f0).getHttpMethod() + '.');
        }
    }
}
