/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.MessagePump;
import com.azure.messaging.servicebus.MessagePumpTerminatedException;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.SessionsMessagePump;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

/**
 * Package-private processor that drives message delivery to user supplied handlers.
 *
 * <p>A processor is created for either a non-session or a session entity (see {@link Kind}). Each call to
 * {@link #start()} that transitions the processor from "not running" to "running" creates a fresh
 * {@link RollingMessagePump}; {@link #close()} (and {@link #stop()}, which delegates to it) disposes the pump
 * that is current at that time. The running flag and the current pump reference are guarded by {@code lock}.</p>
 */
final class ServiceBusProcessor {
    private final Object lock = new Object();
    private final Kind kind;
    // Exactly one of the two builders is non-null, selected by 'kind'.
    private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder nonSessionBuilder;
    private final ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionBuilder;
    private final Consumer<ServiceBusReceivedMessageContext> processMessage;
    private final Consumer<ServiceBusErrorContext> processError;
    private final int concurrency;
    // Only set for Kind.NON_SESSION; null for Kind.SESSION.
    private final Boolean enableAutoDisposition;
    // Guarded by 'lock'.
    private boolean isRunning;
    // Guarded by 'lock'; null until the first successful start().
    private RollingMessagePump rollingMessagePump;

    /**
     * Creates a processor for a non-session entity.
     *
     * @param builder the builder used to obtain the receiver client for each pump.
     * @param processMessage the handler invoked with received messages.
     * @param processError the handler invoked with errors.
     * @param concurrency the concurrency value forwarded to the message pump.
     * @param enableAutoDisposition the auto-disposition flag forwarded to the message pump.
     */
    ServiceBusProcessor(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder builder,
        Consumer<ServiceBusReceivedMessageContext> processMessage, Consumer<ServiceBusErrorContext> processError,
        int concurrency, boolean enableAutoDisposition) {
        this.kind = Kind.NON_SESSION;
        this.nonSessionBuilder = builder;
        this.sessionBuilder = null;
        this.processError = processError;
        this.processMessage = processMessage;
        this.concurrency = concurrency;
        this.enableAutoDisposition = enableAutoDisposition;
        synchronized (lock) {
            this.isRunning = false;
        }
    }

    /**
     * Creates a processor for a session enabled entity.
     *
     * @param builder the builder used to obtain the sessions message pump.
     * @param processMessage the handler invoked with received messages.
     * @param processError the handler invoked with errors.
     * @param concurrency the concurrency value forwarded to the sessions message pump.
     */
    ServiceBusProcessor(ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder builder,
        Consumer<ServiceBusReceivedMessageContext> processMessage, Consumer<ServiceBusErrorContext> processError,
        int concurrency) {
        this.kind = Kind.SESSION;
        this.sessionBuilder = builder;
        this.nonSessionBuilder = null;
        this.processError = processError;
        this.processMessage = processMessage;
        this.concurrency = concurrency;
        this.enableAutoDisposition = null;
        synchronized (lock) {
            this.isRunning = false;
        }
    }

    /**
     * Starts the processor. Does nothing if the processor is already running; otherwise marks it as running,
     * creates a new {@link RollingMessagePump} matching the processor {@link Kind} and begins it.
     */
    void start() {
        final RollingMessagePump pump;
        synchronized (lock) {
            if (isRunning) {
                return;
            }
            isRunning = true;
            if (kind == Kind.NON_SESSION) {
                rollingMessagePump = new RollingMessagePump(nonSessionBuilder, processMessage, processError,
                    concurrency, enableAutoDisposition);
            } else {
                rollingMessagePump = new RollingMessagePump(sessionBuilder, processMessage, processError,
                    concurrency);
            }
            pump = rollingMessagePump;
        }
        // The pump is begun outside the lock so the lock is not held while subscribing.
        pump.begin();
    }

    /**
     * Indicates whether the processor is currently marked as running.
     *
     * @return {@code true} if {@link #start()} was called and neither {@link #close()} nor {@link #stop()}
     *     has been called since; {@code false} otherwise.
     */
    boolean isRunning() {
        synchronized (lock) {
            return isRunning;
        }
    }

    /**
     * Closes the processor. Does nothing if the processor is not running; otherwise marks it as not running
     * and disposes the current {@link RollingMessagePump}.
     */
    void close() {
        final RollingMessagePump pump;
        synchronized (lock) {
            if (!isRunning) {
                return;
            }
            isRunning = false;
            pump = rollingMessagePump;
        }
        // Disposed outside the lock; 'pump' is non-null here because isRunning is only set together with it.
        pump.dispose();
    }

    /**
     * Stops the processor. This simply delegates to {@link #close()}.
     */
    void stop() {
        close();
    }

