package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.ResultStream;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:io/axoniq/axonserver/connector/impl/AbstractBufferedStream.class */
/**
 * Base class for {@link ResultStream} implementations that sit on top of a
 * {@link FlowControlledBuffer}.
 *
 * <p>On top of the buffering inherited from the superclass, this class adds:
 * <ul>
 *   <li>a single replaceable "on available" callback that is run whenever an item, an error or
 *       a completion signal is delivered to this stream;</li>
 *   <li>a queue of "close requested" handlers that are run once the stream has been terminated
 *       through {@link #onError(Throwable)} or {@link #onCompleted()};</li>
 *   <li>a {@link #close()} that only acts on its first invocation.</li>
 * </ul>
 *
 * @param <T> the type of item handed out by {@link #next()}, {@link #peek()} and friends
 * @param <R> second type argument forwarded to {@link FlowControlledBuffer}; its role is defined
 *            there (presumably the outbound/request message type; confirm against the superclass)
 */
public abstract class AbstractBufferedStream<T, R> extends FlowControlledBuffer<T, R> implements ResultStream<T> {

    /** Placeholder used instead of {@code null} so the callback can always be invoked blindly. */
    private static final Runnable NO_OP = () -> {
    };

    /** Callback run when something becomes available; never {@code null}, defaults to {@link #NO_OP}. */
    private final AtomicReference<Runnable> onAvailableCallback = new AtomicReference<>(NO_OP);

    /** Handlers waiting to be notified that closing was requested; each is removed before it runs. */
    private final Queue<Runnable> closeHandlers = new ConcurrentLinkedQueue<>();

    /** Becomes {@code true} once {@link #onError(Throwable)} or {@link #onCompleted()} was received. */
    private final AtomicBoolean closeRequested = new AtomicBoolean();

    /** Becomes {@code true} on the first call to {@link #close()}; guards against repeated closing. */
    private final AtomicBoolean clientClosed = new AtomicBoolean();

    /**
     * Creates the stream, forwarding all arguments unchanged to the {@link FlowControlledBuffer}
     * constructor.
     *
     * @param str first superclass constructor argument
     *            (NOTE(review): presumably a client identifier; confirm against FlowControlledBuffer)
     * @param i   second superclass constructor argument
     *            (NOTE(review): presumably the buffer size; confirm against FlowControlledBuffer)
     * @param i2  third superclass constructor argument
     *            (NOTE(review): presumably the refill batch size; confirm against FlowControlledBuffer)
     */
    public AbstractBufferedStream(String str, int i, int i2) {
        super(str, i, i2);
    }

    /**
     * Returns the next item by delegating to {@code take()} of the superclass.
     *
     * @return the item returned by {@code take()}
     * @throws InterruptedException if {@code take()} is interrupted
     */
    @Override
    public T next() throws InterruptedException {
        return take();
    }

    /**
     * Returns the next item by delegating to {@code tryTakeNow()} of the superclass.
     *
     * @return whatever {@code tryTakeNow()} returns
     */
    @Override
    public T nextIfAvailable() {
        return tryTakeNow();
    }

    /**
     * Returns the next item by delegating to {@code tryTake(long, TimeUnit)} of the superclass.
     *
     * @param j        the timeout value handed to {@code tryTake}
     * @param timeUnit the unit of {@code j}
     * @return whatever {@code tryTake(j, timeUnit)} returns
     * @throws InterruptedException if {@code tryTake} is interrupted
     */
    @Override
    public T nextIfAvailable(long j, TimeUnit timeUnit) throws InterruptedException {
        return tryTake(j, timeUnit);
    }

    /**
     * Hands the item to the superclass and then runs the currently registered "on available"
     * callback.
     *
     * @param t the received item
     */
    @Override
    public void onNext(T t) {
        super.onNext(t);
        this.onAvailableCallback.get().run();
    }

    /**
     * Hands the error to the superclass, runs the "on available" callback, marks the stream as
     * close-requested and runs all pending close handlers.
     *
     * @param th the received error
     */
    @Override
    public void onError(Throwable th) {
        super.onError(th);
        this.onAvailableCallback.get().run();
        this.closeRequested.set(true);
        invokeCloseRequestHandlers();
    }

    /**
     * Returns the error recorded by the superclass, if any.
     *
     * @return an {@link Optional} wrapping {@code getErrorResult()}; empty when that is {@code null}
     */
    @Override
    public Optional<Throwable> getError() {
        return Optional.ofNullable(super.getErrorResult());
    }

    /**
     * Signals completion to the superclass, runs the "on available" callback, marks the stream as
     * close-requested and runs all pending close handlers.
     */
    @Override
    public void onCompleted() {
        super.onCompleted();
        this.onAvailableCallback.get().run();
        this.closeRequested.set(true);
        invokeCloseRequestHandlers();
    }

    /**
     * Returns the result of the superclass {@code peek()}.
     *
     * @return whatever {@code super.peek()} returns; {@link #onAvailable(Runnable)} treats a
     *         non-{@code null} result as "an item is available"
     */
    @Override
    public T peek() {
        return (T) super.peek();
    }

    /**
     * Registers the callback to run when something becomes available, replacing any previous one.
     *
     * <p>Passing {@code null} removes the current callback. A non-{@code null} callback is run
     * immediately, on the calling thread, when the stream is already closed or {@link #peek()}
     * already yields an item, so the registrant does not miss signals that arrived earlier.
     *
     * @param runnable the callback to register, or {@code null} to clear the current one
     */
    public void onAvailable(Runnable runnable) {
        if (runnable == null) {
            this.onAvailableCallback.set(NO_OP);
            return;
        }
        this.onAvailableCallback.set(runnable);
        if (isClosed() || peek() != null) {
            runnable.run();
        }
    }

    /**
     * Reports whether the stream is closed, as determined by the superclass.
     *
     * @return the result of {@code super.isClosed()}
     */
    @Override
    public boolean isClosed() {
        return super.isClosed();
    }

    /**
     * Closes the stream from the client side. Only the first invocation has any effect: it closes
     * the superclass and then passes the outbound stream to {@code ObjectUtils.doIfNotNull} so that
     * it gets cancelled (presumably only when an outbound stream is present).
     */
    @Override
    public void close() {
        if (this.clientClosed.getAndSet(true)) {
            // A previous call already performed the close.
            return;
        }
        super.close();
        ObjectUtils.doIfNotNull(outboundStream(),
                                outbound -> outbound.cancel("Cancelling Stream", (Throwable) null));
    }

    /**
     * Registers a handler to run once closing of this stream has been requested, i.e. after
     * {@link #onError(Throwable)} or {@link #onCompleted()} was received. If that has already
     * happened, the pending handlers (including this one) are run right away on the calling thread.
     *
     * @param runnable the handler to register
     */
    public void onCloseRequested(Runnable runnable) {
        // Enqueue before checking the flag so a concurrent termination cannot slip in between
        // the check and the registration and leave this handler unnotified.
        this.closeHandlers.add(runnable);
        if (this.closeRequested.get()) {
            invokeCloseRequestHandlers();
        }
    }

    /**
     * Drains the close handler queue, running each handler as it is removed. Because a handler is
     * polled off the queue before it runs, it is executed at most once.
     */
    private void invokeCloseRequestHandlers() {
        Runnable handler;
        while ((handler = this.closeHandlers.poll()) != null) {
            handler.run();
        }
    }
}
