/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.ssl.SslContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.netty.ByteBufReceive;
import org.apache.kafka.common.network.netty.NettyClient;
import org.apache.kafka.common.network.netty.NettyStream;
import org.apache.kafka.common.network.netty.ReadableByteBuf;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class NettyHttp2Selector
implements Selectable,
AutoCloseable {
    private static final Runnable NOOP = () -> {};
    private final LogContext logContext;
    private final Logger log;
    private final NettyClient nettyClient;
    private final Map<String, StreamChannel> streams = new HashMap<String, StreamChannel>();
    private final BlockingQueue<Runnable> pollTasks = new LinkedBlockingQueue<Runnable>();
    private final List<String> connected = new ArrayList<String>();
    private final Map<String, ChannelState> disconnected = new HashMap<String, ChannelState>();
    private final List<NetworkSend> completedSends = new ArrayList<NetworkSend>();
    private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();

    public NettyHttp2Selector(EventLoopGroup eventLoopGroup, SslContext sslContext, LogContext logContext, boolean flowControlEnabled) {
        this.logContext = logContext;
        this.log = logContext.logger(NettyHttp2Selector.class);
        this.nettyClient = new NettyClient(sslContext, eventLoopGroup, logContext, flowControlEnabled);
    }

    public NettyHttp2Selector(EventLoopGroup eventLoopGroup, SslContext sslContext, LogContext logContext) {
        this(eventLoopGroup, sslContext, logContext, true);
    }

    @Override
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) {
        if (this.streams.containsKey(id)) {
            throw new IllegalStateException("There is already a connection for id " + id);
        }
        StreamChannel streamChannel = new StreamChannel(id, this.logContext);
        this.streams.put(id, streamChannel);
        CompletableFuture<NettyStream> streamFuture = this.nettyClient.createStream(address, sendBufferSize, receiveBufferSize, streamChannel, this.streamCreationHeaders());
        streamFuture.whenComplete((stream, t) -> this.scheduleOnPollThread(() -> {
            if (t != null) {
                this.log.error("Unable to create connection to endpoint {} for connection id {}", new Object[]{address, id, t});
                if (this.streams.remove(id, streamChannel)) {
                    this.disconnected.put(id, ChannelState.NOT_CONNECTED);
                }
            } else if (this.streams.get(id) == streamChannel) {
                streamChannel.stream((NettyStream)stream);
                this.connected.add(id);
            } else {
                stream.closeStream();
            }
        }));
    }

    protected Http2Headers streamCreationHeaders() {
        return new DefaultHttp2Headers();
    }

    @Override
    public void close() {
        this.streams.clear();
        this.nettyClient.shutdown().whenComplete((ignore, throwable) -> {
            if (throwable == null) {
                this.log.info("Successfully closed http client.");
            } else {
                this.log.error("Failed to close http client.", throwable);
            }
        });
    }

    @Override
    public void close(String id) {
        StreamChannel streamChannel = this.streams.remove(id);
        if (streamChannel != null && streamChannel.stream() != null) {
            streamChannel.stream().closeStream().addListener(listener -> {
                if (listener.isSuccess()) {
                    this.log.info("Successfully closed stream with id {}", (Object)id);
                } else {
                    this.log.error("Failed to close stream with id {}", (Object)id, (Object)listener.cause());
                }
            });
        }
    }

    @Override
    public void send(NetworkSend send) {
        String connectionId = send.destinationId();
        this.streams.get(connectionId).checkInitialized().startSend(send);
    }

    @Override
    public void poll(long timeout) {
        this.completedSends.clear();
        this.completedReceives.clear();
        this.connected.clear();
        this.disconnected.clear();
        ArrayList tasks = new ArrayList();
        this.pollTasks.drainTo(tasks);
        tasks.forEach(Runnable::run);
        if (this.completedSends.isEmpty() && this.completedReceives.isEmpty() && this.connected.isEmpty() && this.disconnected.isEmpty()) {
            try {
                Runnable op = this.pollTasks.poll(timeout, TimeUnit.MILLISECONDS);
                if (op != null) {
                    op.run();
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    @Override
    public void wakeup() {
        this.pollTasks.add(NOOP);
    }

    @Override
    public List<NetworkSend> completedSends() {
        return this.completedSends;
    }

    @Override
    public Collection<NetworkReceive> completedReceives() {
        return this.completedReceives;
    }

    @Override
    public Map<String, ChannelState> disconnected() {
        return this.disconnected;
    }

    @Override
    public List<String> connected() {
        return this.connected;
    }

    @Override
    public void mute(String id) {
    }

    @Override
    public void unmute(String id) {
    }

    @Override
    public void muteAll() {
    }

    @Override
    public void unmuteAll() {
    }

    @Override
    public boolean isChannelReady(String id) {
        return this.streams.containsKey(id);
    }

    private void scheduleOnPollThread(Runnable op) {
        this.pollTasks.add(op);
    }

    Map<String, StreamChannel> streams() {
        return this.streams;
    }

    BlockingQueue<Runnable> pollTasks() {
        return this.pollTasks;
    }

    class StreamChannel
    implements NettyStream.StreamHandler {
        private final Logger log;
        private final String connectionId;
        private NettyStream nettyStream;
        private NetworkSend pendingSend;
        private ByteBufReceive pendingReceive;

        public StreamChannel(String id, LogContext logContext) {
            this.connectionId = id;
            this.log = logContext.logger(StreamChannel.class);
        }

        public void startSend(NetworkSend send) {
            this.nettyStream.runOnEventLoop(() -> {
                if (this.pendingSend != null) {
                    String errMsg = "Attempt to begin a send operation with prior send operation still in progress for connection " + this.connectionId;
                    this.log.error(errMsg, (Throwable)new IllegalStateException(errMsg));
                    return;
                }
                this.pendingSend = send;
                if (this.nettyStream.isReadyForSending()) {
                    this.handleReadyForSend();
                }
            }, true);
        }

        @Override
        public void handleReadyForSend() {
            if (this.pendingSend == null) {
                return;
            }
            try {
                this.pendingSend.writeTo(this.nettyStream.transferableChannel());
                if (this.pendingSend.completed()) {
                    NetworkSend completedSend = this.pendingSend;
                    this.pendingSend = null;
                    NettyHttp2Selector.this.scheduleOnPollThread(() -> NettyHttp2Selector.this.completedSends.add(completedSend));
                }
            }
            catch (IOException e) {
                NettyHttp2Selector.this.scheduleOnPollThread(() -> {
                    NettyHttp2Selector.this.close(this.connectionId);
                    NettyHttp2Selector.this.disconnected.put(this.connectionId, ChannelState.FAILED_SEND);
                });
                this.pendingSend = null;
            }
        }

        @Override
        public void handleData(ByteBuf data) {
            while (data.isReadable()) {
                if (this.pendingReceive == null) {
                    this.pendingReceive = new ByteBufReceive(data.alloc(), Integer.MAX_VALUE);
                }
                this.pendingReceive.readFrom(data);
                if (!this.pendingReceive.complete()) continue;
                ReadableByteBuf payload = this.pendingReceive.payload();
                NetworkReceive completedReceive = new NetworkReceive(this.connectionId, payload.nioBuffer());
                payload.close();
                this.pendingReceive.close();
                this.pendingReceive = null;
                NettyHttp2Selector.this.scheduleOnPollThread(() -> NettyHttp2Selector.this.completedReceives.add(completedReceive));
            }
            this.nettyStream.receiveMore();
        }

        @Override
        public void handleException(Throwable t) {
            NettyHttp2Selector.this.scheduleOnPollThread(() -> {
                if (!NettyHttp2Selector.this.streams.containsKey(this.connectionId)) {
                    return;
                }
                this.log.error("Closing channel {}, error: {}", new Object[]{this.connectionId, t.getMessage(), t});
                NettyHttp2Selector.this.close(this.connectionId);
                NettyHttp2Selector.this.disconnected.put(this.connectionId, ChannelState.LOCAL_CLOSE);
            });
        }

        @Override
        public void handleClose() {
            NettyHttp2Selector.this.scheduleOnPollThread(() -> {
                if (!NettyHttp2Selector.this.streams.containsKey(this.connectionId)) {
                    return;
                }
                if (NettyHttp2Selector.this.streams.remove(this.connectionId) != null) {
                    NettyHttp2Selector.this.disconnected.put(this.connectionId, ChannelState.LOCAL_CLOSE);
                }
            });
        }

        public NettyStream stream() {
            return this.nettyStream;
        }

        ByteBufReceive pendingReceive() {
            return this.pendingReceive;
        }

        NetworkSend pendingSend() {
            return this.pendingSend;
        }

        public void stream(NettyStream nettyStream) {
            this.nettyStream = nettyStream;
        }

        public StreamChannel checkInitialized() {
            if (this.nettyStream == null) {
                throw new IllegalStateException("Stream is not yet initialized.");
            }
            return this;
        }
    }
}

