package karate.com.linecorp.armeria.internal.common.stream;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Function;
import karate.com.linecorp.armeria.common.ResponseHeaders;
import karate.com.linecorp.armeria.common.annotation.Nullable;
import karate.com.linecorp.armeria.common.stream.AbortedStreamException;
import karate.com.linecorp.armeria.common.stream.CancelledSubscriptionException;
import karate.com.linecorp.armeria.common.stream.NoopSubscriber;
import karate.com.linecorp.armeria.common.stream.StreamMessage;
import karate.com.linecorp.armeria.common.stream.SubscriptionOption;
import karate.com.linecorp.armeria.common.util.CompositeException;
import karate.com.linecorp.armeria.common.util.EventLoopCheckingFuture;
import karate.com.linecorp.armeria.common.util.Exceptions;
import karate.com.linecorp.armeria.common.util.UnmodifiableFuture;
import karate.io.netty.util.concurrent.EventExecutor;
import karate.org.reactivestreams.Subscriber;
import karate.org.reactivestreams.Subscription;

/* loaded from: input_file:karate/com/linecorp/armeria/internal/common/stream/RecoverableStreamMessage.class */
/**
 * A {@link StreamMessage} that wraps an upstream stream and, when the upstream signals an error,
 * switches to a fallback stream obtained from an error function.
 *
 * <p>When {@code allowResuming} is {@code false}, the subscriber path only recovers if no element
 * other than an informational {@link ResponseHeaders} has been passed downstream yet; otherwise the
 * error is forwarded as-is. When {@code allowResuming} is {@code true}, recovery is attempted
 * regardless of what has already been emitted.
 *
 * <p>Only one consumer is accepted: the first call to {@code subscribe} or (when resuming is not
 * allowed) {@code collect} wins; later subscribers are failed with {@link IllegalStateException}.
 *
 * @param <T> the type of the stream elements
 */
public final class RecoverableStreamMessage<T> implements StreamMessage<T> {
    // The updater is necessarily raw: a class literal cannot carry type arguments.
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<RecoverableStreamMessage> subscribedUpdater =
            AtomicIntegerFieldUpdater.newUpdater(RecoverableStreamMessage.class, "subscribed");

    /** Completed when this stream finishes, fails or is cancelled; exposed via {@link #whenComplete()}. */
    private final CompletableFuture<Void> completionFuture = new EventLoopCheckingFuture<>();
    private final StreamMessage<T> upstream;
    // Declared with the same wildcards as the constructor parameter so the assignment type-checks.
    private final Function<? super Throwable, ? extends StreamMessage<T>> errorFunction;
    private final boolean allowResuming;

    /** The stream returned by {@link #errorFunction}; {@code null} until an error has been recovered. */
    @Nullable
    private volatile StreamMessage<T> fallbackStream;

    /** The executor passed to {@code subscribe}; {@code null} until a subscriber has been accepted. */
    @Nullable
    private volatile EventExecutor executor;

    /** 0 while unsubscribed, 1 once a consumer has claimed this stream (updated via CAS). */
    private volatile int subscribed;

    /* loaded from: input_file:karate/com/linecorp/armeria/internal/common/stream/RecoverableStreamMessage$RecoverableSubscriber.class */
    /**
     * Sits between the source stream (upstream first, then possibly the fallback) and the downstream
     * subscriber. It is handed to the downstream as its {@link Subscription} and is itself the
     * {@link Subscriber} of whichever source stream is currently active.
     *
     * <p>NOTE(review): the state flags below are plain fields. {@link #cancel()} re-dispatches to the
     * executor when called from another thread; the signal methods presumably run on that same
     * executor because it is the one passed to {@code subscribe} — confirm against the
     * {@code StreamMessage} contract.
     */
    private final class RecoverableSubscriber extends SubscriptionArbiter implements Subscriber<T> {
        // Typed as a consumer of T so that the caller's Subscriber<? super T> can be stored directly.
        private Subscriber<? super T> downstream;
        private final EventExecutor executor;
        private final SubscriptionOption[] options;
        /** Set once the error function has been invoked; a second error is never recovered. */
        private boolean errorHandled;
        /** Set once a non-informational element has been passed downstream. */
        private boolean wroteAny;
        /** Set once a terminal signal (complete, error or cancel) has been processed. */
        private boolean complete;

        private RecoverableSubscriber(Subscriber<? super T> downstream, EventExecutor executor,
                                      SubscriptionOption[] options) {
            super(executor);
            this.downstream = downstream;
            this.executor = executor;
            this.options = options;
        }

