/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spring.pubsub.reactive;

import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberOperations;
import com.google.cloud.spring.pubsub.support.AcknowledgeablePubsubMessage;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/**
 * Builds reactive {@link Flux} streams of {@link AcknowledgeablePubsubMessage}s by pulling from a
 * Pub/Sub subscription through a {@link PubSubSubscriberOperations} instance.
 *
 * <p>Two pull strategies are used, chosen per downstream request:
 * <ul>
 *   <li>unbounded demand ({@code Long.MAX_VALUE}) is served by pulling on a fixed interval;</li>
 *   <li>bounded demand is served by pulling exactly as many messages as were requested, repeating
 *       the pull until that demand is met.</li>
 * </ul>
 */
public final class PubSubReactiveFactory {
    private static final Log LOGGER = LogFactory.getLog(PubSubReactiveFactory.class);

    private final PubSubSubscriberOperations subscriberOperations;
    private final Scheduler scheduler;
    private final int maxMessages;

    /**
     * Creates a factory that pulls up to {@code Integer.MAX_VALUE} messages per interval poll.
     *
     * @param subscriberOperations operations used to pull messages; must not be {@code null}
     * @param scheduler scheduler the polling interval runs on; must not be {@code null}
     */
    public PubSubReactiveFactory(PubSubSubscriberOperations subscriberOperations, Scheduler scheduler) {
        this(subscriberOperations, scheduler, Integer.MAX_VALUE);
    }

    /**
     * Creates a factory with an explicit cap on the number of messages requested per interval poll.
     *
     * @param subscriberOperations operations used to pull messages; must not be {@code null}
     * @param scheduler scheduler the polling interval runs on; must not be {@code null}
     * @param maxMessages maximum number of messages requested by each interval poll; at least 1
     * @throws IllegalArgumentException if {@code maxMessages} is less than 1, or (via
     *     {@code Assert.notNull}) if a required argument is {@code null}
     */
    public PubSubReactiveFactory(PubSubSubscriberOperations subscriberOperations, Scheduler scheduler, int maxMessages) {
        Assert.notNull(subscriberOperations, "subscriberOperations cannot be null.");
        Assert.notNull(scheduler, "scheduler cannot be null.");
        if (maxMessages < 1) {
            throw new IllegalArgumentException("maxMessages cannot be less than 1.");
        }
        this.subscriberOperations = subscriberOperations;
        this.scheduler = scheduler;
        this.maxMessages = maxMessages;
    }

    /**
     * Creates a {@link Flux} of messages pulled from the given subscription.
     *
     * <p>Each downstream request is handled independently: a request of {@code Long.MAX_VALUE}
     * starts interval polling with period {@code pollingPeriodMs}; any other request pulls exactly
     * the requested number of messages and {@code pollingPeriodMs} is not used.
     *
     * @param subscriptionName subscription to pull from
     * @param pollingPeriodMs interval between polls, in milliseconds, used for unbounded demand
     * @return a flux emitting the pulled messages
     */
    public Flux<AcknowledgeablePubsubMessage> poll(String subscriptionName, long pollingPeriodMs) {
        return Flux.create(sink -> sink.onRequest(numRequested -> {
            if (numRequested == Long.MAX_VALUE) {
                pollingPull(subscriptionName, pollingPeriodMs, sink);
            } else {
                backpressurePull(subscriptionName, numRequested, sink);
            }
        }));
    }

    /**
     * Serves unbounded demand: pulls on every tick of an interval that fires immediately and then
     * every {@code pollingPeriodMs} milliseconds on the configured scheduler, forwarding messages
     * and errors to the sink.
     */
    private void pollingPull(String subscriptionName, long pollingPeriodMs, FluxSink<AcknowledgeablePubsubMessage> sink) {
        Disposable disposable = Flux.interval(Duration.ZERO, Duration.ofMillis(pollingPeriodMs), this.scheduler)
                .flatMap(ignore -> pullAll(subscriptionName))
                .subscribe(sink::next, sink::error);

        // Tie the interval subscription's lifetime to the sink so polling stops when it is disposed.
        sink.onDispose(disposable);
    }

    /**
     * Issues one asynchronous pull of up to {@code maxMessages} messages and flattens the
     * resulting list into a flux.
     */
    private Flux<AcknowledgeablePubsubMessage> pullAll(String subscriptionName) {
        // NOTE(review): the third argument is presumably the "return immediately" flag - confirm
        // against PubSubSubscriberOperations.pullAsync.
        CompletableFuture<List<AcknowledgeablePubsubMessage>> pullResponseFuture =
                this.subscriberOperations.pullAsync(subscriptionName, this.maxMessages, true);
        return Mono.fromFuture(pullResponseFuture).flatMapMany(Flux::fromIterable);
    }

    /**
     * Serves bounded demand: pulls up to {@code numRequested} messages (capped at
     * {@code Integer.MAX_VALUE} per call) and, while the sink is not cancelled, pulls again for
     * whatever part of the demand is still outstanding.
     */
    private void backpressurePull(String subscriptionName, long numRequested, FluxSink<AcknowledgeablePubsubMessage> sink) {
        // pullAsync takes an int count, so clamp demand that does not fit.
        int intDemand = numRequested > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) numRequested;
        this.subscriberOperations.pullAsync(subscriptionName, intDemand, false).whenComplete((messages, exception) -> {
            if (exception != null) {
                exceptionHandler(subscriptionName, numRequested, sink, exception);
                return;
            }
            if (!sink.isCancelled()) {
                messages.forEach(sink::next);
            }
            // Re-check: emitting the messages above may have caused the subscriber to cancel.
            if (!sink.isCancelled()) {
                long numToPull = numRequested - messages.size();
                if (numToPull > 0) {
                    backpressurePull(subscriptionName, numToPull, sink);
                }
            }
        });
    }

    /**
     * Handles a failed bounded pull: a {@link DeadlineExceededException} causes the pull to be
     * retried with the same demand unless the sink has been cancelled; any other failure is
     * signalled to the sink as an error.
     */
    private void exceptionHandler(String subscriptionName, long numRequested, FluxSink<AcknowledgeablePubsubMessage> sink, Throwable exception) {
        if (exception instanceof DeadlineExceededException) {
            // Without this guard a cancelled subscriber on an empty subscription would keep
            // re-issuing pulls forever, and any message eventually pulled would never be emitted.
            if (sink.isCancelled()) {
                return;
            }
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Blocking pull timed out due to empty subscription " + subscriptionName + "; retrying.");
            }
            backpressurePull(subscriptionName, numRequested, sink);
        } else {
            sink.error(exception);
        }
    }
}

