package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.connector.FlowControl;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.grpc.query.QueryRequest;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.netty.util.internal.OutOfDirectMemoryError;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.axonserver.connector.util.ProcessingInstructionHelper;
import org.axonframework.messaging.responsetypes.MultipleInstancesResponseType;
import org.axonframework.queryhandling.GenericStreamingQueryMessage;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryBusSpanFactory;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.util.ClasspathResolver;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/axonframework/axonserver/connector/query/QueryProcessingTask.class */
/**
 * Task that executes a single incoming {@link QueryRequest} against the local {@link QueryBus} segment and
 * writes the outcome to the given {@link ReplyChannel}.
 * <p>
 * Depending on the processing instructions of the request, the query is executed as a scatter-gather query,
 * a streaming query, or a direct (single handler) query. The task also acts as the {@link FlowControl} for the
 * response: {@link #request(long)} and {@link #cancel()} calls that arrive before the result is available are
 * remembered and replayed once the result has been set.
 */
public class QueryProcessingTask implements Runnable, FlowControl {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Number of requested results that marks a query as a direct (non scatter-gather) query. */
    private static final int DIRECT_QUERY_NUMBER_OF_RESULTS = 1;

    private final QueryBus localSegment;
    private final QueryRequest queryRequest;
    private final ReplyChannel<QueryResponse> responseHandler;
    private final QuerySerializer serializer;
    private final String clientId;
    /** Holds the result once the query produced one; {@code null} until then. */
    private final AtomicReference<StreamableResponse> streamableResultRef;
    /** Accumulated demand requested while no result was available yet. */
    private final AtomicLong requestedBeforeInit;
    /** Set when {@link #cancel()} was invoked while no result was available yet. */
    private final AtomicBoolean cancelledBeforeInit;
    private final boolean supportsStreaming;
    private final Supplier<Boolean> reactorOnClassPath;
    private final QueryBusSpanFactory spanFactory;

    /**
     * Creates a task that determines the availability of Project Reactor through
     * {@link ClasspathResolver#projectReactorOnClasspath()}.
     *
     * @param localSegment    the query bus used to execute the query
     * @param queryRequest    the request to process
     * @param responseHandler the channel the response(s) are written to
     * @param serializer      the serializer used to deserialize the request and serialize responses
     * @param clientId        the client identifier, included in serialized errors and streaming responses
     * @param spanFactory     the factory creating the span the query is processed in
     */
    public QueryProcessingTask(QueryBus localSegment,
                               QueryRequest queryRequest,
                               ReplyChannel<QueryResponse> responseHandler,
                               QuerySerializer serializer,
                               String clientId,
                               QueryBusSpanFactory spanFactory) {
        this(localSegment,
             queryRequest,
             responseHandler,
             serializer,
             clientId,
             ClasspathResolver::projectReactorOnClasspath,
             spanFactory);
    }

    /**
     * Creates a task with an explicit supplier that tells whether Project Reactor is on the classpath.
     *
     * @param localSegment       the query bus used to execute the query
     * @param queryRequest       the request to process
     * @param responseHandler    the channel the response(s) are written to
     * @param serializer         the serializer used to deserialize the request and serialize responses
     * @param clientId           the client identifier, included in serialized errors and streaming responses
     * @param reactorOnClassPath supplies {@code true} when streaming queries may be used
     * @param spanFactory        the factory creating the span the query is processed in
     */
    QueryProcessingTask(QueryBus localSegment,
                        QueryRequest queryRequest,
                        ReplyChannel<QueryResponse> responseHandler,
                        QuerySerializer serializer,
                        String clientId,
                        Supplier<Boolean> reactorOnClassPath,
                        QueryBusSpanFactory spanFactory) {
        this.streamableResultRef = new AtomicReference<>();
        this.requestedBeforeInit = new AtomicLong();
        this.cancelledBeforeInit = new AtomicBoolean();
        this.localSegment = localSegment;
        this.queryRequest = queryRequest;
        this.responseHandler = responseHandler;
        this.serializer = serializer;
        this.clientId = clientId;
        this.supportsStreaming = supportsStreaming(queryRequest);
        this.reactorOnClassPath = reactorOnClassPath;
        this.spanFactory = spanFactory;
    }

