package org.apache.ratis.netty.client;

import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.Channel;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/ratis/netty/client/NettyClientStreamRpc.class */
public class NettyClientStreamRpc implements DataStreamClientRpc {
    public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
    private final String name;
    private final Supplier<Channel> channel;
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
    private final ConcurrentMap<ClientInvocationId, Queue<CompletableFuture<DataStreamReply>>> replies = new ConcurrentHashMap();

    public NettyClientStreamRpc(RaftPeer raftPeer, RaftProperties raftProperties) {
        this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + raftPeer;
        ChannelFuture connect = new Bootstrap().group(this.workerGroup).channel(NioSocketChannel.class).handler(getInitializer()).option(ChannelOption.SO_KEEPALIVE, true).connect(NetUtils.createSocketAddr(raftPeer.getDataStreamAddress()));
        this.channel = JavaUtils.memoize(() -> {
            return connect.syncUninterruptibly2().channel();
        });
    }

    private Channel getChannel() {
        return this.channel.get();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ChannelInboundHandler getClientHandler() {
        return new ChannelInboundHandlerAdapter() { // from class: org.apache.ratis.netty.client.NettyClientStreamRpc.1
            @Override // org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler
            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
                if (!(obj instanceof DataStreamReply)) {
                    NettyClientStreamRpc.LOG.error("{}: unexpected message {}", this, obj.getClass());
                    return;
                }
                DataStreamReply dataStreamReply = (DataStreamReply) obj;
                NettyClientStreamRpc.LOG.debug("{}: read {}", this, dataStreamReply);
                Optional.ofNullable(NettyClientStreamRpc.this.replies.get(ClientInvocationId.valueOf(dataStreamReply.getClientId(), dataStreamReply.getStreamId()))).map((v0) -> {
                    return v0.poll();
                }).ifPresent(completableFuture -> {
                    completableFuture.complete(dataStreamReply);
                });
            }
        };
    }

    private ChannelInitializer<SocketChannel> getInitializer() {
        return new ChannelInitializer<SocketChannel>() { // from class: org.apache.ratis.netty.client.NettyClientStreamRpc.2
            @Override // org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer
            public void initChannel(SocketChannel socketChannel) {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast(NettyClientStreamRpc.this.newEncoder());
                pipeline.addLast(NettyClientStreamRpc.this.newEncoderDataStreamRequestFilePositionCount());
                pipeline.addLast(NettyClientStreamRpc.this.newDecoder());
                pipeline.addLast(NettyClientStreamRpc.this.getClientHandler());
            }
        };
    }

    MessageToMessageEncoder<DataStreamRequestByteBuffer> newEncoder() {
        return new MessageToMessageEncoder<DataStreamRequestByteBuffer>() { // from class: org.apache.ratis.netty.client.NettyClientStreamRpc.3
            /* renamed from: encode, reason: avoid collision after fix types in other method */
            protected void encode2(ChannelHandlerContext channelHandlerContext, DataStreamRequestByteBuffer dataStreamRequestByteBuffer, List<Object> list) {
                list.getClass();
                NettyDataStreamUtils.encodeDataStreamRequestByteBuffer(dataStreamRequestByteBuffer, list::add, channelHandlerContext.alloc());
            }

            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder
            public /* bridge */ /* synthetic */ void encode(ChannelHandlerContext channelHandlerContext, DataStreamRequestByteBuffer dataStreamRequestByteBuffer, List list) throws Exception {
                encode2(channelHandlerContext, dataStreamRequestByteBuffer, (List<Object>) list);
            }
        };
    }

    MessageToMessageEncoder<DataStreamRequestFilePositionCount> newEncoderDataStreamRequestFilePositionCount() {
        return new MessageToMessageEncoder<DataStreamRequestFilePositionCount>() { // from class: org.apache.ratis.netty.client.NettyClientStreamRpc.4
            /* renamed from: encode, reason: avoid collision after fix types in other method */
            protected void encode2(ChannelHandlerContext channelHandlerContext, DataStreamRequestFilePositionCount dataStreamRequestFilePositionCount, List<Object> list) {
                list.getClass();
                NettyDataStreamUtils.encodeDataStreamRequestFilePositionCount(dataStreamRequestFilePositionCount, list::add, channelHandlerContext.alloc());
            }

            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder
            public /* bridge */ /* synthetic */ void encode(ChannelHandlerContext channelHandlerContext, DataStreamRequestFilePositionCount dataStreamRequestFilePositionCount, List list) throws Exception {
                encode2(channelHandlerContext, dataStreamRequestFilePositionCount, (List<Object>) list);
            }
        };
    }

    ByteToMessageDecoder newDecoder() {
        return new ByteToMessageDecoder() { // from class: org.apache.ratis.netty.client.NettyClientStreamRpc.5
            {
                setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
            }

            @Override // org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder
            protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
                Optional ofNullable = Optional.ofNullable(NettyDataStreamUtils.decodeDataStreamReplyByteBuffer(byteBuf));
                list.getClass();
                ofNullable.ifPresent((v1) -> {
                    r1.add(v1);
                });
            }
        };
    }

    @Override // org.apache.ratis.client.DataStreamClientRpc
    public CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest dataStreamRequest) {
        CompletableFuture<DataStreamReply> completableFuture = new CompletableFuture<>();
        if (!this.replies.computeIfAbsent(ClientInvocationId.valueOf(dataStreamRequest.getClientId(), dataStreamRequest.getStreamId()), clientInvocationId -> {
            return new ConcurrentLinkedQueue();
        }).offer(completableFuture)) {
            completableFuture.completeExceptionally(new IllegalStateException(this + ": Failed to offer a future for " + dataStreamRequest));
            return completableFuture;
        }
        LOG.debug("{}: write {}", this, dataStreamRequest);
        getChannel().writeAndFlush(dataStreamRequest);
        return completableFuture;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        getChannel().close().syncUninterruptibly2();
        this.workerGroup.shutdownGracefully();
    }

    public String toString() {
        return this.name;
    }
}
