package io.helidon.common.reactive;

import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/helidon/common/reactive/MultiFlatMapPublisher.class */
/**
 * A {@link Multi} that maps every item of a source {@link Multi} to a
 * {@link Flow.Publisher} and relays the items of all those inner publishers
 * to a single downstream subscriber.
 *
 * @param <T> item type of the source
 * @param <R> item type of the inner publishers and of the resulting sequence
 */
public final class MultiFlatMapPublisher<T, R> implements Multi<R> {
    /** Upstream sequence whose items are mapped to inner publishers. */
    private final Multi<T> source;
    /** Turns one upstream item into the publisher to be relayed. */
    private final Function<? super T, ? extends Flow.Publisher<? extends R>> mapper;
    /** Amount requested from the source when the subscription is established. */
    private final long maxConcurrency;
    /** Amount requested from each inner publisher when it is subscribed. */
    private final long prefetch;
    /** When {@code true}, failures are collected and reported only at the very end. */
    private final boolean delayErrors;

    /**
     * Exception used as a container when more than one failure has been collected;
     * the individual failures are attached as suppressed exceptions
     * (see {@code FlatMapSubscriber.addError}).
     */
    public static final class FlatMapAggregateException extends RuntimeException {
        FlatMapAggregateException() {
        }

        /**
         * Does nothing, so no stack trace is captured for the container itself.
         *
         * @return this instance
         */
        @Override
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    }

    /**
     * Subscriber of the source sequence and, at the same time, the subscription
     * handed to the downstream.
     *
     * <p>The inherited {@link AtomicInteger} value is a work-in-progress counter:
     * whoever moves it away from zero owns the right to signal the downstream
     * and to run {@link #drainLoop()}; everybody else only increments it to
     * announce that there is more work.
     *
     * @param <T> item type of the source
     * @param <R> item type delivered to the downstream
     */
    static final class FlatMapSubscriber<T, R> extends AtomicInteger implements Flow.Subscriber<T>, Flow.Subscription {
        private final Flow.Subscriber<? super R> downstream;
        private final Function<? super T, ? extends Flow.Publisher<? extends R>> mapper;
        private final long maxConcurrency;
        private final long prefetch;
        private final boolean delayErrors;
        /** Subscription to the source; assigned once in {@link #onSubscribe}. */
        private Flow.Subscription upstream;
        /** Set when the source completed or when a failure ended the upstream side. */
        private volatile boolean upstreamDone;
        /** Set on downstream cancellation and right around a terminal downstream signal. */
        private volatile boolean canceled;
        /** Number of items handed to the downstream; touched only by the owner of the work-in-progress counter. */
        private long emitted;
        /** First failure, or a {@link FlatMapAggregateException} carrying several of them. */
        private final AtomicReference<Throwable> errors = new AtomicReference<>();
        /** Inner subscribers that have been registered and not yet removed. */
        private final ConcurrentMap<InnerSubscriber<R>, Object> subscribers = new ConcurrentHashMap<>();
        /** Lazily created queue of inner subscribers that have a parked item or a terminal event to process. */
        private final AtomicReference<Queue<InnerSubscriber<R>>> queue = new AtomicReference<>();
        /** Total amount requested by the downstream so far. */
        private final AtomicLong requested = new AtomicLong();

        /**
         * Subscriber of a single inner publisher. The inherited
         * {@link AtomicReference} holds the inner subscription; the instance
         * itself is stored there as a marker once the inner sequence has
         * terminated or has been cancelled.
         *
         * @param <R> item type of the inner publisher
         */
        public static final class InnerSubscriber<R> extends AtomicReference<Flow.Subscription> implements Flow.Subscriber<R>, Flow.Subscription {
            private final FlatMapSubscriber<?, R> parent;
            private final long prefetch;
            /** Consumed-item count at which the inner publisher is asked for more. */
            private final long limit;
            /** Items consumed since the last replenishing request. */
            private long produced;
            private volatile boolean done;
            /** Lazily created buffer for items that could not be emitted right away. */
            private volatile Queue<R> queue;