    /**
     * Deserializes the request and dispatches it, inside a query processing span, as a scatter-gather,
     * streaming or direct query. A {@link RuntimeException} or {@link OutOfDirectMemoryError} raised while doing
     * so is reported on the reply channel and logged.
     */
    @Override
    public void run() {
        try {
            logger.debug("Will process query [{}]", this.queryRequest.getQuery());
            QueryMessage<?, ?> queryMessage = this.serializer.deserializeRequest(this.queryRequest);
            this.spanFactory.createQueryProcessingSpan(queryMessage).run(() -> {
                int numberOfResults =
                        ProcessingInstructionHelper.numberOfResults(this.queryRequest.getProcessingInstructionsList());
                if (numberOfResults != DIRECT_QUERY_NUMBER_OF_RESULTS) {
                    scatterGather(queryMessage);
                } else if (this.supportsStreaming && this.reactorOnClassPath.get()) {
                    streamingQuery(queryMessage);
                } else {
                    directQuery(queryMessage);
                }
            });
        } catch (RuntimeException | OutOfDirectMemoryError e) {
            sendError(e);
            logger.warn("Query Processor had an exception when processing query [{}]", this.queryRequest.getQuery(), e);
        }
    }

    /**
     * Requests {@code requested} more results. Non-positive amounts are ignored. When no result is available yet,
     * the amount is added (saturating at {@link Long#MAX_VALUE}) to the demand that is replayed once the result
     * has been set.
     *
     * @param requested the number of additional results requested
     */
    @Override
    public void request(long requested) {
        if (requested <= 0 || requestIfInitialized(requested)) {
            return;
        }
        this.requestedBeforeInit.getAndUpdate(current -> {
            try {
                return Math.addExact(requested, current);
            } catch (ArithmeticException e) {
                // Demand overflowed; treat it as unbounded.
                return Long.MAX_VALUE;
            }
        });
        // The result may have been set while the demand was being recorded; try once more.
        // NOTE(review): setResult(...) may replay the same accumulated demand concurrently - confirm that
        // StreamableResponse#request tolerates this.
        requestIfInitialized(this.requestedBeforeInit.get());
    }

    /**
     * Cancels the result when it is available; otherwise records the cancellation so it is applied as soon as the
     * result has been set.
     */
    @Override
    public void cancel() {
        StreamableResponse result = this.streamableResultRef.get();
        if (result != null) {
            result.cancel();
        } else {
            // NOTE(review): if setResult(...) completes between the read above and this write, the cancellation
            // appears to go unnoticed - confirm whether callers can hit this interleaving.
            this.cancelledBeforeInit.set(true);
        }
    }

    /**
     * Executes the query as a streaming query on the local segment and exposes the resulting publisher as the
     * result of this task.
     */
    private <Q, R> void streamingQuery(QueryMessage<Q, R> queryMessage) {
        // The expected response type is exposed as Class<?>; it is the R of the incoming query message.
        @SuppressWarnings("unchecked")
        Class<R> responseType = (Class<R>) queryMessage.getResponseType().getExpectedResponseType();
        GenericStreamingQueryMessage<Q, R> streamingQueryMessage =
                new GenericStreamingQueryMessage<>(queryMessage, queryMessage.getQueryName(), responseType);
        Publisher<QueryResponseMessage<R>> resultPublisher = this.localSegment.streamingQuery(streamingQueryMessage);
        setResult(streamableFluxResult(resultPublisher));
    }

    /**
     * Executes the query as a direct query on the local segment. On completion the response is wrapped in a
     * multi-instance result (when streaming is supported and multiple instances were requested) or a single
     * instance result; any failure is reported on the reply channel.
     */
    private <Q, R, T> void directQuery(QueryMessage<Q, R> queryMessage) {
        this.localSegment.query(queryMessage).whenComplete((response, failure) -> {
            if (failure != null) {
                sendError(failure);
                return;
            }
            try {
                StreamableResponse result;
                if (this.supportsStreaming
                        && queryMessage.getResponseType() instanceof MultipleInstancesResponseType) {
                    // A multiple-instances response type means the payload is a list of the expected type.
                    @SuppressWarnings("unchecked")
                    QueryResponseMessage<List<T>> multiResponse = (QueryResponseMessage<List<T>>) response;
                    @SuppressWarnings("unchecked")
                    Class<T> elementType = (Class<T>) queryMessage.getResponseType().getExpectedResponseType();
                    result = streamableMultiInstanceResult(multiResponse, elementType);
                } else {
                    result = streamableInstanceResult(response);
                }
                setResult(result);
            } catch (Throwable t) {
                sendError(t);
            }
        });
    }

