package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;

import javax.annotation.Nullable;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.class */
public class NettyConnectionWriterImpl implements NettyConnectionWriter {
    private final NettyPayloadManager nettyPayloadManager;
    private final NettyConnectionId connectionId = NettyConnectionId.newId();
    private final BufferAvailabilityListener availabilityListener;

    public NettyConnectionWriterImpl(NettyPayloadManager nettyPayloadManager, BufferAvailabilityListener bufferAvailabilityListener) {
        this.nettyPayloadManager = nettyPayloadManager;
        this.availabilityListener = bufferAvailabilityListener;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter
    public NettyConnectionId getNettyConnectionId() {
        return this.connectionId;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter
    public void notifyAvailable() {
        this.availabilityListener.notifyDataAvailable();
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter
    public int numQueuedPayloads() {
        return this.nettyPayloadManager.getSize();
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter
    public int numQueuedBufferPayloads() {
        return this.nettyPayloadManager.getBacklog();
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter
    public void writeNettyPayload(NettyPayload nettyPayload) {
        this.nettyPayloadManager.add(nettyPayload);
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter
    public void close(@Nullable Throwable th) {
        while (true) {
            NettyPayload poll = this.nettyPayloadManager.poll();
            if (poll == null) {
                break;
            } else {
                poll.getBuffer().ifPresent((v0) -> {
                    v0.recycleBuffer();
                });
            }
        }
        if (th != null) {
            writeNettyPayload(NettyPayload.newError(th));
        }
    }
}
