/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.cloud.ai.a2a.server;

import com.alibaba.cloud.ai.a2a.server.A2aRequestHandler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import io.a2a.server.requesthandlers.JSONRPCHandler;
import io.a2a.spec.AgentCard;
import io.a2a.spec.CancelTaskRequest;
import io.a2a.spec.DeleteTaskPushNotificationConfigRequest;
import io.a2a.spec.GetTaskPushNotificationConfigRequest;
import io.a2a.spec.GetTaskRequest;
import io.a2a.spec.JSONParseError;
import io.a2a.spec.JSONRPCError;
import io.a2a.spec.JSONRPCErrorResponse;
import io.a2a.spec.JSONRPCRequest;
import io.a2a.spec.JSONRPCResponse;
import io.a2a.spec.ListTaskPushNotificationConfigRequest;
import io.a2a.spec.MessageSendParams;
import io.a2a.spec.NonStreamingJSONRPCRequest;
import io.a2a.spec.SendMessageRequest;
import io.a2a.spec.SendStreamingMessageRequest;
import io.a2a.spec.SetTaskPushNotificationConfigRequest;
import io.a2a.spec.StreamingJSONRPCRequest;
import io.a2a.spec.TaskResubscriptionRequest;
import io.a2a.spec.UnsupportedOperationError;
import io.a2a.util.Utils;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Flow;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.function.ServerRequest;
import reactor.core.publisher.Flux;

public class JsonRpcA2aRequestHandler
implements A2aRequestHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(JsonRpcA2aRequestHandler.class);
    private final JSONRPCHandler jsonRpcHandler;

    public JsonRpcA2aRequestHandler(JSONRPCHandler jsonRpcHandler) {
        this.jsonRpcHandler = jsonRpcHandler;
    }

    @Override
    public AgentCard getAgentCard() {
        return this.jsonRpcHandler.getAgentCard();
    }

    @Override
    public Object onHandler(String body, ServerRequest.Headers headers) {
        boolean streaming = JsonRpcA2aRequestHandler.isStreamingRequest(body);
        JSONRPCErrorResponse result = null;
        try {
            result = streaming ? this.handleStreamRequest(body) : this.handleNonStreamRequest(body);
        }
        catch (JsonProcessingException e) {
            result = new JSONRPCErrorResponse(null, (JSONRPCError)new JSONParseError());
        }
        return result;
    }

    private static boolean isStreamingRequest(String requestBody) {
        try {
            JsonNode node = Utils.OBJECT_MAPPER.readTree(requestBody);
            JsonNode method = node != null ? node.get("method") : null;
            return method != null && ("message/stream".equals(method.asText()) || "tasks/resubscribe".equals(method.asText()));
        }
        catch (Exception e) {
            return false;
        }
    }

    private Flux<?> handleStreamRequest(String body) throws JsonProcessingException {
        Flow.Publisher publisher;
        StreamingJSONRPCRequest request = (StreamingJSONRPCRequest)Utils.OBJECT_MAPPER.readValue(body, StreamingJSONRPCRequest.class);
        if (request instanceof SendStreamingMessageRequest) {
            SendStreamingMessageRequest req = (SendStreamingMessageRequest)request;
            SendStreamingMessageRequest.Builder newReqBuilder = new SendStreamingMessageRequest.Builder().id(req.getId()).jsonrpc(req.getJsonrpc()).method(req.getMethod()).params(this.injectStreamMetadata((MessageSendParams)req.getParams(), true));
            publisher = this.jsonRpcHandler.onMessageSendStream(newReqBuilder.build());
            LOGGER.info("get Stream publisher {}", (Object)publisher);
        } else if (request instanceof TaskResubscriptionRequest) {
            TaskResubscriptionRequest req = (TaskResubscriptionRequest)request;
            publisher = this.jsonRpcHandler.onResubscribeToTask(req);
        } else {
            return Flux.just((Object)JsonRpcA2aRequestHandler.generateErrorResponse(request, (JSONRPCError)new UnsupportedOperationError()));
        }
        return Flux.from((Publisher)FlowAdapters.toPublisher((Flow.Publisher)publisher)).map(jsonrpcResponse -> jsonrpcResponse).delaySubscription(Duration.ofMillis(10L));
    }

    private JSONRPCResponse<?> handleNonStreamRequest(String body) throws JsonProcessingException {
        NonStreamingJSONRPCRequest request = (NonStreamingJSONRPCRequest)Utils.OBJECT_MAPPER.readValue(body, NonStreamingJSONRPCRequest.class);
        if (request instanceof GetTaskRequest) {
            GetTaskRequest req = (GetTaskRequest)request;
            return this.jsonRpcHandler.onGetTask(req);
        }
        if (request instanceof SendMessageRequest) {
            SendMessageRequest req = (SendMessageRequest)request;
            SendMessageRequest.Builder newReqBuilder = new SendMessageRequest.Builder().id(req.getId()).jsonrpc(req.getJsonrpc()).method(req.getMethod()).params(this.injectStreamMetadata((MessageSendParams)req.getParams(), false));
            return this.jsonRpcHandler.onMessageSend(newReqBuilder.build());
        }
        if (request instanceof CancelTaskRequest) {
            CancelTaskRequest req = (CancelTaskRequest)request;
            return this.jsonRpcHandler.onCancelTask(req);
        }
        if (request instanceof GetTaskPushNotificationConfigRequest) {
            GetTaskPushNotificationConfigRequest req = (GetTaskPushNotificationConfigRequest)request;
            return this.jsonRpcHandler.getPushNotificationConfig(req);
        }
        if (request instanceof SetTaskPushNotificationConfigRequest) {
            SetTaskPushNotificationConfigRequest req = (SetTaskPushNotificationConfigRequest)request;
            return this.jsonRpcHandler.setPushNotificationConfig(req);
        }
        if (request instanceof ListTaskPushNotificationConfigRequest) {
            ListTaskPushNotificationConfigRequest req = (ListTaskPushNotificationConfigRequest)request;
            return this.jsonRpcHandler.listPushNotificationConfig(req);
        }
        if (request instanceof DeleteTaskPushNotificationConfigRequest) {
            DeleteTaskPushNotificationConfigRequest req = (DeleteTaskPushNotificationConfigRequest)request;
            return this.jsonRpcHandler.deletePushNotificationConfig(req);
        }
        return JsonRpcA2aRequestHandler.generateErrorResponse(request, (JSONRPCError)new UnsupportedOperationError());
    }

    private static JSONRPCErrorResponse generateErrorResponse(JSONRPCRequest<?> request, JSONRPCError error) {
        return new JSONRPCErrorResponse(request.getId(), error);
    }

    private MessageSendParams injectStreamMetadata(MessageSendParams original, boolean isStreaming) {
        if (null == original.metadata()) {
            MessageSendParams.Builder newBuilder = new MessageSendParams.Builder();
            newBuilder.configuration(original.configuration());
            newBuilder.metadata(Map.of("isStreaming", isStreaming));
            newBuilder.message(original.message());
            return newBuilder.build();
        }
        original.metadata().put("isStreaming", isStreaming);
        return original;
    }
}

