/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.network;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.queryablestate.network.NettyBufferPool;
import org.apache.flink.queryablestate.network.ServerConnection;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.shaded.guava33.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class Client<REQ extends MessageBody, RESP extends MessageBody> {
    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
    private final String clientName;
    private final Bootstrap bootstrap;
    private final MessageSerializer<REQ, RESP> messageSerializer;
    private final KvStateRequestStats stats;
    private final Map<InetSocketAddress, ServerConnection<REQ, RESP>> connections = new ConcurrentHashMap<InetSocketAddress, ServerConnection<REQ, RESP>>();
    private final AtomicReference<CompletableFuture<Void>> clientShutdownFuture = new AtomicReference<Object>(null);

    public Client(String clientName, int numEventLoopThreads, MessageSerializer<REQ, RESP> serializer, KvStateRequestStats stats) {
        Preconditions.checkArgument((numEventLoopThreads >= 1 ? 1 : 0) != 0, (Object)"Non-positive number of event loop threads.");
        this.clientName = (String)Preconditions.checkNotNull((Object)clientName);
        this.messageSerializer = (MessageSerializer)Preconditions.checkNotNull(serializer);
        this.stats = (KvStateRequestStats)Preconditions.checkNotNull((Object)stats);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flink " + clientName + " Event Loop Thread %d").build();
        NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
        NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
        this.bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group((EventLoopGroup)nioGroup)).channel(NioSocketChannel.class)).option(ChannelOption.ALLOCATOR, (Object)bufferPool)).handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel channel) {
                channel.pipeline().addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)}).addLast(new ChannelHandler[]{new ChunkedWriteHandler()});
            }
        });
    }

    public String getClientName() {
        return this.clientName;
    }

    public CompletableFuture<RESP> sendRequest(InetSocketAddress serverAddress, REQ request) {
        if (this.clientShutdownFuture.get() != null) {
            return FutureUtils.completedExceptionally((Throwable)new IllegalStateException(this.clientName + " is already shut down."));
        }
        ServerConnection connection = this.connections.computeIfAbsent(serverAddress, ignored -> {
            ServerConnection<REQ, RESP> newConnection = ServerConnection.createPendingConnection(this.clientName, this.messageSerializer, this.stats);
            this.bootstrap.connect(serverAddress.getAddress(), serverAddress.getPort()).addListener((GenericFutureListener)((ChannelFutureListener)newConnection::establishConnection));
            newConnection.getCloseFuture().handle((ignoredA, ignoredB) -> this.connections.remove(serverAddress, newConnection));
            return newConnection;
        });
        return connection.sendRequest(request);
    }

    public CompletableFuture<Void> shutdown() {
        CompletableFuture<Void> newShutdownFuture = new CompletableFuture<Void>();
        if (this.clientShutdownFuture.compareAndSet(null, newShutdownFuture)) {
            ArrayList<CompletableFuture<Void>> connectionFutures = new ArrayList<CompletableFuture<Void>>();
            for (Map.Entry<InetSocketAddress, ServerConnection<REQ, RESP>> conn : this.connections.entrySet()) {
                if (!this.connections.remove(conn.getKey(), conn.getValue())) continue;
                connectionFutures.add(conn.getValue().close());
            }
            CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).whenComplete((result, throwable) -> {
                if (throwable != null) {
                    LOG.warn("Problem while shutting down the connections at the {}: {}", (Object)this.clientName, throwable);
                }
                if (this.bootstrap != null) {
                    EventLoopGroup group = this.bootstrap.config().group();
                    if (group != null && !group.isShutdown()) {
                        group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS).addListener(finished -> {
                            if (finished.isSuccess()) {
                                newShutdownFuture.complete(null);
                            } else {
                                newShutdownFuture.completeExceptionally(finished.cause());
                            }
                        });
                    } else {
                        newShutdownFuture.complete(null);
                    }
                } else {
                    newShutdownFuture.complete(null);
                }
            });
            return newShutdownFuture;
        }
        return this.clientShutdownFuture.get();
    }

    @VisibleForTesting
    public boolean isEventGroupShutdown() {
        return this.bootstrap == null || this.bootstrap.config().group().isTerminated();
    }
}

