/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.langchain4j.runtime.aiservice;

import dev.langchain4j.agent.tool.ToolSpecification;
import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.rag.content.Content;
import dev.langchain4j.service.TokenStream;
import dev.langchain4j.service.tool.ToolExecution;
import dev.langchain4j.service.tool.ToolExecutor;
import io.quarkiverse.langchain4j.runtime.aiservice.ChatEvent;
import io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext;
import io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceTokenStream;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.vertx.core.Context;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

class TokenStreamMulti
extends AbstractMulti<ChatEvent>
implements Multi<ChatEvent> {
    private final List<ChatMessage> messagesToSend;
    private final List<ToolSpecification> toolSpecifications;
    private final Map<String, ToolExecutor> toolsExecutors;
    private final List<Content> contents;
    private final QuarkusAiServiceContext context;
    private final Object memoryId;
    private final boolean switchToWorkerThreadForToolExecution;
    private final boolean isCallerRunningOnWorkerThread;

    TokenStreamMulti(List<ChatMessage> messagesToSend, List<ToolSpecification> toolSpecifications, Map<String, ToolExecutor> toolExecutors, List<Content> contents, QuarkusAiServiceContext context, Object memoryId, boolean switchToWorkerThreadForToolExecution, boolean isCallerRunningOnWorkerThread) {
        this.messagesToSend = messagesToSend;
        this.toolSpecifications = toolSpecifications;
        this.toolsExecutors = toolExecutors;
        this.contents = contents;
        this.context = context;
        this.memoryId = memoryId;
        this.switchToWorkerThreadForToolExecution = switchToWorkerThreadForToolExecution;
        this.isCallerRunningOnWorkerThread = isCallerRunningOnWorkerThread;
    }

    public void subscribe(MultiSubscriber<? super ChatEvent> subscriber) {
        UnicastProcessor processor = UnicastProcessor.create();
        processor.subscribe(subscriber);
        this.createTokenStream((UnicastProcessor<ChatEvent>)processor);
    }

    private void createTokenStream(UnicastProcessor<ChatEvent> processor) {
        Context ctxt = null;
        if (this.switchToWorkerThreadForToolExecution || this.isCallerRunningOnWorkerThread) {
            ctxt = VertxContext.getOrCreateDuplicatedContext();
        }
        QuarkusAiServiceTokenStream stream = new QuarkusAiServiceTokenStream(this.messagesToSend, this.toolSpecifications, this.toolsExecutors, this.contents, this.context, this.memoryId, ctxt, this.switchToWorkerThreadForToolExecution, this.isCallerRunningOnWorkerThread);
        final TokenStream tokenStream = stream.onPartialResponse(chunk -> processor.onNext((Object)new ChatEvent.PartialResponseEvent((String)chunk))).onPartialThinking(thinking -> processor.onNext((Object)new ChatEvent.PartialThinkingEvent(thinking.text()))).onCompleteResponse(message -> {
            processor.onNext((Object)new ChatEvent.ChatCompletedEvent((ChatResponse)message));
            processor.onComplete();
        }).onRetrieved(content -> processor.onNext((Object)new ChatEvent.ContentFetchedEvent((List<Content>)content))).beforeToolExecution(beforeExecution -> processor.onNext((Object)new ChatEvent.BeforeToolExecutionEvent(beforeExecution.request()))).onToolExecuted(execution -> processor.onNext((Object)new ChatEvent.ToolExecutedEvent((ToolExecution)execution))).onError(arg_0 -> processor.onError(arg_0));
        if (this.switchToWorkerThreadForToolExecution && Context.isOnEventLoopThread()) {
            ctxt.executeBlocking((Callable)new Callable<Void>(){
                final /* synthetic */ TokenStreamMulti this$0;
                {
                    this.this$0 = this$0;
                }

                @Override
                public Void call() {
                    tokenStream.start();
                    return null;
                }
            });
        } else {
            tokenStream.start();
        }
    }
}

