/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.error.RequestCanceledException;
import com.couchbase.client.core.msg.CancellationReason;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.RequestContext;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

/**
 * Static helpers that adapt {@link CompletableFuture}-based results to Project Reactor
 * {@link Mono} and {@link Flux} types.
 *
 * <p>The class only exposes static members; its constructor always throws.
 */
public class Reactor {
    /**
     * Duration handed to the busy-looping emit failure handler by {@link #emitFailureHandler()}.
     */
    public static final Duration DEFAULT_EMIT_BUSY_DURATION = Duration.ofSeconds(1L);

    private Reactor() {
        throw new AssertionError("not instantiable");
    }

    /**
     * Wraps the response future of a request into a {@link Mono}.
     *
     * <p>Errors that arrive as a {@link CompletionException} are replaced by their cause before
     * they reach the subscriber.
     *
     * @param request the request that is cancelled with {@link CancellationReason#STOPPED_LISTENING}
     *        when the returned mono is cancelled; only used if {@code propagateCancellation} is set.
     * @param response the future that supplies the value or the failure.
     * @param propagateCancellation whether a cancellation of the mono should cancel {@code request}.
     * @param <T> the type of the response value.
     * @return a mono mirroring the outcome of {@code response}.
     */
    public static <T> Mono<T> wrap(Request<?> request, CompletableFuture<T> response, boolean propagateCancellation) {
        Mono<T> mono = monoFromFuture(response);
        if (propagateCancellation) {
            mono = runOnCancel(mono, () -> request.cancel(CancellationReason.STOPPED_LISTENING));
        }
        return unwrapErrors(mono);
    }

    /**
     * Wraps a future into a {@link Mono} and runs a custom task when the mono is cancelled.
     *
     * <p>Errors that arrive as a {@link CompletionException} are replaced by their cause before
     * they reach the subscriber.
     *
     * @param response the future that supplies the value or the failure.
     * @param cancellationTask invoked when the returned mono terminates with a cancel signal.
     * @param <T> the type of the response value.
     * @return a mono mirroring the outcome of {@code response}.
     */
    @Stability.Internal
    public static <T> Mono<T> wrap(CompletableFuture<T> response, Runnable cancellationTask) {
        return unwrapErrors(runOnCancel(monoFromFuture(response), cancellationTask));
    }

    /**
     * Converts a future supplier into a {@link Mono} via {@link Mono#fromFuture(Supplier)}.
     *
     * <p>A {@link CompletionException} failure is mapped to its cause.
     *
     * @param input supplies the future to convert.
     * @param <T> the type of the future's value.
     * @return a mono backed by the supplied future.
     */
    public static <T> Mono<T> toMono(Supplier<CompletableFuture<T>> input) {
        return Mono.fromFuture(input).onErrorMap(Reactor::unwrapCompletionException);
    }

    /**
     * Converts a supplier of a future holding an {@link Iterable} into a {@link Flux} of its
     * elements.
     *
     * @param input supplies the future whose iterable result is flattened.
     * @param <T> the element type.
     * @param <C> the iterable type produced by the future.
     * @return a flux emitting the elements of the future's result.
     */
    public static <T, C extends Iterable<T>> Flux<T> toFlux(Supplier<CompletableFuture<C>> input) {
        return Reactor.toMono(input).flux().flatMap(Flux::fromIterable);
    }

    /**
     * Returns a busy-looping emit failure handler configured with
     * {@link #DEFAULT_EMIT_BUSY_DURATION}.
     *
     * @return the handler.
     */
    public static Sinks.EmitFailureHandler emitFailureHandler() {
        return Reactor.emitFailureHandler(DEFAULT_EMIT_BUSY_DURATION);
    }

    /**
     * Returns the handler created by {@link Sinks.EmitFailureHandler#busyLooping(Duration)}.
     *
     * @param duration the duration passed to {@code busyLooping}.
     * @return the handler.
     */
    public static Sinks.EmitFailureHandler emitFailureHandler(Duration duration) {
        return Sinks.EmitFailureHandler.busyLooping(duration);
    }

    /**
     * Decouples the subscription to {@code flux} from the subscription to the returned flux.
     *
     * <p>For every subscriber a fresh unicast sink is created and {@code flux} is subscribed on the
     * bounded elastic scheduler, forwarding each signal into the sink. The upstream subscription is
     * not tied to the returned flux, so cancelling the returned flux does not cancel {@code flux}.
     * Signals that arrive once the sink is terminated or cancelled are ignored.
     *
     * @param flux the source to shield.
     * @param <T> the element type.
     * @return a flux replaying the signals of {@code flux} through an intermediate sink.
     */
    public static <T> Flux<T> shieldFromCancellation(Flux<T> flux) {
        return Flux.defer(() -> {
            Sinks.Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
            flux.subscribeOn(Schedulers.boundedElastic()).subscribe(
                next -> Reactor.ignoreIfDone(sink.tryEmitNext(next)).orThrow(),
                error -> Reactor.ignoreIfDone(sink.tryEmitError(error)).orThrowWithCause(error),
                () -> Reactor.ignoreIfDone(sink.tryEmitComplete()).orThrow()
            );
            return sink.asFlux();
        });
    }

