package org.bsc.async;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.bsc.async.AsyncGenerator;

/**
 * Connects a push-style producer to a pull-style {@link AsyncGenerator} through a
 * {@link BlockingQueue}.
 *
 * <p>The {@code of(...)} factories run a producer callback on an {@link Executor}; the callback
 * writes {@link AsyncGenerator.Data} elements into the queue, and the returned
 * {@link Generator} hands them out one by one from {@link Generator#next()}.
 */
public class AsyncGeneratorQueue {

    /**
     * {@link AsyncGenerator} whose elements are read from a {@link BlockingQueue}.
     *
     * <p>The first dequeued element for which {@code isDone()} is true is remembered and
     * returned by every later call to {@link #next()}.
     *
     * @param <E> element type carried by the generator
     */
    public static class Generator<E> implements AsyncGenerator<E> {
        /** Upper bound, in milliseconds, of a single wait on the queue before {@code isEnd} is re-checked. */
        private static final long POLL_TIMEOUT_MILLIS = 100L;

        /** Terminal element once it has been dequeued; {@code null} until then. */
        AsyncGenerator.Data<E> isEnd = null;

        /** Queue the elements are read from. */
        final BlockingQueue<AsyncGenerator.Data<E>> queue;

        /**
         * Creates a generator that reads from the given queue.
         *
         * @param blockingQueue queue the elements are read from
         */
        public Generator(BlockingQueue<AsyncGenerator.Data<E>> blockingQueue) {
            this.queue = blockingQueue;
        }

        /**
         * Returns the backing queue.
         *
         * @return the queue passed to the constructor
         */
        public BlockingQueue<AsyncGenerator.Data<E>> queue() {
            return this.queue;
        }

        /**
         * Returns the next element, waiting until one is available.
         *
         * <p>The wait parks the calling thread on a timed {@code poll} instead of spinning on
         * the non-blocking {@code poll()}, so an idle generator does not burn a CPU core. An
         * interrupt does not abort the wait; the thread's interrupt status is restored before
         * this method returns.
         *
         * @return the next dequeued element, or the remembered terminal element once the end
         *         has been reached
         */
        @Override
        public AsyncGenerator.Data<E> next() {
            boolean interrupted = false;
            try {
                while (this.isEnd == null) {
                    try {
                        AsyncGenerator.Data<E> item =
                                this.queue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                        if (item != null) {
                            if (item.isDone()) {
                                this.isEnd = item;
                            }
                            return item;
                        }
                    } catch (InterruptedException e) {
                        // Keep waiting; the flag is cleared by the exception, so remember
                        // it here and re-assert it on the way out.
                        interrupted = true;
                    }
                }
                return this.isEnd;
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Creates a queue-backed generator whose producer runs on {@link ForkJoinPool#commonPool()}.
     *
     * @param q        queue shared between the producer callback and the generator
     * @param consumer producer callback; receives {@code q} and adds elements to it
     * @param <E>      element type carried by the generator
     * @param <Q>      concrete queue type
     * @return a generator reading from {@code q}
     * @throws NullPointerException if {@code q} or {@code consumer} is {@code null}
     */
    public static <E, Q extends BlockingQueue<AsyncGenerator.Data<E>>> AsyncGenerator<E> of(Q q, Consumer<Q> consumer) {
        return of(q, consumer, ForkJoinPool.commonPool());
    }

    /**
     * Creates a queue-backed generator whose producer runs on the given executor.
     *
     * <p>The submitted task invokes {@code consumer.accept(q)}. If the callback throws, an
     * element wrapping an exceptionally-completed future is added to the queue. In every
     * case a {@code done} element is added afterwards to terminate the generator.
     *
     * <p>Elements are enqueued with {@link BlockingQueue#add(Object)}, which throws
     * {@link IllegalStateException} when a capacity-restricted queue is full; that exception
     * surfaces on the executor, not from the returned generator.
     *
     * @param q        queue shared between the producer callback and the generator
     * @param consumer producer callback; receives {@code q} and adds elements to it
     * @param executor executor the producer callback is run on
     * @param <E>      element type carried by the generator
     * @param <Q>      concrete queue type
     * @return a generator reading from {@code q}
     * @throws NullPointerException if any argument is {@code null}
     */
    public static <E, Q extends BlockingQueue<AsyncGenerator.Data<E>>> AsyncGenerator<E> of(Q q, Consumer<Q> consumer, Executor executor) {
        Objects.requireNonNull(q);
        Objects.requireNonNull(executor);
        Objects.requireNonNull(consumer);
        executor.execute(() -> {
            try {
                consumer.accept(q);
            } catch (Throwable failure) {
                // Hand the producer's failure to the reader as a failed future.
                CompletableFuture<E> failed = new CompletableFuture<>();
                failed.completeExceptionally(failure);
                q.add(AsyncGenerator.Data.of(failed));
            } finally {
                // Always terminate the stream, whether the producer succeeded or failed.
                q.add(AsyncGenerator.Data.done());
            }
        });
        return new Generator<>(q);
    }

    /**
     * Creates a queue-backed generator whose producer runs on the given executor.
     *
     * @param q        queue shared between the producer callback and the generator
     * @param executor executor the producer callback is run on
     * @param consumer producer callback; receives {@code q} and adds elements to it
     * @param <E>      element type carried by the generator
     * @param <Q>      concrete queue type
     * @return a generator reading from {@code q}
     * @deprecated use {@link #of(BlockingQueue, Consumer, Executor)}, which takes the same
     *             arguments with the executor last
     */
    @Deprecated
    public static <E, Q extends BlockingQueue<AsyncGenerator.Data<E>>> AsyncGenerator<E> of(Q q, Executor executor, Consumer<Q> consumer) {
        return of(q, consumer, executor);
    }
}
