package io.confluent.ksql.reactive;

import io.vertx.core.Context;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Objects;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/reactive/BufferedPublisher.class */
public class BufferedPublisher<T> extends BasePublisher<T> {
    private static final Logger log = LoggerFactory.getLogger(BufferedPublisher.class);
    public static final int SEND_MAX_BATCH_SIZE = 200;
    public static final int DEFAULT_BUFFER_MAX_SIZE = 200;
    private final Queue<T> buffer;
    private final int bufferMaxSize;
    private Runnable drainHandler;
    private boolean shouldSendComplete;
    private boolean complete;

    public BufferedPublisher(Context context) {
        this(context, 200);
    }

    public BufferedPublisher(Context context, Collection<T> collection) {
        this(context);
        this.buffer.addAll(collection);
        this.complete = true;
        this.shouldSendComplete = true;
    }

    public BufferedPublisher(Context context, int i) {
        super(context);
        this.buffer = new ArrayDeque();
        this.bufferMaxSize = i;
    }

    public boolean accept(T t) {
        checkContext();
        if (isComplete() || hasSentComplete()) {
            throw new IllegalStateException("Cannot call accept after complete is called");
        }
        if (!isCancelled() && !isFailed()) {
            if (getDemand() == 0) {
                this.buffer.add(t);
            } else {
                doOnNext(t);
            }
        }
        return this.buffer.size() >= this.bufferMaxSize;
    }

    public void drainHandler(Runnable runnable) {
        checkContext();
        if (this.drainHandler != null) {
            throw new IllegalStateException("drainHandler already set");
        }
        this.drainHandler = (Runnable) Objects.requireNonNull(runnable);
    }

    public void complete() {
        checkContext();
        if (isComplete() || isFailed()) {
            return;
        }
        this.complete = true;
        if (!this.buffer.isEmpty() || getSubscriber() == null) {
            this.shouldSendComplete = true;
        } else {
            sendComplete();
        }
    }

    protected boolean isComplete() {
        return this.complete;
    }

    @Override // io.confluent.ksql.reactive.BasePublisher
    protected void maybeSend() {
        int i = 0;
        while (true) {
            if (getDemand() <= 0 || this.buffer.isEmpty()) {
                break;
            }
            if (i >= 200) {
                this.ctx.runOnContext(r3 -> {
                    maybeSend();
                });
                break;
            } else {
                doOnNext(this.buffer.poll());
                i++;
            }
        }
        if (!this.buffer.isEmpty() || isFailed()) {
            return;
        }
        if (this.shouldSendComplete) {
            sendComplete();
            this.shouldSendComplete = false;
        } else {
            if (getDemand() <= 0 || this.drainHandler == null) {
                return;
            }
            Runnable runnable = this.drainHandler;
            this.ctx.runOnContext(r32 -> {
                runnable.run();
            });
            this.drainHandler = null;
        }
    }
}
