package io.axoniq.axonserver.connector.impl.buffer;

import io.axoniq.axonserver.connector.impl.CloseableBuffer;
import io.axoniq.axonserver.grpc.ErrorMessage;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:io/axoniq/axonserver/connector/impl/buffer/BlockingCloseableBuffer.class */
public class BlockingCloseableBuffer<T> implements CloseableBuffer<T> {
    private static final int DEFAULT_CAPACITY = 32;
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue(DEFAULT_CAPACITY);
    private volatile boolean closed = false;
    private final AtomicReference<ErrorMessage> errorRef = new AtomicReference<>();
    private final AtomicReference<Runnable> onAvailableRef = new AtomicReference<>();

    @Override // io.axoniq.axonserver.connector.impl.CloseableReadonlyBuffer
    public Optional<T> poll() {
        return Optional.ofNullable(this.buffer.poll());
    }

    @Override // io.axoniq.axonserver.connector.impl.CloseableReadonlyBuffer
    public boolean isEmpty() {
        return this.buffer.isEmpty();
    }

    @Override // io.axoniq.axonserver.connector.impl.CloseableReadonlyBuffer
    public int capacity() {
        return DEFAULT_CAPACITY;
    }

    public int size() {
        return this.buffer.size();
    }

    @Override // io.axoniq.axonserver.connector.impl.CloseableReadonlyBuffer
    public void onAvailable(Runnable runnable) {
        this.onAvailableRef.set(runnable);
        if (!isEmpty() || this.closed) {
            notifyOnAvailable();
        }
    }

    @Override // io.axoniq.axonserver.connector.impl.CloseableBuffer
    public void put(T t) {
        try {
            this.buffer.put(t);
            notifyOnAvailable();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override // io.axoniq.axonserver.connector.impl.CloseableReadonlyBuffer
    public boolean closed() {
        return this.closed;
    }

    @Override // io.axoniq.axonserver.connector.impl.CloseableReadonlyBuffer
    public Optional<ErrorMessage> error() {
        return Optional.ofNullable(this.errorRef.get());
    }

    @Override // io.axoniq.axonserver.connector.impl.CloseableBuffer
    public void close() {
        this.closed = true;
        notifyOnAvailable();
    }

    @Override // io.axoniq.axonserver.connector.impl.CloseableBuffer
    public void closeExceptionally(ErrorMessage errorMessage) {
        this.errorRef.set(errorMessage);
        close();
    }

    protected void notifyOnAvailable() {
        Runnable runnable = this.onAvailableRef.get();
        if (runnable != null) {
            runnable.run();
        }
    }
}