        /**
         * Registers the subscription of the currently active source stream with the arbiter and,
         * for the first source only, hands this arbiter to the downstream as its subscription.
         */
        @Override
        public void onSubscribe(Subscription subscription) {
            // Inherited from SubscriptionArbiter; presumably swaps in the new source subscription.
            setUpstreamSubscription(subscription);
            if (!errorHandled) {
                // Only the first source triggers downstream.onSubscribe(); after a recovery the
                // downstream already holds this arbiter.
                downstream.onSubscribe(this);
            }
        }

        /** Records that an element was emitted and forwards it downstream. */
        @Override
        public void onNext(T t) {
            if (!isTransient(t)) {
                wroteAny = true;
            }
            // Inherited from SubscriptionArbiter; presumably accounts for one delivered element so
            // that outstanding demand can be carried over to a fallback — confirm.
            produced(1L);
            downstream.onNext(t);
        }

        /** Returns {@code true} if {@code t} is a {@link ResponseHeaders} with an informational status. */
        private boolean isTransient(T t) {
            return t instanceof ResponseHeaders && ((ResponseHeaders) t).status().isInformational();
        }

        /**
         * Either forwards the error downstream or recovers from it by subscribing to the stream
         * returned by the error function.
         *
         * <p>The error is forwarded without recovery when it is an abort or cancellation, when a
         * recovery has already been attempted, or when resuming is disallowed and a non-informational
         * element has already been emitted.
         */
        @Override
        public void onError(Throwable th) {
            if (complete) {
                return;
            }
            final boolean passThrough = th instanceof AbortedStreamException
                                        || th instanceof CancelledSubscriptionException
                                        || errorHandled
                                        || (!allowResuming && wroteAny);
            if (passThrough) {
                onError0(th);
                return;
            }

            errorHandled = true;
            try {
                final StreamMessage<T> fallback = errorFunction.apply(th);
                Objects.requireNonNull(fallback, "errorFunction.apply() returned null");
                // Publish the fallback so that isOpen()/demand()/abort() are routed to it from now on.
                RecoverableStreamMessage.this.fallbackStream = fallback;
                fallback.subscribe(this, executor, options);
            } catch (Throwable recoveryFailure) {
                // Report both the failure of the recovery attempt and the original error.
                onError0(new CompositeException(recoveryFailure, th));
            }
        }

        /** Terminates with {@code th}: notifies the downstream, then fails the completion future. */
        private void onError0(Throwable th) {
            complete = true;
            downstream.onError(th);
            completionFuture.completeExceptionally(th);
        }

        /** Terminates normally: notifies the downstream, then completes the completion future. */
        @Override
        public void onComplete() {
            if (complete) {
                return;
            }
            complete = true;
            downstream.onComplete();
            completionFuture.complete(null);
        }

        /** Cancels the subscription, hopping onto the executor if called from another thread. */
        @Override
        public void cancel() {
            if (executor.inEventLoop()) {
                cancel0();
            } else {
                executor.execute(this::cancel0);
            }
        }

        /**
         * Performs the cancellation: cancels via the arbiter, optionally notifies the downstream of
         * the cancellation, detaches the downstream and fails the completion future.
         */
        private void cancel0() {
            if (complete) {
                return;
            }
            complete = true;
            doCancel();
            final CancelledSubscriptionException cancelled = CancelledSubscriptionException.get();
            if (InternalStreamMessageUtil.containsNotifyCancellation(options)) {
                downstream.onError(cancelled);
            }
            // Drop the reference to the real subscriber; any later signal goes to a no-op one.
            downstream = NoopSubscriber.get();
            completionFuture.completeExceptionally(cancelled);
        }
    }

    /**
     * Creates a new instance.
     *
     * @param streamMessage the upstream stream to consume first
     * @param function      maps an upstream error to the fallback stream to continue with
     * @param z             whether recovery is allowed even after elements have been emitted
     *                      (the {@code allowResuming} flag)
     */
    public RecoverableStreamMessage(StreamMessage<T> streamMessage,
                                    Function<? super Throwable, ? extends StreamMessage<T>> function,
                                    boolean z) {
        this.upstream = streamMessage;
        this.errorFunction = function;
        this.allowResuming = z;
    }

    /** Reports the open state of the fallback stream if one is active, otherwise of the upstream. */
    @Override
    public boolean isOpen() {
        final StreamMessage<T> fallback = this.fallbackStream;
        return fallback != null ? fallback.isOpen() : upstream.isOpen();
    }

    /**
     * Returns {@code true} only if the upstream is empty and either no fallback is active or the
     * fallback is empty as well.
     */
    @Override
    public boolean isEmpty() {
        if (!upstream.isEmpty()) {
            return false;
        }
        final StreamMessage<T> fallback = this.fallbackStream;
        return fallback == null || fallback.isEmpty();
    }

    /** Reports the demand of the fallback stream if one is active, otherwise of the upstream. */
    @Override
    public long demand() {
        final StreamMessage<T> fallback = this.fallbackStream;
        return fallback != null ? fallback.demand() : upstream.demand();
    }

