package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.connector.FlowControl;
import io.axoniq.axonserver.connector.Registration;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.ResultStreamPublisher;
import io.axoniq.axonserver.connector.impl.CloseAwareReplyChannel;
import io.axoniq.axonserver.connector.query.QueryDefinition;
import io.axoniq.axonserver.connector.query.QueryHandler;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.query.QueryProviderInbound;
import io.axoniq.axonserver.grpc.query.QueryProviderOutbound;
import io.axoniq.axonserver.grpc.query.QueryRequest;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.axoniq.axonserver.grpc.query.QueryUpdate;
import io.axoniq.axonserver.grpc.query.SubscriptionQuery;
import io.grpc.stub.StreamObserver;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.DispatchInterceptors;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.InstructionAckSource;
import org.axonframework.axonserver.connector.PriorityRunnable;
import org.axonframework.axonserver.connector.TargetContextResolver;
import org.axonframework.axonserver.connector.command.AxonServerRegistration;
import org.axonframework.axonserver.connector.query.subscription.AxonServerSubscriptionQueryResult;
import org.axonframework.axonserver.connector.query.subscription.SubscriptionMessageSerializer;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.axonserver.connector.util.ExecutorServiceBuilder;
import org.axonframework.axonserver.connector.util.PriorityTaskSchedulers;
import org.axonframework.axonserver.connector.util.ProcessingInstructionHelper;
import org.axonframework.axonserver.connector.util.UpstreamAwareStreamObserver;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.StringUtils;
import org.axonframework.lifecycle.Lifecycle;
import org.axonframework.lifecycle.ShutdownLatch;
import org.axonframework.messaging.Distributed;
import org.axonframework.messaging.GenericMessage;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.responsetypes.ConvertingResponseMessage;
import org.axonframework.messaging.responsetypes.InstanceResponseType;
import org.axonframework.messaging.responsetypes.MultipleInstancesResponseType;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.queryhandling.GenericQueryResponseMessage;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.axonframework.queryhandling.StreamingQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.queryhandling.UpdateHandlerRegistration;
import org.axonframework.serialization.Serializer;
import org.axonframework.tracing.NoOpSpanFactory;
import org.axonframework.tracing.Span;
import org.axonframework.tracing.SpanFactory;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:org/axonframework/axonserver/connector/query/AxonServerQueryBus.class */
public class AxonServerQueryBus implements QueryBus, Distributed<QueryBus>, Lifecycle {
    private static final int DIRECT_QUERY_NUMBER_OF_RESULTS = 1;
    private static final int SCATTER_GATHER_NUMBER_OF_RESULTS = -1;
    private static final int QUERY_QUEUE_CAPACITY = 1000;
    private final AxonServerConnectionManager axonServerConnectionManager;
    private final AxonServerConfiguration configuration;
    private final QueryUpdateEmitter updateEmitter;
    private final QueryBus localSegment;
    private final QuerySerializer serializer;
    private final SubscriptionMessageSerializer subscriptionSerializer;
    private final QueryPriorityCalculator priorityCalculator;
    private final DispatchInterceptors<QueryMessage<?, ?>> dispatchInterceptors;
    private final TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver;
    private final ShutdownLatch shutdownLatch = new ShutdownLatch();
    private final ExecutorService queryExecutor;
    private final LocalSegmentAdapter localSegmentAdapter;
    private final String context;
    private final SpanFactory spanFactory;
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final AtomicLong TASK_SEQUENCE = new AtomicLong(Long.MIN_VALUE);
    private static final long DIRECT_QUERY_TIMEOUT_MS = TimeUnit.HOURS.toMillis(1);

    /* loaded from: input_file:org/axonframework/axonserver/connector/query/AxonServerQueryBus$ActivityFinisher.class */
    private static class ActivityFinisher implements Consumer<SignalType> {
        private final ShutdownLatch.ActivityHandle activity;
        private final Span span;

        private ActivityFinisher(ShutdownLatch.ActivityHandle activityHandle, Span span) {
            this.activity = activityHandle;
            this.span = span;
        }

        @Override // java.util.function.Consumer
        public void accept(SignalType signalType) {
            this.span.end();
            this.activity.end();
        }
    }

