package io.axoniq.axonserver.connector.impl.buffer;

import io.axoniq.axonserver.connector.FlowControl;
import io.axoniq.axonserver.connector.impl.CloseableReadonlyBuffer;
import io.axoniq.axonserver.connector.impl.DisposableReadonlyBuffer;
import io.axoniq.axonserver.grpc.ErrorMessage;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:io/axoniq/axonserver/connector/impl/buffer/FlowControlledDisposableReadonlyBuffer.class */
/**
 * A {@link DisposableReadonlyBuffer} that wraps a {@link CloseableReadonlyBuffer} and drives a
 * {@link FlowControl} as elements are consumed.
 *
 * <p>Read-only queries ({@link #isEmpty()}, {@link #capacity()}, {@link #closed()},
 * {@link #error()}) and {@link #onAvailable(Runnable)} are forwarded to the wrapped buffer
 * unchanged. {@link #poll()} additionally issues {@code request} calls on the flow control, and
 * {@link #dispose()} cancels it.
 *
 * @param <T> the type of element held by the wrapped buffer
 */
public class FlowControlledDisposableReadonlyBuffer<T> implements DisposableReadonlyBuffer<T> {

    /** Receives the {@code request} and {@code cancel} calls issued by this wrapper. */
    private final FlowControl flowControl;

    /** The buffer that every read operation is delegated to. */
    private final CloseableReadonlyBuffer<T> buffer;

    /** Flipped to {@code true} by the first {@link #poll()} so the initial request is made once. */
    private final AtomicBoolean started = new AtomicBoolean();

    /**
     * Creates a wrapper around the given buffer. Nothing is requested from the flow control until
     * {@link #poll()} is called for the first time.
     *
     * @param flowControl             the flow control to request from and to cancel on dispose
     * @param closeableReadonlyBuffer the buffer to delegate to
     */
    public FlowControlledDisposableReadonlyBuffer(FlowControl flowControl, CloseableReadonlyBuffer<T> closeableReadonlyBuffer) {
        this.flowControl = flowControl;
        this.buffer = closeableReadonlyBuffer;
    }

    /**
     * Polls the wrapped buffer. The very first invocation requests {@link #capacity()} from the
     * flow control before polling; every invocation that yields an element requests one more,
     * unless {@link #closed()} reports {@code true}.
     *
     * @return the result of polling the wrapped buffer
     */
    @Override
    public Optional<T> poll() {
        requestInitialBatchOnce();
        Optional<T> next = this.buffer.poll();
        if (next.isPresent()) {
            requestReplacement();
        }
        return next;
    }

    /**
     * @return whether the wrapped buffer reports itself as empty
     */
    @Override
    public boolean isEmpty() {
        return this.buffer.isEmpty();
    }

    /**
     * @return the capacity reported by the wrapped buffer
     */
    @Override
    public int capacity() {
        return this.buffer.capacity();
    }

    /**
     * Registers the given callback directly on the wrapped buffer. This does not trigger the
     * initial request on the flow control; only {@link #poll()} does.
     *
     * @param runnable the callback to hand to the wrapped buffer
     */
    @Override
    public void onAvailable(Runnable runnable) {
        this.buffer.onAvailable(runnable);
    }

    /**
     * Cancels the flow control. The wrapped buffer itself is not touched.
     */
    @Override
    public void dispose() {
        this.flowControl.cancel();
    }

    /**
     * @return whether the wrapped buffer reports itself as closed
     */
    @Override
    public boolean closed() {
        return this.buffer.closed();
    }

    /**
     * @return the error reported by the wrapped buffer, if any
     */
    @Override
    public Optional<ErrorMessage> error() {
        return this.buffer.error();
    }

    /**
     * Requests the wrapped buffer's capacity from the flow control, but only for the caller that
     * wins the {@code false -> true} transition of {@link #started}.
     */
    private void requestInitialBatchOnce() {
        if (!this.started.compareAndSet(false, true)) {
            // Another poll() already issued the initial request.
            return;
        }
        this.flowControl.request(this.buffer.capacity());
    }

    /**
     * Requests a single item from the flow control for the element that was just polled, skipping
     * the request once {@link #closed()} reports {@code true}.
     */
    private void requestReplacement() {
        if (!closed()) {
            this.flowControl.request(1L);
        }
    }
}