    /** Returns the future that is completed when this stream terminates. */
    @Override
    public CompletableFuture<Void> whenComplete() {
        return completionFuture;
    }

    /**
     * Subscribes {@code subscriber} to this stream. Only the first subscriber is accepted; any later
     * one receives {@code onSubscribe} followed by {@code onError(IllegalStateException)} on
     * {@code eventExecutor}.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public void subscribe(Subscriber<? super T> subscriber, EventExecutor eventExecutor,
                          SubscriptionOption... subscriptionOptionArr) {
        Objects.requireNonNull(subscriber, "subscriber");
        Objects.requireNonNull(eventExecutor, "executor");
        Objects.requireNonNull(subscriptionOptionArr, "options");

        if (subscribedUpdater.compareAndSet(this, 0, 1)) {
            this.executor = eventExecutor;
            final RecoverableSubscriber arbiter =
                    new RecoverableSubscriber(subscriber, eventExecutor, subscriptionOptionArr);
            upstream.subscribe(arbiter, eventExecutor, subscriptionOptionArr);
            return;
        }

        // Lost the race: reject the late subscriber on its own executor.
        if (eventExecutor.inEventLoop()) {
            abortLateSubscriber(subscriber);
        } else {
            eventExecutor.execute(() -> abortLateSubscriber(subscriber));
        }
    }

    /** Fails a subscriber that arrived after another consumer had already claimed this stream. */
    private void abortLateSubscriber(Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(NoopSubscription.get());
        subscriber.onError(new IllegalStateException("subscribed by other subscriber already"));
    }

    /** Aborts this stream with the shared {@link AbortedStreamException} instance. */
    @Override
    public void abort() {
        abort(AbortedStreamException.get());
    }

    /**
     * Aborts the currently active source stream with {@code th}. If a subscriber has been accepted,
     * the abort is performed on its executor; otherwise it runs on the calling thread.
     *
     * @throws NullPointerException if {@code th} is {@code null}
     */
    @Override
    public void abort(Throwable th) {
        Objects.requireNonNull(th, "cause");
        final EventExecutor subscribedExecutor = this.executor;
        if (subscribedExecutor == null || subscribedExecutor.inEventLoop()) {
            abort0(th);
        } else {
            subscribedExecutor.execute(() -> abort0(th));
        }
    }

    /**
     * Collects all elements of this stream into a list.
     *
     * <p>When resuming is allowed, this delegates to the default {@link StreamMessage}
     * implementation. Otherwise it claims the single subscription slot, collects the upstream, and
     * if that fails, collects the stream returned by the error function instead.
     *
     * <p>NOTE(review): if the error function throws or returns {@code null}, the returned future
     * fails but this method does not complete {@link #whenComplete()} — confirm whether intended.
     *
     * @return a future holding the collected elements, or failed with {@link IllegalStateException}
     *         if this stream has already been claimed by another consumer
     */
    @Override
    public CompletableFuture<List<T>> collect(EventExecutor eventExecutor,
                                              SubscriptionOption... subscriptionOptionArr) {
        if (allowResuming) {
            // Default interface methods must be invoked through the interface-qualified super.
            return StreamMessage.super.collect(eventExecutor, subscriptionOptionArr);
        }
        if (!subscribedUpdater.compareAndSet(this, 0, 1)) {
            return UnmodifiableFuture.exceptionallyCompletedFuture(
                    new IllegalStateException("subscribed by other subscriber already"));
        }

        // Each branch yields a future of the final list; the nesting is flattened below.
        final CompletableFuture<CompletableFuture<List<T>>> nested =
                upstream.collect(eventExecutor, subscriptionOptionArr).handle((result, cause) -> {
                    if (cause == null) {
                        completionFuture.complete(null);
                        return UnmodifiableFuture.completedFuture(result);
                    }
                    final StreamMessage<T> fallback = errorFunction.apply(Exceptions.peel(cause));
                    Objects.requireNonNull(fallback, "errorFunction.apply() returned null");
                    return fallback.collect(eventExecutor, subscriptionOptionArr)
                                   .handle((fallbackResult, fallbackCause) -> {
                                       if (fallbackCause == null) {
                                           completionFuture.complete(null);
                                           return fallbackResult;
                                       }
                                       final Throwable peeled = Exceptions.peel(fallbackCause);
                                       completionFuture.completeExceptionally(peeled);
                                       return Exceptions.throwUnsafely(peeled);
                                   });
                });
        return nested.thenCompose(Function.identity());
    }

    /** Aborts the fallback stream if one is active, otherwise the upstream. */
    private void abort0(Throwable th) {
        final StreamMessage<T> fallback = this.fallbackStream;
        if (fallback != null) {
            fallback.abort(th);
        } else {
            upstream.abort(th);
        }
    }
}
