/*
 * Decompiled with CFR 0.152.
 */
package org.bsc.langgraph4j.spring.ai.generators;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.bsc.async.AsyncGenerator;
import org.bsc.async.FlowGenerator;
import org.bsc.langgraph4j.NodeOutput;
import org.bsc.langgraph4j.state.AgentState;
import org.bsc.langgraph4j.streaming.StreamingOutput;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.model.Generation;
import reactor.core.publisher.Flux;

/**
 * Entry point for building an {@link AsyncGenerator} that relays a streamed
 * Spring AI chat completion ({@code Flux<ChatResponse>}) as a sequence of
 * {@link StreamingOutput} chunks while accumulating the streamed text into a
 * single merged {@link ChatResponse}.
 *
 * <p>Usage: {@code StreamingChatGenerator.<MyState>builder().mapResult(...).build(flux)}.
 */
public interface StreamingChatGenerator {
    /**
     * Creates a new, unconfigured {@link Builder}.
     *
     * @param <State> agent state type carried by the emitted outputs
     * @return a fresh builder instance
     */
    public static <State extends AgentState> Builder<State> builder() {
        return new Builder<>();
    }

    /**
     * Fluent builder that collects the configuration used by {@link #build(Flux)}.
     *
     * @param <State> agent state type carried by the emitted outputs
     */
    public static class Builder<State extends AgentState> {
        private Function<ChatResponse, Map<String, Object>> mapResult;
        private String startingNode;
        private State startingState;

        /**
         * Sets the function that converts the merged response into the result map.
         * It is mandatory: {@link #build(Flux)} rejects a {@code null} value.
         *
         * @param mapResult function applied to the accumulated {@link ChatResponse};
         *                  it receives {@code null} when no response passed the
         *                  filter in {@link #build(Flux)}
         * @return this builder
         */
        public Builder<State> mapResult(Function<ChatResponse, Map<String, Object>> mapResult) {
            this.mapResult = mapResult;
            return this;
        }

        /**
         * Sets the node name attached to every emitted {@link StreamingOutput}.
         *
         * @param node node name; may be {@code null} (no validation is performed here)
         * @return this builder
         */
        public Builder<State> startingNode(String node) {
            this.startingNode = node;
            return this;
        }

        /**
         * Sets the state attached to every emitted {@link StreamingOutput}.
         *
         * @param state state instance; may be {@code null} (no validation is performed here)
         * @return this builder
         */
        public Builder<State> startingState(State state) {
            this.startingState = state;
            return this;
        }

        /**
         * Wraps the given response stream in an {@link AsyncGenerator}.
         *
         * <p>Responses whose result or output message is {@code null} are skipped.
         * Every remaining response is first folded into the accumulated response
         * (see {@link #merge(ChatResponse, ChatResponse)}) and then emitted as a
         * {@link StreamingOutput} carrying that response's own text chunk together
         * with the configured node and state. The configured {@code mapResult}
         * function is handed to {@code FlowGenerator.fromPublisher} as the result
         * supplier, applied to the accumulated response.
         *
         * @param flux stream of chat responses; must not be {@code null}
         * @return generator over the streamed chunks
         * @throws NullPointerException if {@code flux} or the configured
         *                              {@code mapResult} is {@code null}
         */
        public AsyncGenerator<? extends NodeOutput<State>> build(Flux<ChatResponse> flux) {
            Objects.requireNonNull(flux, "flux cannot be null");
            // Snapshot the configuration so that reconfiguring (or nulling) the
            // builder after build() cannot affect a stream that is already live,
            // and so the null check above the stream actually guards the later use.
            final Function<ChatResponse, Map<String, Object>> resultMapper =
                    Objects.requireNonNull(this.mapResult, "mapResult cannot be null");
            final String node = this.startingNode;
            final State state = this.startingState;

            // Holds the merged response seen so far; stays null until the first
            // response passes the filter below.
            final AtomicReference<ChatResponse> result = new AtomicReference<>();
            final Consumer<ChatResponse> mergeMessage =
                    response -> result.updateAndGet(lastResponse -> merge(lastResponse, response));

            Flux<StreamingOutput<State>> processedFlux = flux
                    .filter(response -> response.getResult() != null && response.getResult().getOutput() != null)
                    .doOnNext(mergeMessage)
                    .map(next -> new StreamingOutput<>(next.getResult().getOutput().getText(), node, state));

            return FlowGenerator.fromPublisher(
                    FlowAdapters.toFlowPublisher(processedFlux),
                    () -> resultMapper.apply(result.get()));
        }

        /**
         * Folds {@code response} into the accumulated {@code lastResponse}.
         *
         * <p>The method has no side effects, which is what
         * {@link AtomicReference#updateAndGet} requires of its update function
         * (it may be re-applied under contention).
         *
         * @param lastResponse response accumulated so far, or {@code null} for the first element
         * @param response     newly received response; its result and output are non-null
         *                     because of the filter applied in {@link #build(Flux)}
         * @return {@code response} itself when there is nothing accumulated yet or
         *         when it carries tool calls; otherwise a new response whose text is
         *         the accumulated text followed by the new chunk, with all other
         *         attributes taken from {@code response}
         */
        private static ChatResponse merge(ChatResponse lastResponse, ChatResponse response) {
            if (lastResponse == null) {
                return response;
            }
            AssistantMessage currentMessage = response.getResult().getOutput();
            if (currentMessage.hasToolCalls()) {
                // A tool-call response replaces whatever text was accumulated.
                return response;
            }
            // The accumulated message may legitimately have no text (e.g. it was a
            // tool-call response stored verbatim above), so merge null-tolerantly
            // instead of failing the whole stream with a NullPointerException.
            String lastMessageText = lastResponse.getResult().getOutput().getText();
            String currentMessageText = currentMessage.getText();
            String mergedText;
            if (lastMessageText == null) {
                mergedText = currentMessageText;
            } else if (currentMessageText == null) {
                mergedText = lastMessageText;
            } else {
                mergedText = lastMessageText.concat(currentMessageText);
            }
            AssistantMessage newMessage = new AssistantMessage(
                    mergedText,
                    currentMessage.getMetadata(),
                    currentMessage.getToolCalls(),
                    currentMessage.getMedia());
            Generation newGeneration = new Generation(newMessage, response.getResult().getMetadata());
            return new ChatResponse(List.of(newGeneration), response.getMetadata());
        }
    }
}