            /**
             * @param parent   coordinator that receives all signals of this subscriber
             * @param prefetch amount requested from the inner publisher on subscription
             */
            InnerSubscriber(FlatMapSubscriber<?, R> parent, long prefetch) {
                this.parent = parent;
                this.prefetch = prefetch;
                // replenish once the prefetch amount minus a quarter of it has been consumed
                this.limit = prefetch - (prefetch >> 2);
            }

            /**
             * Stores the subscription via {@code SubscriptionHelper.setOnce} and, when
             * that reports success, requests the prefetch amount.
             */
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                if (SubscriptionHelper.setOnce(this, subscription)) {
                    subscription.request(this.prefetch);
                }
            }

            @Override
            public void onNext(R item) {
                this.parent.innerNext(item, this);
            }

            @Override
            public void onError(Throwable throwable) {
                // store the terminated marker so a later cancel() does not touch the finished subscription
                lazySet(this);
                this.parent.innerError(throwable, this);
            }

            @Override
            public void onComplete() {
                // store the terminated marker so a later cancel() does not touch the finished subscription
                lazySet(this);
                this.parent.innerComplete(this);
            }

            /** No-op; it is what {@link #produced(long)} ends up calling once this instance is the stored marker. */
            @Override
            public void request(long n) {
            }

            /**
             * Records that {@code n} items of this inner sequence were consumed and
             * requests the accumulated amount from the stored subscription once the
             * replenish limit is reached.
             *
             * @param n number of items just consumed
             */
            public void produced(long n) {
                long consumed = this.produced + n;
                if (consumed >= this.limit) {
                    this.produced = 0L;
                    get().request(consumed);
                } else {
                    this.produced = consumed;
                }
            }

            /** Swaps in the terminated marker and cancels the previously stored real subscription, if any. */
            @Override
            public void cancel() {
                Flow.Subscription previous = getAndSet(this);
                if (previous != null && previous != this) {
                    previous.cancel();
                }
            }

            /**
             * @return the buffer of parked items, or {@code null} if nothing was ever parked
             */
            public Queue<R> getQueue() {
                return this.queue;
            }

            /**
             * Parks an item in the buffer, creating the buffer on first use.
             *
             * @param item item to park
             */
            public void enqueue(R item) {
                Queue<R> buffer = this.queue;
                if (buffer == null) {
                    buffer = new ConcurrentLinkedQueue<>();
                    this.queue = buffer;
                }
                buffer.offer(item);
            }

            public void setDone() {
                this.done = true;
            }

            public boolean isDone() {
                return this.done;
            }

            @Override
            public String toString() {
                boolean finished = this.done;
                Queue<R> buffer = this.queue;
                String size = buffer == null ? "null" : String.valueOf(buffer.size());
                return "InnerSubscriber{done=" + finished + ", queue=" + size + "}";
            }
        }

        /**
         * @param downstream     receiver of the merged items
         * @param mapper         turns an upstream item into an inner publisher
         * @param maxConcurrency amount requested from the upstream on subscription
         * @param prefetch       amount requested from every inner publisher on subscription
         * @param delayErrors    whether failures are held back until everything has terminated
         */
        FlatMapSubscriber(Flow.Subscriber<? super R> downstream,
                          Function<? super T, ? extends Flow.Publisher<? extends R>> mapper,
                          long maxConcurrency,
                          long prefetch,
                          boolean delayErrors) {
            this.downstream = downstream;
            this.mapper = mapper;
            this.maxConcurrency = maxConcurrency;
            this.prefetch = prefetch;
            this.delayErrors = delayErrors;
        }