    /* loaded from: input_file:org/axonframework/axonserver/connector/query/AxonServerQueryBus$Builder.class */
    public static class Builder {
        private AxonServerConnectionManager axonServerConnectionManager;
        private AxonServerConfiguration configuration;
        private QueryBus localSegment;
        private QueryUpdateEmitter updateEmitter;
        private Serializer messageSerializer;
        private Serializer genericSerializer;
        private String defaultContext;
        private QueryPriorityCalculator priorityCalculator = QueryPriorityCalculator.defaultQueryPriorityCalculator();
        private TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver = queryMessage -> {
            return this.configuration.getContext();
        };
        private ExecutorServiceBuilder executorServiceBuilder = ExecutorServiceBuilder.defaultQueryExecutorServiceBuilder();
        private SpanFactory spanFactory = NoOpSpanFactory.INSTANCE;

        public Builder axonServerConnectionManager(AxonServerConnectionManager axonServerConnectionManager) {
            BuilderUtils.assertNonNull(axonServerConnectionManager, "AxonServerConnectionManager may not be null");
            this.axonServerConnectionManager = axonServerConnectionManager;
            return this;
        }

        public Builder configuration(AxonServerConfiguration axonServerConfiguration) {
            BuilderUtils.assertNonNull(axonServerConfiguration, "AxonServerConfiguration may not be null");
            this.configuration = axonServerConfiguration;
            return this;
        }

        public Builder localSegment(QueryBus queryBus) {
            BuilderUtils.assertNonNull(queryBus, "Local QueryBus may not be null");
            this.localSegment = queryBus;
            return this;
        }

        public Builder updateEmitter(QueryUpdateEmitter queryUpdateEmitter) {
            BuilderUtils.assertNonNull(queryUpdateEmitter, "QueryUpdateEmitter may not be null");
            this.updateEmitter = queryUpdateEmitter;
            return this;
        }

