package org.apache.flink.queryablestate.network;

import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.shaded.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/queryablestate/network/AbstractServerBase.class */
public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends MessageBody> {
    private static final int LOW_WATER_MARK = 8192;
    private static final int HIGH_WATER_MARK = 32768;
    private final String serverName;
    private final String bindAddress;
    private final Set<Integer> bindPortRange;
    private final int numEventLoopThreads;
    private final int numQueryThreads;
    private ServerBootstrap bootstrap;
    private ExecutorService queryExecutor;
    private InetSocketAddress serverAddress;
    private AbstractServerHandler<REQ, RESP> handler;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private final AtomicReference<CompletableFuture<Void>> serverShutdownFuture = new AtomicReference<>(null);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/queryablestate/network/AbstractServerBase$ServerChannelInitializer.class */
    public static final class ServerChannelInitializer<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInitializer<SocketChannel> {
        private final AbstractServerHandler<REQ, RESP> sharedRequestHandler;

        ServerChannelInitializer(AbstractServerHandler<REQ, RESP> abstractServerHandler) {
            this.sharedRequestHandler = (AbstractServerHandler) Preconditions.checkNotNull(abstractServerHandler, "MessageBody handler");
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new ChannelHandler[]{new ChunkedWriteHandler()}).addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)}).addLast(new ChannelHandler[]{this.sharedRequestHandler});
        }
    }

    protected AbstractServerBase(String str, String str2, Iterator<Integer> it, Integer num, Integer num2) {
        Preconditions.checkNotNull(it);
        Preconditions.checkArgument(num.intValue() >= 1, "Non-positive number of event loop threads.");
        Preconditions.checkArgument(num2.intValue() >= 1, "Non-positive number of query threads.");
        this.serverName = (String) Preconditions.checkNotNull(str);
        this.bindAddress = (String) Preconditions.checkNotNull(str2);
        this.numEventLoopThreads = num.intValue();
        this.numQueryThreads = num2.intValue();
        this.bindPortRange = new HashSet();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            Preconditions.checkArgument(intValue >= 0 && intValue <= 65535, "Invalid port configuration. Port must be between 0 and 65535, but was " + intValue + ".");
            this.bindPortRange.add(Integer.valueOf(intValue));
        }
    }

    private ExecutorService createQueryExecutor() {
        return Executors.newFixedThreadPool(this.numQueryThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flink " + getServerName() + " Thread %d").build());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ExecutorService getQueryExecutor() {
        return this.queryExecutor;
    }

    public String getServerName() {
        return this.serverName;
    }

    public abstract AbstractServerHandler<REQ, RESP> initializeHandler();

    public InetSocketAddress getServerAddress() {
        Preconditions.checkState(this.serverAddress != null, "Server " + this.serverName + " has not been started.");
        return this.serverAddress;
    }

    public void start() throws Throwable {
        Preconditions.checkState(this.serverAddress == null && this.serverShutdownFuture.get() == null, this.serverName + " is already running @ " + this.serverAddress + ". ");
        Iterator<Integer> it = this.bindPortRange.iterator();
        while (it.hasNext() && !attemptToBind(it.next().intValue())) {
        }
        if (this.serverAddress != null) {
            this.log.info("Started {} @ {}.", this.serverName, this.serverAddress);
        } else {
            this.log.info("Unable to start {}. All ports in provided range ({}) are occupied.", this.serverName, this.bindPortRange);
            throw new FlinkRuntimeException("Unable to start " + this.serverName + ". All ports in provided range are occupied.");
        }
    }

    private boolean attemptToBind(int i) throws Throwable {
        this.log.debug("Attempting to start {} on port {}.", this.serverName, Integer.valueOf(i));
        this.queryExecutor = createQueryExecutor();
        this.handler = initializeHandler();
        NettyBufferPool nettyBufferPool = new NettyBufferPool(this.numEventLoopThreads);
        this.bootstrap = new ServerBootstrap().localAddress(this.bindAddress, i).group(new NioEventLoopGroup(this.numEventLoopThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flink " + this.serverName + " EventLoop Thread %d").build())).channel(NioServerSocketChannel.class).option(ChannelOption.ALLOCATOR, nettyBufferPool).childOption(ChannelOption.ALLOCATOR, nettyBufferPool).childHandler(new ServerChannelInitializer(this.handler));
        this.bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, Integer.valueOf(LOW_WATER_MARK));
        this.bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, Integer.valueOf(HIGH_WATER_MARK));
        try {
            ChannelFuture sync = this.bootstrap.bind().sync();
            if (!sync.isSuccess()) {
                throw sync.cause();
            }
            InetSocketAddress inetSocketAddress = (InetSocketAddress) sync.channel().localAddress();
            this.serverAddress = new InetSocketAddress(inetSocketAddress.getAddress(), inetSocketAddress.getPort());
            return true;
        } catch (BindException e) {
            this.log.debug("Failed to start {} on port {}: {}.", new Object[]{this.serverName, Integer.valueOf(i), e.getMessage()});
            try {
                shutdownServer().whenComplete((r4, th) -> {
                    this.serverShutdownFuture.getAndSet(null);
                }).get();
                return false;
            } catch (Exception e2) {
                this.log.warn("Problem while shutting down {}: {}", this.serverName, e2.getMessage());
                return false;
            }
        }
    }

    public CompletableFuture<Void> shutdownServer() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        if (this.serverShutdownFuture.compareAndSet(null, completableFuture)) {
            this.log.info("Shutting down {} @ {}", this.serverName, this.serverAddress);
            CompletableFuture completableFuture2 = new CompletableFuture();
            if (this.bootstrap != null) {
                EventLoopGroup group = this.bootstrap.config().group();
                if (group == null || group.isShutdown()) {
                    completableFuture2.complete(null);
                } else {
                    group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS).addListener(future -> {
                        if (future.isSuccess()) {
                            completableFuture2.complete(null);
                        } else {
                            completableFuture2.completeExceptionally(future.cause());
                        }
                    });
                }
            } else {
                completableFuture2.complete(null);
            }
            CompletableFuture completableFuture3 = new CompletableFuture();
            if (this.handler == null) {
                completableFuture3.complete(null);
            } else {
                this.handler.shutdown().whenComplete((r4, th) -> {
                    if (th != null) {
                        completableFuture3.completeExceptionally(th);
                    } else {
                        completableFuture3.complete(null);
                    }
                });
            }
            CompletableFuture.allOf(CompletableFuture.runAsync(() -> {
                if (this.queryExecutor != null) {
                    ExecutorUtils.gracefulShutdown(10L, TimeUnit.MINUTES, new ExecutorService[]{this.queryExecutor});
                }
            }), completableFuture2, completableFuture3).whenComplete((r42, th2) -> {
                if (th2 != null) {
                    completableFuture.completeExceptionally(th2);
                } else {
                    completableFuture.complete(null);
                }
            });
        }
        return this.serverShutdownFuture.get();
    }

    @VisibleForTesting
    public boolean isEventGroupShutdown() {
        return this.bootstrap == null || this.bootstrap.config().group().isTerminated();
    }
}