        /**
         * Accepts the upstream subscription, hands this object to the downstream as
         * its subscription and requests {@code maxConcurrency} upstream items.
         *
         * @throws NullPointerException  if {@code subscription} is {@code null}
         * @throws IllegalStateException if a subscription was already set; the new one is cancelled first
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            Objects.requireNonNull(subscription);
            if (this.upstream != null) {
                subscription.cancel();
                throw new IllegalStateException("Subscription already set");
            }
            this.upstream = subscription;
            this.downstream.onSubscribe(this);
            subscription.request(this.maxConcurrency);
        }

        /**
         * Maps the item to an inner publisher and subscribes a fresh
         * {@link InnerSubscriber} to it, unless the downstream has cancelled.
         * Anything thrown while mapping, registering or subscribing cancels the
         * upstream and is routed to {@link #onError(Throwable)}.
         */
        @Override
        public void onNext(T item) {
            if (this.upstreamDone) {
                return;
            }
            try {
                Flow.Publisher<? extends R> innerSource =
                        Objects.requireNonNull(this.mapper.apply(item), "The mapper returned a null Publisher");
                InnerSubscriber<R> inner = new InnerSubscriber<>(this, this.prefetch);
                // register first so a concurrent cancelInners() can see the new subscriber
                this.subscribers.put(inner, inner);
                if (this.canceled) {
                    this.subscribers.remove(inner);
                } else {
                    innerSource.subscribe(inner);
                }
            } catch (Throwable failure) {
                this.upstream.cancel();
                onError(failure);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            if (!this.upstreamDone) {
                doError(throwable);
            }
        }

        @Override
        public void onComplete() {
            if (this.upstreamDone) {
                return;
            }
            this.upstreamDone = true;
            drain();
        }

        /**
         * Records a failure of the upstream side. In delay mode it is added to the
         * collected errors; otherwise it becomes the error only if none is stored yet
         * and all inner subscribers are cancelled. In both modes the upstream is
         * marked done and a drain is triggered.
         *
         * @param throwable the failure to record
         */
        void doError(Throwable throwable) {
            if (this.delayErrors) {
                addError(throwable);
            } else {
                this.errors.compareAndSet(null, throwable);
                cancelInners();
            }
            this.upstreamDone = true;
            drain();
        }

        /**
         * Adds downstream demand and triggers a drain. A non-positive amount is
         * treated as a failure and reported through {@link #doError(Throwable)}.
         */
        @Override
        public void request(long n) {
            if (n <= 0) {
                doError(new IllegalArgumentException("Rule §3.9 violated: non-positive request amount is forbidden"));
                return;
            }
            SubscriptionHelper.addRequest(this.requested, n);
            drain();
        }

        @Override
        public void cancel() {
            this.canceled = true;
            this.upstream.cancel();
            cancelInners();
        }

        /** Cancels every registered inner subscriber and empties the registry. */
        void cancelInners() {
            for (InnerSubscriber<R> inner : this.subscribers.keySet()) {
                inner.cancel();
            }
            this.subscribers.clear();
        }

        /**
         * Handles an item produced by an inner publisher.
         *
         * <p>If the work-in-progress counter can be taken, there is outstanding
         * demand and nothing is waiting in the ready queue, the item goes straight
         * to the downstream. Otherwise it is parked in the inner subscriber's buffer
         * and the inner subscriber is appended to the ready queue.
         *
         * @param item            the item received
         * @param innerSubscriber the inner subscriber that received it
         */
        public void innerNext(R item, InnerSubscriber<R> innerSubscriber) {
            if (get() == 0 && compareAndSet(0, 1)) {
                long demand = this.requested.get();
                long delivered = this.emitted;
                if (demand == delivered) {
                    // no outstanding demand: park the item until the downstream requests more
                    innerSubscriber.enqueue(item);
                    getOrCreateQueue().offer(innerSubscriber);
                } else {
                    Queue<InnerSubscriber<R>> ready = this.queue.get();
                    if (ready != null && !ready.isEmpty()) {
                        // earlier items are still waiting: line up behind them and drain
                        innerSubscriber.enqueue(item);
                        ready.offer(innerSubscriber);
                        drainLoop();
                        return;
                    }
                    this.emitted = delivered + 1;
                    this.downstream.onNext(item);
                    innerSubscriber.produced(1L);
                }
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                innerSubscriber.enqueue(item);
                getOrCreateQueue().offer(innerSubscriber);
                if (getAndIncrement() != 0) {
                    // the current owner of the counter will pick the item up
                    return;
                }
            }
            drainLoop();
        }

        /**
         * Handles the failure of an inner publisher. In delay mode the failure is
         * only collected; otherwise it is stored if it is the first one, and the
         * upstream and all inner subscribers are cancelled. The inner subscriber is
         * marked done and appended to the ready queue, then a drain is triggered.
         *
         * @param throwable       the failure
         * @param innerSubscriber the inner subscriber that failed
         */
        public void innerError(Throwable throwable, InnerSubscriber<R> innerSubscriber) {
            if (this.delayErrors) {
                addError(throwable);
                innerSubscriber.setDone();
            } else {
                this.errors.compareAndSet(null, throwable);
                this.upstream.cancel();
                cancelInners();
                innerSubscriber.setDone();
                this.upstreamDone = true;
            }
            getOrCreateQueue().offer(innerSubscriber);
            drain();
        }

        /**
         * Handles the completion of an inner publisher.
         *
         * <p>If the work-in-progress counter can be taken and the inner subscriber
         * has no parked items, it is removed right here: the downstream is
         * terminated when nothing else is left, otherwise one more upstream item is
         * requested while the upstream is still running. In every other case the
         * inner subscriber is appended to the ready queue for the drain loop.
         *
         * @param innerSubscriber the inner subscriber that completed
         */
        public void innerComplete(InnerSubscriber<R> innerSubscriber) {
            innerSubscriber.setDone();
            if (get() == 0 && compareAndSet(0, 1)) {
                Queue<R> parked = innerSubscriber.getQueue();
                if (parked == null || parked.isEmpty()) {
                    this.subscribers.remove(innerSubscriber);
                    boolean outerDone = this.upstreamDone;
                    Queue<InnerSubscriber<R>> ready = this.queue.get();
                    boolean nothingReady = ready == null || ready.isEmpty();
                    boolean noInnersLeft = this.subscribers.isEmpty();
                    if (outerDone && nothingReady && noInnersLeft) {
                        Throwable collected = this.errors.get();
                        if (collected == null) {
                            this.downstream.onComplete();
                        } else {
                            this.downstream.onError(collected);
                        }
                        this.canceled = true;
                    } else if (!outerDone) {
                        // a slot became free: ask the upstream for a replacement item
                        this.upstream.request(1L);
                    }
                } else {
                    getOrCreateQueue().offer(innerSubscriber);
                }
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                getOrCreateQueue().offer(innerSubscriber);
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }

        /**
         * Returns the ready queue, installing a new one if none is present.
         *
         * <p>The lookup is retried until a non-null queue is obtained: the drain
         * loop may reset the reference to {@code null} after cancellation, so a
         * single re-read after a lost compare-and-set could otherwise yield
         * {@code null}.
         *
         * @return the current ready queue, never {@code null}
         */
        Queue<InnerSubscriber<R>> getOrCreateQueue() {
            while (true) {
                Queue<InnerSubscriber<R>> current = this.queue.get();
                if (current != null) {
                    return current;
                }
                Queue<InnerSubscriber<R>> fresh = new ConcurrentLinkedQueue<>();
                if (this.queue.compareAndSet(null, fresh)) {
                    return fresh;
                }
            }
        }

        /**
         * Adds a failure to the collected errors. The first failure is stored as is;
         * a second one causes both to be wrapped, as suppressed exceptions, into a
         * {@link FlatMapAggregateException}; later ones are appended to that container.
         *
         * @param throwable the failure to add
         */
        void addError(Throwable throwable) {
            while (true) {
                Throwable current = this.errors.get();
                if (current == null) {
                    if (this.errors.compareAndSet(null, throwable)) {
                        return;
                    }
                } else if (current instanceof FlatMapAggregateException) {
                    current.addSuppressed(throwable);
                    return;
                } else {
                    FlatMapAggregateException aggregate = new FlatMapAggregateException();
                    aggregate.addSuppressed(current);
                    aggregate.addSuppressed(throwable);
                    if (this.errors.compareAndSet(current, aggregate)) {
                        return;
                    }
                }
            }
        }

        /** Announces work and runs the drain loop if nobody else is running it. */
        void drain() {
            if (getAndIncrement() == 0) {
                drainLoop();
            }
        }

        /**
         * Processes the ready queue on behalf of the owner of the work-in-progress
         * counter: emits parked items while there is downstream demand, retires
         * finished inner subscribers (requesting one more upstream item for each),
         * and delivers the terminal signal.
         *
         * <p>Every step that makes progress loops again immediately. The counter is
         * given up only when a pass found nothing to do, so a backlog of parked
         * items is fully delivered for a single {@code request(n)} instead of one
         * item per drain call.
         */
        void drainLoop() {
            int missed = 1;
            long demand = this.requested.get();
            long delivered = this.emitted;
            final Flow.Subscriber<? super R> target = this.downstream;
            final AtomicReference<Queue<InnerSubscriber<R>>> readyRef = this.queue;
            final ConcurrentMap<InnerSubscriber<R>, Object> active = this.subscribers;
            while (true) {
                if (this.canceled) {
                    // release everything still held; nothing more may be signalled
                    readyRef.lazySet(null);
                    active.clear();
                } else {
                    Throwable eager = this.delayErrors ? null : this.errors.get();
                    if (eager != null) {
                        this.canceled = true;
                        target.onError(eager);
                        continue; // next pass performs the cleanup above
                    }

                    boolean outerDone = this.upstreamDone;
                    boolean noInnersLeft = active.isEmpty();
                    Queue<InnerSubscriber<R>> ready = readyRef.get();
                    boolean nothingReady = ready == null || ready.isEmpty();

                    if (outerDone && noInnersLeft && nothingReady) {
                        this.canceled = true;
                        Throwable collected = this.errors.get();
                        if (collected != null) {
                            target.onError(collected);
                        } else {
                            target.onComplete();
                        }
                        continue; // next pass performs the cleanup above
                    }

                    if (!nothingReady) {
                        InnerSubscriber<R> head = ready.peek();
                        boolean headDone = head.isDone();
                        Queue<R> parked = head.getQueue();
                        boolean headEmpty = parked == null || parked.isEmpty();
                        if (headDone && headEmpty) {
                            // finished inner with nothing left: retire it and ask for a replacement
                            active.remove(head);
                            ready.poll();
                            this.upstream.request(1L);
                            continue;
                        }
                        if (!headEmpty && demand != delivered) {
                            ready.poll();
                            delivered++;
                            target.onNext(parked.poll());
                            head.produced(1L);
                            continue;
                        }
                    }
                }

                // nothing more can be done right now: publish progress and settle the counter
                this.emitted = delivered;
                missed = addAndGet(-missed);
                if (missed == 0) {
                    return;
                }
                // somebody signalled in the meantime; pick up any new demand
                demand = this.requested.get();
            }
        }
    }

    /**
     * @param source         sequence whose items are mapped
     * @param mapper         turns an item of {@code source} into a publisher
     * @param maxConcurrency amount requested from {@code source} on subscription
     * @param prefetch       amount requested from every inner publisher on subscription
     * @param delayErrors    whether failures are held back until everything has terminated
     */
    public MultiFlatMapPublisher(Multi<T> source,
                                 Function<? super T, ? extends Flow.Publisher<? extends R>> mapper,
                                 long maxConcurrency,
                                 long prefetch,
                                 boolean delayErrors) {
        this.source = source;
        this.mapper = mapper;
        this.maxConcurrency = maxConcurrency;
        this.prefetch = prefetch;
        this.delayErrors = delayErrors;
    }

    /**
     * Subscribes a new {@link FlatMapSubscriber}, wrapping the given subscriber, to the source.
     *
     * @param subscriber receiver of the merged sequence
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override
    public void subscribe(Flow.Subscriber<? super R> subscriber) {
        Objects.requireNonNull(subscriber, "subscriber is null");
        this.source.subscribe(
                new FlatMapSubscriber<>(subscriber, this.mapper, this.maxConcurrency, this.prefetch, this.delayErrors));
    }
}