        public Builder messageSerializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "Message Serializer may not be null");
            this.messageSerializer = serializer;
            return this;
        }

        public Builder genericSerializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "Generic Serializer may not be null");
            this.genericSerializer = serializer;
            return this;
        }

        public Builder priorityCalculator(QueryPriorityCalculator queryPriorityCalculator) {
            BuilderUtils.assertNonNull(this.targetContextResolver, "QueryPriorityCalculator may not be null");
            this.priorityCalculator = queryPriorityCalculator;
            return this;
        }

        public Builder targetContextResolver(TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver) {
            BuilderUtils.assertNonNull(targetContextResolver, "TargetContextResolver may not be null");
            this.targetContextResolver = targetContextResolver;
            return this;
        }

        public Builder executorServiceBuilder(ExecutorServiceBuilder executorServiceBuilder) {
            BuilderUtils.assertNonNull(executorServiceBuilder, "ExecutorServiceBuilder may not be null");
            this.executorServiceBuilder = executorServiceBuilder;
            return this;
        }

        @Deprecated
        public Builder requestStreamFactory(Function<UpstreamAwareStreamObserver<QueryProviderInbound>, StreamObserver<QueryProviderOutbound>> function) {
            return this;
        }

        @Deprecated
        public Builder instructionAckSource(InstructionAckSource<QueryProviderOutbound> instructionAckSource) {
            return this;
        }

        public Builder defaultContext(String str) {
            BuilderUtils.assertNonEmpty(str, "The context may not be null or empty");
            this.defaultContext = str;
            return this;
        }

        public Builder spanFactory(@Nonnull SpanFactory spanFactory) {
            BuilderUtils.assertNonNull(spanFactory, "The SpanFactory may not be null or empty");
            this.spanFactory = spanFactory;
            return this;
        }

        public AxonServerQueryBus build() {
            return new AxonServerQueryBus(this);
        }

        protected QuerySerializer buildQuerySerializer() {
            return new QuerySerializer(this.messageSerializer, this.genericSerializer, this.configuration);
        }

        protected SubscriptionMessageSerializer buildSubscriptionMessageSerializer() {
            return new SubscriptionMessageSerializer(this.messageSerializer, this.genericSerializer, this.configuration);
        }

        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(this.axonServerConnectionManager, "The AxonServerConnectionManager is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.configuration, "The AxonServerConfiguration is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.localSegment, "The Local QueryBus is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.updateEmitter, "The QueryUpdateEmitter is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.messageSerializer, "The Message Serializer is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.genericSerializer, "The Generic Serializer is a hard requirement and should be provided");
        }
    }

    /* loaded from: input_file:org/axonframework/axonserver/connector/query/AxonServerQueryBus$LocalSegmentAdapter.class */
    private class LocalSegmentAdapter implements QueryHandler {
        private final Map<String, QueryProcessingTask> queriesInProgress;

        private LocalSegmentAdapter() {
            this.queriesInProgress = new ConcurrentHashMap();
        }

        public void cancel() {
            this.queriesInProgress.values().iterator().forEachRemaining((v0) -> {
                v0.cancel();
            });
        }

        public void handle(QueryRequest queryRequest, ReplyChannel<QueryResponse> replyChannel) {
            stream(queryRequest, replyChannel).request(Long.MAX_VALUE);
        }

        public FlowControl stream(QueryRequest queryRequest, ReplyChannel<QueryResponse> replyChannel) {
            CloseAwareReplyChannel closeAwareReplyChannel = new CloseAwareReplyChannel(replyChannel, () -> {
                this.queriesInProgress.remove(queryRequest.getMessageIdentifier());
            });
            long priority = ProcessingInstructionHelper.priority(queryRequest.getProcessingInstructionsList());
            final QueryProcessingTask queryProcessingTask = new QueryProcessingTask(AxonServerQueryBus.this.localSegment, queryRequest, closeAwareReplyChannel, AxonServerQueryBus.this.serializer, AxonServerQueryBus.this.configuration.getClientId(), AxonServerQueryBus.this.spanFactory);
            final PriorityRunnable priorityRunnable = new PriorityRunnable(queryProcessingTask, priority, AxonServerQueryBus.TASK_SEQUENCE.incrementAndGet());
            this.queriesInProgress.put(queryRequest.getMessageIdentifier(), queryProcessingTask);
            AxonServerQueryBus.this.queryExecutor.execute(priorityRunnable);
            return new FlowControl() { // from class: org.axonframework.axonserver.connector.query.AxonServerQueryBus.LocalSegmentAdapter.1
                public void request(long j) {
                    ExecutorService executorService = AxonServerQueryBus.this.queryExecutor;
                    QueryProcessingTask queryProcessingTask2 = queryProcessingTask;
                    executorService.execute(new PriorityRunnable(() -> {
                        queryProcessingTask2.request(j);
                    }, priorityRunnable.priority(), AxonServerQueryBus.TASK_SEQUENCE.incrementAndGet()));
                }

                public void cancel() {
                    ExecutorService executorService = AxonServerQueryBus.this.queryExecutor;
                    QueryProcessingTask queryProcessingTask2 = queryProcessingTask;
                    queryProcessingTask2.getClass();
                    executorService.execute(new PriorityRunnable(queryProcessingTask2::cancel, priorityRunnable.priority(), AxonServerQueryBus.TASK_SEQUENCE.incrementAndGet()));
                }
            };
        }

        public Registration registerSubscriptionQuery(SubscriptionQuery subscriptionQuery, QueryHandler.UpdateHandler updateHandler) {
            UpdateHandlerRegistration registerUpdateHandler = AxonServerQueryBus.this.updateEmitter.registerUpdateHandler(AxonServerQueryBus.this.subscriptionSerializer.deserialize(subscriptionQuery), 1024);
            Flux doOnError = registerUpdateHandler.getUpdates().doOnError(th -> {
                ErrorMessage serialize = ExceptionSerializer.serialize(AxonServerQueryBus.this.configuration.getClientId(), th);
                updateHandler.sendUpdate(QueryUpdate.newBuilder().setErrorMessage(serialize).setErrorCode(ErrorCode.getQueryExecutionErrorCode(th).errorCode()).build());
                updateHandler.complete();
            });
            updateHandler.getClass();
            Flux doOnComplete = doOnError.doOnComplete(updateHandler::complete);
            SubscriptionMessageSerializer subscriptionMessageSerializer = AxonServerQueryBus.this.subscriptionSerializer;
            subscriptionMessageSerializer.getClass();
            Flux map = doOnComplete.map(subscriptionMessageSerializer::serialize);
            updateHandler.getClass();
            map.subscribe(updateHandler::sendUpdate);
            return () -> {
                registerUpdateHandler.getRegistration().close();
                return CompletableFuture.completedFuture(null);
            };
        }
    }

    /* loaded from: input_file:org/axonframework/axonserver/connector/query/AxonServerQueryBus$QueryResponseSpliterator.class */
    private static class QueryResponseSpliterator<Q, R> implements Spliterator<QueryResponseMessage<R>> {
        private final QueryMessage<Q, R> queryMessage;
        private final ResultStream<QueryResponse> queryResult;
        private final long deadline;
        private final QuerySerializer serializer;
        private final Runnable closeHandler;

        public QueryResponseSpliterator(QueryMessage<Q, R> queryMessage, ResultStream<QueryResponse> resultStream, long j, QuerySerializer querySerializer, Runnable runnable) {
            this.queryMessage = queryMessage;
            this.queryResult = resultStream;
            this.deadline = j;
            this.serializer = querySerializer;
            this.closeHandler = runnable;
        }

        @Override // java.util.Spliterator
        public boolean tryAdvance(Consumer<? super QueryResponseMessage<R>> consumer) {
            QueryResponse queryResponse;
            long currentTimeMillis = this.deadline - System.currentTimeMillis();
            if (currentTimeMillis > 0) {
                try {
                    queryResponse = (QueryResponse) this.queryResult.nextIfAvailable(currentTimeMillis, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.closeHandler.run();
                    return false;
                }
            } else {
                queryResponse = (QueryResponse) this.queryResult.nextIfAvailable();
            }
            if (queryResponse != null) {
                consumer.accept(this.serializer.deserializeResponse(queryResponse, this.queryMessage.getResponseType()));
                return true;
            }
            this.queryResult.close();
            this.closeHandler.run();
            return false;
        }

        @Override // java.util.Spliterator
        public Spliterator<QueryResponseMessage<R>> trySplit() {
            return null;
        }

        @Override // java.util.Spliterator
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override // java.util.Spliterator
        public int characteristics() {
            return 0;
        }
    }

    /* loaded from: input_file:org/axonframework/axonserver/connector/query/AxonServerQueryBus$ResponseProcessingTask.class */
    private static class ResponseProcessingTask<R> implements Runnable {
        private final AtomicBoolean singleExecutionCheck = new AtomicBoolean();
        private final ResultStream<QueryResponse> result;
        private final QuerySerializer serializer;
        private final CompletableFuture<QueryResponseMessage<R>> queryTransaction;
        private final ResponseType<R> expectedResponseType;

        public ResponseProcessingTask(ResultStream<QueryResponse> resultStream, QuerySerializer querySerializer, CompletableFuture<QueryResponseMessage<R>> completableFuture, ResponseType<R> responseType) {
            this.result = resultStream;
            this.serializer = querySerializer;
            this.queryTransaction = completableFuture;
            this.expectedResponseType = responseType;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (this.singleExecutionCheck.compareAndSet(false, true)) {
                QueryResponse queryResponse = (QueryResponse) this.result.nextIfAvailable();
                if (queryResponse != null) {
                    this.queryTransaction.complete(this.serializer.deserializeResponse(queryResponse, this.expectedResponseType));
                    return;
                }
                if (!this.result.isClosed() || this.queryTransaction.isDone()) {
                    return;
                }
                Optional error = this.result.getError();
                ErrorCode errorCode = ErrorCode.QUERY_DISPATCH_ERROR;
                errorCode.getClass();
                this.queryTransaction.completeExceptionally((Exception) error.map(errorCode::convert).orElse(new AxonServerQueryDispatchException(ErrorCode.QUERY_DISPATCH_ERROR.errorCode(), "Query did not yield the expected number of results.")));
            }
        }
    }

    public AxonServerQueryBus(Builder builder) {
        builder.validate();
        this.axonServerConnectionManager = builder.axonServerConnectionManager;
        this.configuration = builder.configuration;
        this.updateEmitter = builder.updateEmitter;
        this.localSegment = builder.localSegment;
        this.serializer = builder.buildQuerySerializer();
        this.subscriptionSerializer = builder.buildSubscriptionMessageSerializer();
        this.priorityCalculator = builder.priorityCalculator;
        this.context = StringUtils.nonEmptyOrNull(builder.defaultContext) ? builder.defaultContext : this.configuration.getContext();
        this.targetContextResolver = builder.targetContextResolver.orElse(message -> {
            return this.context;
        });
        this.spanFactory = builder.spanFactory;
        this.dispatchInterceptors = new DispatchInterceptors<>();
        this.queryExecutor = builder.executorServiceBuilder.apply(this.configuration, new PriorityBlockingQueue(QUERY_QUEUE_CAPACITY));
        this.localSegmentAdapter = new LocalSegmentAdapter();
    }

    public <Q, R> Publisher<QueryResponseMessage<R>> streamingQuery(StreamingQueryMessage<Q, R> streamingQueryMessage) {
        Span start = this.spanFactory.createInternalSpan(() -> {
            return "AxonServerQueryBus.streamingQuery";
        }, streamingQueryMessage).start();
        QueryMessage<?, ?> queryMessage = (StreamingQueryMessage) this.spanFactory.propagateContext(streamingQueryMessage);
        int determinePriority = this.priorityCalculator.determinePriority(queryMessage);
        AtomicReference atomicReference = new AtomicReference(PriorityTaskSchedulers.forPriority(this.queryExecutor, determinePriority, TASK_SEQUENCE));
        return Mono.fromSupplier(this::registerStreamingQueryActivity).flatMapMany(activityHandle -> {
            Flux publishOn = Mono.just(this.dispatchInterceptors.intercept(queryMessage)).flatMapMany(streamingQueryMessage2 -> {
                return Mono.just(serializeStreaming(streamingQueryMessage2, determinePriority)).flatMapMany(queryRequest -> {
                    return new ResultStreamPublisher(() -> {
                        return sendRequest(streamingQueryMessage2, queryRequest);
                    });
                }).concatMap(queryResponse -> {
                    return deserialize(streamingQueryMessage2, queryResponse);
                });
            }).publishOn((Scheduler) atomicReference.get());
            start.getClass();
            return publishOn.doOnError(start::recordException).doFinally(new ActivityFinisher(activityHandle, start));
        }).subscribeOn((Scheduler) atomicReference.get());
    }

    public static Builder builder() {
        return new Builder();
    }

    public void registerLifecycleHandlers(@Nonnull Lifecycle.LifecycleRegistry lifecycleRegistry) {
        lifecycleRegistry.onStart(536870911, this::start);
        lifecycleRegistry.onShutdown(536870911, this::disconnect);
        lifecycleRegistry.onShutdown(0, this::shutdownDispatching);
    }

    public void start() {
        this.shutdownLatch.initialize();
    }

    public <R> org.axonframework.common.Registration subscribe(@Nonnull String str, @Nonnull Type type, @Nonnull MessageHandler<? super QueryMessage<?, R>> messageHandler) {
        org.axonframework.common.Registration subscribe = this.localSegment.subscribe(str, type, messageHandler);
        Registration registerQueryHandler = this.axonServerConnectionManager.getConnection(this.context).queryChannel().registerQueryHandler(this.localSegmentAdapter, new QueryDefinition[]{new QueryDefinition(str, type)});
        registerQueryHandler.getClass();
        return new AxonServerRegistration(subscribe, registerQueryHandler::cancel);
    }

    public <Q, R> CompletableFuture<QueryResponseMessage<R>> query(@Nonnull QueryMessage<Q, R> queryMessage) {
        Span start = this.spanFactory.createDispatchSpan(() -> {
            return "AxonServerQueryBus.query";
        }, queryMessage, new Message[0]).start();
        QueryMessage propagateContext = this.spanFactory.propagateContext(queryMessage);
        Assert.isFalse(Publisher.class.isAssignableFrom(queryMessage.getResponseType().getExpectedResponseType()), () -> {
            return "The direct query does not support Flux as a return type.";
        });
        this.shutdownLatch.ifShuttingDown("Cannot dispatch new queries as this bus is being shut down");
        QueryMessage<?, ?> queryMessage2 = (QueryMessage) this.dispatchInterceptors.intercept(propagateContext);
        ShutdownLatch.ActivityHandle registerActivity = this.shutdownLatch.registerActivity();
        CompletableFuture completableFuture = new CompletableFuture();
        try {
            int determinePriority = this.priorityCalculator.determinePriority(queryMessage2);
            ResultStream<QueryResponse> sendRequest = sendRequest(queryMessage2, serialize(queryMessage2, false, determinePriority));
            Runnable wrapRunnable = this.spanFactory.createInternalSpan(() -> {
                return "AxonServerQueryBus.ResponseProcessingTask";
            }).wrapRunnable(new ResponseProcessingTask(sendRequest, this.serializer, completableFuture, queryMessage.getResponseType()));
            sendRequest.onAvailable(() -> {
                this.queryExecutor.execute(new PriorityRunnable(wrapRunnable, determinePriority, TASK_SEQUENCE.incrementAndGet()));
            });
        } catch (Exception e) {
            logger.debug("There was a problem issuing a query {}.", queryMessage2, e);
            completableFuture.completeExceptionally(ErrorCode.QUERY_DISPATCH_ERROR.convert(this.configuration.getClientId(), e));
            start.recordException(e).end();
        }
        return completableFuture.whenComplete((queryResponseMessage, th) -> {
            registerActivity.end();
            if (th != null) {
                start.recordException(th);
            }
            if (queryResponseMessage != null && queryResponseMessage.isExceptional()) {
                start.recordException(queryResponseMessage.exceptionResult());
            }
            start.end();
        });
    }

    private QueryRequest serializeStreaming(QueryMessage<?, ?> queryMessage, int i) {
        return serialize(queryMessage, true, i);
    }

    private ShutdownLatch.ActivityHandle registerStreamingQueryActivity() {
        this.shutdownLatch.ifShuttingDown("Cannot dispatch new queries as this bus is being shut down");
        return this.shutdownLatch.registerActivity();
    }

    private QueryRequest serialize(QueryMessage<?, ?> queryMessage, boolean z, int i) {
        return this.serializer.serializeRequest(queryMessage, DIRECT_QUERY_NUMBER_OF_RESULTS, DIRECT_QUERY_TIMEOUT_MS, i, z);
    }

    private ResultStream<QueryResponse> sendRequest(QueryMessage<?, ?> queryMessage, QueryRequest queryRequest) {
        return this.axonServerConnectionManager.getConnection(this.targetContextResolver.resolveContext(queryMessage)).queryChannel().query(queryRequest);
    }

    private <R> Publisher<QueryResponseMessage<R>> deserialize(StreamingQueryMessage<?, R> streamingQueryMessage, QueryResponse queryResponse) {
        Class expectedResponseType = streamingQueryMessage.getResponseType().getExpectedResponseType();
        QueryResponseMessage<?> deserializeResponse = this.serializer.deserializeResponse(queryResponse);
        return deserializeResponse.isExceptional() ? Flux.error(deserializeResponse.exceptionResult()) : expectedResponseType.isAssignableFrom(deserializeResponse.getPayloadType()) ? Flux.just(new ConvertingResponseMessage(new InstanceResponseType(expectedResponseType), deserializeResponse)) : Flux.fromStream(((List) new ConvertingResponseMessage(new MultipleInstancesResponseType(expectedResponseType), deserializeResponse).getPayload()).stream().map(obj -> {
            return singleMessage(deserializeResponse, obj, expectedResponseType);
        }));
    }

    private <R> QueryResponseMessage<R> singleMessage(QueryResponseMessage<?> queryResponseMessage, R r, Class<R> cls) {
        return new GenericQueryResponseMessage(new GenericMessage(queryResponseMessage.getIdentifier(), cls, r, queryResponseMessage.getMetaData()));
    }

    public <Q, R> Stream<QueryResponseMessage<R>> scatterGather(@Nonnull QueryMessage<Q, R> queryMessage, long j, @Nonnull TimeUnit timeUnit) {
        Span start = this.spanFactory.createDispatchSpan(() -> {
            return "AxonServerQueryBus.scatterGather";
        }, queryMessage, new Message[0]).start();
        Assert.isFalse(Publisher.class.isAssignableFrom(queryMessage.getResponseType().getExpectedResponseType()), () -> {
            return "The scatter-Gather query does not support Flux as a return type.";
        });
        this.shutdownLatch.ifShuttingDown(String.format("Cannot dispatch new %s as this bus is being shut down", "scatter-gather queries"));
        QueryMessage<Q, R> intercept = this.dispatchInterceptors.intercept(this.spanFactory.propagateContext(queryMessage));
        ShutdownLatch.ActivityHandle registerActivity = this.shutdownLatch.registerActivity();
        try {
            return (Stream) StreamSupport.stream(new QueryResponseSpliterator(queryMessage, this.axonServerConnectionManager.getConnection(this.targetContextResolver.resolveContext(intercept)).queryChannel().query(this.serializer.serializeRequest(intercept, SCATTER_GATHER_NUMBER_OF_RESULTS, timeUnit.toMillis(j), this.priorityCalculator.determinePriority(intercept))), System.currentTimeMillis() + timeUnit.toMillis(j), this.serializer, () -> {
                registerActivity.end();
                start.end();
            }), false).onClose(() -> {
                registerActivity.end();
                start.end();
            });
        } catch (Exception e) {
            logger.debug("There was a problem issuing a scatter-gather query {}.", intercept, e);
            registerActivity.end();
            start.recordException(e).end();
            throw e;
        }
    }

    @Deprecated
    public <Q, I, U> SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> subscriptionQuery(@Nonnull SubscriptionQueryMessage<Q, I, U> subscriptionQueryMessage, SubscriptionQueryBackpressure subscriptionQueryBackpressure, int i) {
        return subscriptionQuery(subscriptionQueryMessage, i);
    }

    public <Q, I, U> SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> subscriptionQuery(@Nonnull SubscriptionQueryMessage<Q, I, U> subscriptionQueryMessage, int i) {
        Assert.isFalse(Publisher.class.isAssignableFrom(subscriptionQueryMessage.getResponseType().getExpectedResponseType()), () -> {
            return "The subscription Query query does not support Flux as a return type.";
        });
        Assert.isFalse(Publisher.class.isAssignableFrom(subscriptionQueryMessage.getUpdateResponseType().getExpectedResponseType()), () -> {
            return "The subscription Query query does not support Flux as an update type.";
        });
        this.shutdownLatch.ifShuttingDown(String.format("Cannot dispatch new %s as this bus is being shut down", "subscription queries"));
        SubscriptionQueryMessage<?, ?, ?> intercept = this.dispatchInterceptors.intercept(this.spanFactory.propagateContext(subscriptionQueryMessage));
        String identifier = intercept.getIdentifier();
        String resolveContext = this.targetContextResolver.resolveContext(intercept);
        logger.debug("Subscription Query requested with subscription Id [{}]", identifier);
        return new AxonServerSubscriptionQueryResult(this.axonServerConnectionManager.getConnection(resolveContext).queryChannel().subscriptionQuery(this.subscriptionSerializer.serializeQuery(intercept), this.subscriptionSerializer.serializeUpdateType(intercept), this.configuration.getQueryFlowControl().getInitialNrOfPermits().intValue(), this.configuration.getQueryFlowControl().getNrOfNewPermits().intValue()), this.subscriptionSerializer);
    }

    public QueryUpdateEmitter queryUpdateEmitter() {
        return this.updateEmitter;
    }

    /* renamed from: localSegment, reason: merged with bridge method [inline-methods] */
    public QueryBus m28localSegment() {
        return this.localSegment;
    }

    public org.axonframework.common.Registration registerHandlerInterceptor(@Nonnull MessageHandlerInterceptor<? super QueryMessage<?, ?>> messageHandlerInterceptor) {
        return this.localSegment.registerHandlerInterceptor(messageHandlerInterceptor);
    }

    @Nonnull
    public org.axonframework.common.Registration registerDispatchInterceptor(@Nonnull MessageDispatchInterceptor<? super QueryMessage<?, ?>> messageDispatchInterceptor) {
        return this.dispatchInterceptors.registerDispatchInterceptor(messageDispatchInterceptor);
    }

    public void disconnect() {
        if (this.axonServerConnectionManager.isConnected(this.context)) {
            this.axonServerConnectionManager.getConnection(this.context).queryChannel().prepareDisconnect();
        }
        this.localSegmentAdapter.cancel();
    }

    public CompletableFuture<Void> shutdownDispatching() {
        return this.shutdownLatch.initiateShutdown();
    }
}
