package org.springframework.messaging.tcp.reactor;

import io.netty.buffer.ByteBuf;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.MonoToListenableFutureAdapter;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;

/* loaded from: input_file:WEB-INF/lib/spring-messaging-5.3.19.jar:org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.class */
/**
 * {@link TcpConnection} implementation that writes through a Reactor Netty
 * {@link NettyOutbound}, registers idle callbacks through a {@link NettyInbound},
 * and signals closure through a {@link Sinks.Empty}.
 *
 * @param <P> the payload type of the messages written to the connection
 */
public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
    private final NettyInbound inbound;
    private final NettyOutbound outbound;
    private final ReactorNettyCodec<P> codec;
    private final Sinks.Empty<Void> completionSink;

    /**
     * Create a connection wrapper around the given Reactor Netty components.
     *
     * @param nettyInbound the inbound side, used to register idle callbacks
     * @param nettyOutbound the outbound side, used to allocate buffers and send data
     * @param reactorNettyCodec the codec used to encode outgoing messages
     * @param empty the sink that is completed when {@link #close()} is called
     */
    public ReactorNettyTcpConnection(NettyInbound nettyInbound, NettyOutbound nettyOutbound, ReactorNettyCodec<P> reactorNettyCodec, Sinks.Empty<Void> empty) {
        this.inbound = nettyInbound;
        this.outbound = nettyOutbound;
        this.codec = reactorNettyCodec;
        this.completionSink = empty;
    }

    /**
     * Encode the given message into a newly allocated buffer and send it.
     *
     * @param message the message to encode and write
     * @return a future that reflects the completion of the send operation
     */
    @Override // org.springframework.messaging.tcp.TcpConnection
    public ListenableFuture<Void> send(Message<P> message) {
        ByteBuf buffer = this.outbound.alloc().buffer();
        try {
            this.codec.encode(message, buffer);
        }
        catch (Throwable ex) {
            // The buffer has not been handed to the outbound yet, so nothing else
            // will release it if encoding fails; release it here to avoid a leak.
            buffer.release();
            throw ex;
        }
        Mono<Void> sendCompletion = this.outbound.send(Mono.just(buffer)).then();
        return new MonoToListenableFutureAdapter<>(sendCompletion);
    }

    /**
     * Register a callback to be invoked when the connection has been read-idle.
     *
     * @param runnable the callback to invoke
     * @param j the idle duration passed to {@code onReadIdle}
     *        (presumably milliseconds - confirm against the Reactor Netty API)
     */
    @Override // org.springframework.messaging.tcp.TcpConnection
    public void onReadInactivity(Runnable runnable, long j) {
        this.inbound.withConnection(connection -> {
            connection.onReadIdle(j, runnable);
        });
    }

    /**
     * Register a callback to be invoked when the connection has been write-idle.
     *
     * @param runnable the callback to invoke
     * @param j the idle duration passed to {@code onWriteIdle}
     *        (presumably milliseconds - confirm against the Reactor Netty API)
     */
    @Override // org.springframework.messaging.tcp.TcpConnection
    public void onWriteInactivity(Runnable runnable, long j) {
        // NOTE(review): the connection is reached through the inbound side here as
        // well; this presumably exposes the same underlying connection as the outbound.
        this.inbound.withConnection(connection -> {
            connection.onWriteIdle(j, runnable);
        });
    }

    /**
     * Signal closure by attempting to complete the completion sink. The result of
     * the emission attempt is not inspected.
     */
    @Override // org.springframework.messaging.tcp.TcpConnection, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.completionSink.tryEmitEmpty();
    }
}
