package com.apollographql.subscription.callback;

import com.apollographql.subscription.exception.CallbackInitializationFailedException;
import com.apollographql.subscription.exception.InactiveSubscriptionException;
import com.apollographql.subscription.message.CallbackMessageCheck;
import com.apollographql.subscription.message.CallbackMessageComplete;
import com.apollographql.subscription.message.CallbackMessageNext;
import com.apollographql.subscription.message.SubscritionCallbackMessage;
import graphql.ExecutionResult;
import graphql.GraphqlErrorBuilder;
import java.time.Duration;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.springframework.graphql.ExecutionGraphQlService;
import org.springframework.graphql.server.WebGraphQlRequest;
import org.springframework.http.HttpStatusCode;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/apollographql/subscription/callback/SubscriptionCallbackHandler.class */
public class SubscriptionCallbackHandler {
    private static final Log logger = LogFactory.getLog(SubscriptionCallbackHandler.class);
    public static final String SUBSCRIPTION_PROTOCOL_HEADER = "subscription-protocol";
    public static final String SUBSCRIPTION_PROTOCOL_HEADER_VALUE = "callback/1.0";
    private final ExecutionGraphQlService graphQlService;
    private final Scheduler scheduler;

    public SubscriptionCallbackHandler(ExecutionGraphQlService executionGraphQlService) {
        this(executionGraphQlService, Schedulers.boundedElastic());
    }

    public SubscriptionCallbackHandler(ExecutionGraphQlService executionGraphQlService, Scheduler scheduler) {
        this.graphQlService = executionGraphQlService;
        this.scheduler = scheduler;
    }

    @NotNull
    public Mono<ExecutionResult> handleSubscriptionUsingCallback(@NotNull WebGraphQlRequest webGraphQlRequest, @NotNull SubscriptionCallback subscriptionCallback) {
        if (logger.isDebugEnabled()) {
            logger.debug("Starting subscription callback: " + String.valueOf(subscriptionCallback));
        }
        WebClient build = WebClient.builder().baseUrl(subscriptionCallback.callback_url()).build();
        return build.post().header("Content-Type", new String[]{"application/json"}).header(SUBSCRIPTION_PROTOCOL_HEADER, new String[]{SUBSCRIPTION_PROTOCOL_HEADER_VALUE}).headers(httpHeaders -> {
            httpHeaders.putAll(subscriptionCallback.context());
        }).bodyValue(new CallbackMessageCheck(subscriptionCallback.subscription_id(), subscriptionCallback.verifier())).exchangeToMono(clientResponse -> {
            HttpStatusCode statusCode = clientResponse.statusCode();
            if (!statusCode.is2xxSuccessful()) {
                return Mono.error(new CallbackInitializationFailedException(subscriptionCallback, statusCode.value()));
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Subscription callback init successful: " + String.valueOf(subscriptionCallback));
            }
            Flux<SubscritionCallbackMessage> startSubscription = startSubscription(build, webGraphQlRequest, subscriptionCallback);
            return Mono.just(emptyResult()).publishOn(this.scheduler).doOnSubscribe(subscription -> {
                startSubscription.subscribe();
            });
        });
    }

    private ExecutionResult emptyResult() {
        return ExecutionResult.newExecutionResult().data((Object) null).build();
    }

    @NotNull
    protected Flux<SubscritionCallbackMessage> startSubscription(@NotNull WebClient webClient, @NotNull WebGraphQlRequest webGraphQlRequest, @NotNull SubscriptionCallback subscriptionCallback) {
        Flux<SubscritionCallbackMessage> heartbeatFlux = subscriptionCallback.heartbeatIntervalMs() > 0 ? heartbeatFlux(webClient, new CallbackMessageCheck(subscriptionCallback.subscription_id(), subscriptionCallback.verifier()), subscriptionCallback) : Flux.empty();
        Publisher refCount = this.graphQlService.execute(webGraphQlRequest).flatMapMany(executionGraphQlResponse -> {
            return (executionGraphQlResponse.getData() instanceof Publisher ? Flux.from((Publisher) executionGraphQlResponse.getData()).map((v0) -> {
                return v0.toSpecification();
            }) : Flux.just(executionGraphQlResponse.toMap())).map(map -> {
                return new CallbackMessageNext(subscriptionCallback.subscription_id(), subscriptionCallback.verifier(), map);
            }).concatWith(Mono.just(new CallbackMessageComplete(subscriptionCallback.subscription_id(), subscriptionCallback.verifier()))).onErrorResume(th -> {
                return Mono.just(new CallbackMessageComplete(subscriptionCallback.subscription_id(), subscriptionCallback.verifier(), List.of(GraphqlErrorBuilder.newError().message(th.getMessage(), new Object[0]).build())));
            });
        }).publishOn(this.scheduler).concatMap(subscritionCallbackMessage -> {
            return webClient.post().header("Content-Type", new String[]{"application/json"}).header(SUBSCRIPTION_PROTOCOL_HEADER, new String[]{SUBSCRIPTION_PROTOCOL_HEADER_VALUE}).headers(httpHeaders -> {
                httpHeaders.putAll(subscriptionCallback.context());
            }).bodyValue(subscritionCallbackMessage).exchangeToMono(clientResponse -> {
                return clientResponse.statusCode().is2xxSuccessful() ? Mono.just(subscritionCallbackMessage) : Mono.error(new InactiveSubscriptionException(subscriptionCallback));
            });
        }).doOnError(th -> {
            if (logger.isErrorEnabled()) {
                logger.error("Subscription terminated abnormally due to exception", th);
            }
        }).publish().refCount(2);
        return Flux.merge(new Publisher[]{refCount, heartbeatFlux.takeUntilOther(refCount.ignoreElements())});
    }

    private Flux<SubscritionCallbackMessage> heartbeatFlux(WebClient webClient, CallbackMessageCheck callbackMessageCheck, SubscriptionCallback subscriptionCallback) {
        return Flux.just(callbackMessageCheck).delayElements(Duration.ofMillis(subscriptionCallback.heartbeatIntervalMs())).publishOn(this.scheduler).concatMap(callbackMessageCheck2 -> {
            return webClient.post().header("Content-Type", new String[]{"application/json"}).header(SUBSCRIPTION_PROTOCOL_HEADER, new String[]{SUBSCRIPTION_PROTOCOL_HEADER_VALUE}).headers(httpHeaders -> {
                httpHeaders.putAll(subscriptionCallback.context());
            }).bodyValue(callbackMessageCheck2).exchangeToFlux(clientResponse -> {
                if (clientResponse.statusCode().is2xxSuccessful()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Subscription callback heartbeat successful: " + String.valueOf(subscriptionCallback));
                    }
                    return heartbeatFlux(webClient, callbackMessageCheck2, subscriptionCallback);
                }
                if (logger.isWarnEnabled()) {
                    logger.warn("Subscription callback heartbeat failed: " + String.valueOf(subscriptionCallback));
                }
                return Flux.error(new InactiveSubscriptionException(subscriptionCallback));
            });
        });
    }
}