    /**
     * Publishes the result and replays a cancellation or the demand that was recorded before it was available.
     */
    private void setResult(StreamableResponse result) {
        this.streamableResultRef.set(result);
        if (this.cancelledBeforeInit.get()) {
            cancel();
        } else {
            request(this.requestedBeforeInit.get());
        }
    }

    /**
     * Executes the query as a scatter-gather query using the timeout (in milliseconds) from the processing
     * instructions, sends every response on the reply channel and completes the channel afterwards.
     */
    private <Q, R> void scatterGather(QueryMessage<Q, R> queryMessage) {
        long timeout = ProcessingInstructionHelper.timeout(this.queryRequest.getProcessingInstructionsList());
        String requestId = this.queryRequest.getMessageIdentifier();
        this.localSegment.scatterGather(queryMessage, timeout, TimeUnit.MILLISECONDS)
                         .forEach(response -> this.responseHandler.send(
                                 this.serializer.serializeResponse(response, requestId)));
        this.responseHandler.complete();
    }

    /** Wraps the given publisher in a {@link StreamableFluxResponse} bound to this task's reply channel. */
    private <R> StreamableResponse streamableFluxResult(Publisher<QueryResponseMessage<R>> resultPublisher) {
        return new StreamableFluxResponse(Flux.from(resultPublisher),
                                          this.responseHandler,
                                          this.serializer,
                                          this.queryRequest.getMessageIdentifier(),
                                          this.clientId);
    }

    /** Wraps a list-valued response in a {@link StreamableMultiInstanceResponse} bound to the reply channel. */
    private <R> StreamableMultiInstanceResponse<R> streamableMultiInstanceResult(
            QueryResponseMessage<List<R>> response, Class<R> responseType) {
        return new StreamableMultiInstanceResponse<>(response,
                                                     responseType,
                                                     this.responseHandler,
                                                     this.serializer,
                                                     this.queryRequest.getMessageIdentifier());
    }

    /** Wraps a response in a {@link StreamableInstanceResponse} bound to this task's reply channel. */
    private StreamableInstanceResponse streamableInstanceResult(QueryResponseMessage<?> response) {
        return new StreamableInstanceResponse(response,
                                              this.responseHandler,
                                              this.serializer,
                                              this.queryRequest.getMessageIdentifier());
    }

    /**
     * Tells whether both Axon Server and the client indicated, through the processing instructions of the
     * request, that they support query streaming.
     */
    private boolean supportsStreaming(QueryRequest request) {
        return ProcessingInstructionHelper.axonServerSupportsQueryStreaming(request.getProcessingInstructionsList())
                && ProcessingInstructionHelper.clientSupportsQueryStreaming(request.getProcessingInstructionsList());
    }

    /**
     * Forwards the demand to the result when it is available.
     *
     * @return {@code true} when the demand was forwarded, {@code false} when no result is available yet
     */
    private boolean requestIfInitialized(long requested) {
        StreamableResponse result = this.streamableResultRef.get();
        if (result == null) {
            return false;
        }
        result.request(requested);
        return true;
    }

    /** Sends the given failure as the last message on the reply channel. */
    private void sendError(Throwable failure) {
        QueryResponse errorResponse =
                QueryResponse.newBuilder()
                             .setErrorCode(ErrorCode.getQueryExecutionErrorCode(failure).errorCode())
                             .setErrorMessage(ExceptionSerializer.serialize(this.clientId, failure))
                             .setRequestIdentifier(this.queryRequest.getMessageIdentifier())
                             .build();
        this.responseHandler.sendLast(errorResponse);
    }
}