    /**
     * Gets the identifier of the client backing the most recently created pump.
     *
     * @return the client identifier, or {@code null} if the processor was never started or the pump has not
     *     recorded an identifier yet.
     */
    String getIdentifier() {
        final RollingMessagePump pump;
        synchronized (lock) {
            pump = rollingMessagePump;
        }
        return pump == null ? null : pump.getClientIdentifier();
    }

    /**
     * The kind of entity the processor is receiving from.
     */
    enum Kind {
        NON_SESSION,
        SESSION
    }

    /**
     * Runs one message pump at a time and, when the current pump terminates with a
     * {@link MessagePumpTerminatedException}, notifies the error handler and rolls over to a new pump after
     * {@link #NEXT_PUMP_BACKOFF}, until this object is disposed.
     *
     * <p>The inherited {@link AtomicBoolean} value records whether {@link #begin()} has been called.</p>
     */
    static final class RollingMessagePump extends AtomicBoolean {
        // Shared sentinel used to terminate the retry loop once the pump is disposed.
        private static final RuntimeException DISPOSED_ERROR
            = new RuntimeException("The Processor closure disposed the RollingMessagePump.");
        private static final Duration NEXT_PUMP_BACKOFF = Duration.ofSeconds(5L);
        private final ClientLogger logger;
        private final Kind kind;
        private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder nonSessionBuilder;
        private final ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionBuilder;
        private final int concurrency;
        private final Consumer<ServiceBusReceivedMessageContext> processMessage;
        private final Consumer<ServiceBusErrorContext> processError;
        private final Boolean enableAutoDisposition;
        private final Disposable.Composite disposable = Disposables.composite();
        private final AtomicReference<String> clientIdentifier = new AtomicReference<>();

        /**
         * Creates a rolling pump for a non-session entity.
         *
         * @param builder the builder used to obtain a new receiver client for each pump.
         * @param processMessage the handler invoked with received messages.
         * @param processError the handler invoked with errors.
         * @param concurrency the concurrency value forwarded to each {@link MessagePump}.
         * @param enableAutoDisposition the auto-disposition flag forwarded to each {@link MessagePump}.
         */
        RollingMessagePump(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder builder,
            Consumer<ServiceBusReceivedMessageContext> processMessage,
            Consumer<ServiceBusErrorContext> processError, int concurrency, boolean enableAutoDisposition) {
            this.logger = new ClientLogger(RollingMessagePump.class);
            this.kind = Kind.NON_SESSION;
            this.nonSessionBuilder = builder;
            this.sessionBuilder = null;
            this.concurrency = concurrency;
            this.processError = processError;
            this.processMessage = processMessage;
            this.enableAutoDisposition = enableAutoDisposition;
        }

        /**
         * Creates a rolling pump for a session enabled entity.
         *
         * @param builder the builder used to obtain a new {@link SessionsMessagePump} for each roll.
         * @param processMessage the handler invoked with received messages.
         * @param processError the handler invoked with errors.
         * @param concurrencyPerSession the concurrency value forwarded to each {@link SessionsMessagePump}.
         */
        RollingMessagePump(ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder builder,
            Consumer<ServiceBusReceivedMessageContext> processMessage,
            Consumer<ServiceBusErrorContext> processError, int concurrencyPerSession) {
            this.logger = new ClientLogger(RollingMessagePump.class);
            this.kind = Kind.SESSION;
            this.sessionBuilder = builder;
            this.nonSessionBuilder = null;
            this.processError = processError;
            this.processMessage = processMessage;
            this.concurrency = concurrencyPerSession;
            this.enableAutoDisposition = null;
        }

        /**
         * Begins streaming by subscribing to {@link #beginIntern()} and tracking the subscription so that
         * {@link #dispose()} can cancel it.
         *
         * @throws IllegalStateException if called more than once, or if the subscription could not be
         *     registered because this object was already disposed.
         */
        void begin() {
            if (getAndSet(true)) {
                throw logger.atInfo().log(new IllegalStateException("The streaming cannot begin more than once."));
            }
            // NOTE(review): MessageUtils.subscribe is assumed to subscribe to the Mono and report terminal
            // signals through the supplied log builder - confirm against MessageUtils.
            final Disposable subscription = MessageUtils.subscribe(beginIntern(), "begin", logger.atWarning());
            if (!disposable.add(subscription)) {
                throw logger.atInfo().log(new IllegalStateException("Cannot begin streaming after the disposal."));
            }
        }