    /**
     * Maps the "sink already terminated" and "sink cancelled" results to {@code OK} so that a
     * following {@code orThrow} does not fail for them; every other result is returned unchanged.
     */
    private static Sinks.EmitResult ignoreIfDone(Sinks.EmitResult result) {
        return result == Sinks.EmitResult.FAIL_TERMINATED || result == Sinks.EmitResult.FAIL_CANCELLED ? Sinks.EmitResult.OK : result;
    }

    /** Creates the assembly-hooked mono that mirrors the given future. */
    private static <T> Mono<T> monoFromFuture(CompletableFuture<T> response) {
        return MyLittleAssemblyFactory.callOnAssembly(new SilentMonoCompletionStage<T>(response));
    }

    /** Runs {@code cancellationTask} when {@code mono} finishes with a cancel signal. */
    private static <T> Mono<T> runOnCancel(Mono<T> mono, Runnable cancellationTask) {
        return mono.doFinally(signalType -> {
            if (signalType == SignalType.CANCEL) {
                cancellationTask.run();
            }
        });
    }

    /** Replaces a {@link CompletionException} error signal with its cause. */
    private static <T> Mono<T> unwrapErrors(Mono<T> mono) {
        return mono.onErrorResume(err -> Mono.<T>error(unwrapCompletionException(err)));
    }

    /**
     * Returns the cause of a {@link CompletionException}, or the throwable itself otherwise.
     *
     * <p>A {@code CompletionException} without a cause is returned as-is, so callers never
     * receive {@code null} to pass on as an error signal.
     */
    private static Throwable unwrapCompletionException(Throwable throwable) {
        if (throwable instanceof CompletionException && throwable.getCause() != null) {
            return throwable.getCause();
        }
        return throwable;
    }

    /**
     * A {@link Mono} backed by a {@link CompletionStage} that stays quiet about request
     * cancellations caused by the subscriber having stopped listening.
     */
    private static final class SilentMonoCompletionStage<T>
    extends Mono<T>
    implements Fuseable,
    Scannable {
        final CompletionStage<? extends T> future;

        SilentMonoCompletionStage(CompletionStage<? extends T> future) {
            this.future = Objects.requireNonNull(future, "future");
        }

        @Override
        public void subscribe(CoreSubscriber<? super T> actual) {
            Operators.MonoSubscriber<T, T> sds = new Operators.MonoSubscriber<>(actual);
            actual.onSubscribe(sds);
            if (sds.isCancelled()) {
                // Cancelled during onSubscribe: never attach to the future.
                return;
            }
            this.future.whenComplete((v, e) -> {
                if (sds.isCancelled()) {
                    Context ctx = sds.currentContext();
                    // A regular completion or a future cancellation only needs the value discarded;
                    // any other failure is additionally routed through the dropped-error handling.
                    if (e != null && !(e instanceof CancellationException)) {
                        dropErrorAfterCancel(e, ctx);
                    }
                    Operators.onDiscard(v, ctx);
                    return;
                }
                try {
                    if (e != null) {
                        actual.onError(unwrapCompletionException(e));
                    } else if (v != null) {
                        sds.complete(v);
                    } else {
                        actual.onComplete();
                    }
                }
                catch (Throwable e1) {
                    Operators.onErrorDropped(e1, actual.currentContext());
                    throw Exceptions.bubble(e1);
                }
            });
        }

        /**
         * Reports {@code e} as a dropped error unless it is (or directly wraps) a
         * {@link RequestCanceledException} whose request was cancelled with
         * {@link CancellationReason#STOPPED_LISTENING}.
         */
        private static void dropErrorAfterCancel(Throwable e, Context ctx) {
            Throwable candidate = e;
            if (e instanceof CompletionException && e.getCause() instanceof RequestCanceledException) {
                candidate = e.getCause();
            }
            if (candidate instanceof RequestCanceledException) {
                RequestContext requestContext = ((RequestCanceledException)candidate).context().requestContext();
                if (requestContext.request().cancellationReason() == CancellationReason.STOPPED_LISTENING) {
                    return;
                }
            }
            // The original throwable is reported, not the unwrapped candidate.
            Operators.onErrorDropped(e, ctx);
        }

        @Override
        public Object scanUnsafe(Scannable.Attr key) {
            return null;
        }
    }

    /**
     * Exists only to reach {@code Mono.onAssembly} through a subclass; it is never instantiated.
     */
    private static abstract class MyLittleAssemblyFactory<T>
    extends Mono<T> {
        private MyLittleAssemblyFactory() {
        }

        /** Passes {@code source} through {@code Mono.onAssembly} and returns the result. */
        static <T> Mono<T> callOnAssembly(Mono<T> source) {
            return MyLittleAssemblyFactory.onAssembly(source);
        }
    }
}