        /**
         * Builds the pipeline that runs a pump and re-creates it whenever it terminates with
         * {@link MessagePumpTerminatedException}.
         *
         * @return a {@link Mono} that, once subscribed, keeps rolling pumps until the retry spec gives up.
         */
        Mono<Void> beginIntern() {
            final Mono<Void> pumping;
            if (kind == Kind.NON_SESSION) {
                pumping = Mono.using(
                    () -> nonSessionBuilder.buildAsyncClientForProcessor(),
                    client -> {
                        clientIdentifier.set(client.getIdentifier());
                        final MessagePump pump = new MessagePump(client, processMessage, processError,
                            concurrency, enableAutoDisposition);
                        return pump.begin();
                    },
                    client -> client.close(),
                    true);
            } else {
                pumping = Mono.using(
                    () -> sessionBuilder.buildPumpForProcessor(logger, processMessage, processError, concurrency),
                    pump -> {
                        clientIdentifier.set(pump.getIdentifier());
                        return pump.begin();
                    },
                    pump -> {
                        // Intentionally empty: no explicit cleanup is performed for the sessions pump here.
                        // NOTE(review): presumably SessionsMessagePump releases its own resources - confirm.
                    },
                    true);
            }
            // Notify the error handler first, then re-emit the same error so the retry spec can react to it.
            return pumping
                .onErrorResume(MessagePumpTerminatedException.class, t -> notifyError(t).then(Mono.<Void>error(t)))
                .retryWhen(retrySpecForNextPump());
        }

        /**
         * Gets the identifier recorded from the most recently created client or sessions pump.
         *
         * @return the identifier, or {@code null} if none has been recorded yet.
         */
        String getClientIdentifier() {
            return clientIdentifier.get();
        }

        /**
         * Disposes the tracked subscription; the retry spec checks this state and stops rolling to new pumps.
         */
        void dispose() {
            disposable.dispose();
        }

        /**
         * Delivers the error context carried by the given exception to the {@code processError} handler.
         * The handler runs on {@link Schedulers#boundedElastic()} and any {@link Exception} it throws is
         * logged at verbose level and ignored.
         *
         * @param t the exception describing why the pump terminated.
         * @return a {@link Mono} completing after the handler ran, or an empty {@link Mono} when the
         *     exception carries no error context.
         */
        private Mono<Void> notifyError(MessagePumpTerminatedException t) {
            final ServiceBusErrorContext errorContext = t.getErrorContext();
            if (errorContext == null) {
                return Mono.empty();
            }
            return Mono.<Void>fromRunnable(() -> {
                try {
                    processError.accept(errorContext);
                } catch (Exception e) {
                    // The explicit array keeps the call bound to the (String, Object...) overload.
                    logger.atVerbose().log("Ignoring error from user processError handler.", new Object[]{e});
                }
            }).subscribeOn(Schedulers.boundedElastic());
        }

        /**
         * Creates the retry spec that schedules the next pump.
         *
         * <p>For each retry signal: a missing failure or a failure that is not a
         * {@link MessagePumpTerminatedException} ends the retry with an {@link IllegalStateException};
         * if this object is disposed (checked both before and after the backoff) the retry ends with
         * {@link #DISPOSED_ERROR}; otherwise the retry is triggered after {@link #NEXT_PUMP_BACKOFF}.</p>
         *
         * @return the retry spec.
         */
        private Retry retrySpecForNextPump() {
            return Retry.from(retrySignals -> retrySignals.concatMap(retrySignal -> {
                // Work on a copy of the signal rather than on the instance handed out by the retry operator.
                final Retry.RetrySignal signal = retrySignal.copy();
                final Throwable error = signal.failure();
                if (error == null) {
                    return FluxUtil.monoError(logger,
                        new IllegalStateException("RetrySignal::failure() not expected to be null."));
                }
                if (!(error instanceof MessagePumpTerminatedException)) {
                    return FluxUtil.monoError(logger,
                        new IllegalStateException(
                            "RetrySignal::failure() expected to be MessagePumpTerminatedException.", error));
                }
                final MessagePumpTerminatedException e = (MessagePumpTerminatedException) error;
                if (disposable.isDisposed()) {
                    e.log(logger,
                        "The Processor closure disposed the streaming, canceling retry for the next MessagePump.",
                        true);
                    return Mono.error(DISPOSED_ERROR);
                }
                e.log(logger, "The current MessagePump is terminated, scheduling retry for the next pump.", true);
                return Mono.delay(NEXT_PUMP_BACKOFF, Schedulers.boundedElastic()).handle((v, sink) -> {
                    // Re-check after the backoff: the processor may have been closed while waiting.
                    if (disposable.isDisposed()) {
                        e.log(logger,
                            "During backoff, The Processor closure disposed the streaming, canceling retry for the next MessagePump.",
                            false);
                        sink.error(DISPOSED_ERROR);
                    } else {
                        e.log(logger, "Retrying for the next MessagePump.", false);
                        sink.next(v);
                    }
                });
            }));
        }
    }
}

