/*
 * Decompiled with CFR 0.152.
 */
package io.github.a2ap.core.server.impl;

import io.github.a2ap.core.model.AgentCard;
import io.github.a2ap.core.model.MessageSendParams;
import io.github.a2ap.core.model.RequestContext;
import io.github.a2ap.core.model.SendMessageResponse;
import io.github.a2ap.core.model.SendStreamingMessageResponse;
import io.github.a2ap.core.model.Task;
import io.github.a2ap.core.model.TaskArtifactUpdateEvent;
import io.github.a2ap.core.model.TaskPushNotificationConfig;
import io.github.a2ap.core.model.TaskState;
import io.github.a2ap.core.model.TaskStatus;
import io.github.a2ap.core.model.TaskStatusUpdateEvent;
import io.github.a2ap.core.server.A2AServer;
import io.github.a2ap.core.server.AgentExecutor;
import io.github.a2ap.core.server.EventQueue;
import io.github.a2ap.core.server.QueueManager;
import io.github.a2ap.core.server.TaskManager;
import java.time.Instant;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DefaultA2AServer
implements A2AServer {
    private static final Logger log = LoggerFactory.getLogger(DefaultA2AServer.class);
    private final TaskManager taskManager;
    private final AgentExecutor agentExecutor;
    private final QueueManager queueManager;
    private final AgentCard a2aServerSelfCard;

    public DefaultA2AServer(TaskManager taskManager, AgentExecutor agentExecutor, QueueManager queueManager, AgentCard a2aServerSelfCard) {
        this.taskManager = taskManager;
        this.agentExecutor = agentExecutor;
        this.queueManager = queueManager;
        log.info("A2AServerImpl initialized with TaskManager: {}, AgentExecutor: {}, QueueManager: {}", new Object[]{taskManager.getClass().getSimpleName(), agentExecutor.getClass().getSimpleName(), queueManager.getClass().getSimpleName()});
        this.a2aServerSelfCard = a2aServerSelfCard;
    }

    @Override
    public SendMessageResponse handleMessage(MessageSendParams params) {
        log.info("Attempting to handle the message: {}", (Object)params);
        if (params == null || params.getMessage() == null || params.getMessage().getParts() == null || params.getMessage().getParts().isEmpty()) {
            log.error("Task handle failed: Task params must have at least one message.");
            throw new IllegalArgumentException("Task params must have at least one message");
        }
        RequestContext taskContext = this.taskManager.loadOrCreateContext(params);
        Task currentTask = taskContext.getTask();
        log.info("Task request context loaded: {}", (Object)taskContext.getTask());
        EventQueue eventQueue = this.queueManager.create(taskContext.getTaskId());
        eventQueue.enqueueEvent(currentTask);
        Mono resultMono = this.agentExecutor.execute(taskContext, eventQueue).then(eventQueue.asFlux().flatMap(event -> {
            if (event instanceof TaskStatusUpdateEvent) {
                return this.taskManager.applyStatusUpdate(currentTask, (TaskStatusUpdateEvent)event);
            }
            if (event instanceof TaskArtifactUpdateEvent) {
                return this.taskManager.applyArtifactUpdate(currentTask, (TaskArtifactUpdateEvent)event);
            }
            return Mono.just((Object)event);
        }).filter(event -> !(event instanceof Task)).cast(SendMessageResponse.class).next().doOnError(e -> log.error("Error in task {} updates stream via handleMessage: {}", new Object[]{taskContext.getTaskId(), e.getMessage(), e})).doOnTerminate(() -> {
            log.debug("Agent execution completed for task: {}", (Object)taskContext.getTaskId());
            this.queueManager.remove(taskContext.getTaskId());
        }));
        SendMessageResponse response = (SendMessageResponse)resultMono.block();
        response = response == null ? currentTask : response;
        log.info("Handle message success: {}", (Object)response);
        return response;
    }

    @Override
    public Flux<SendStreamingMessageResponse> handleMessageStream(MessageSendParams params) {
        log.info("Attempting to handle the streaming message: {}", (Object)params);
        if (params == null || params.getMessage() == null || params.getMessage().getParts() == null || params.getMessage().getParts().isEmpty()) {
            log.error("Streaming handle failed: Task params must have at least one message.");
            throw new IllegalArgumentException("Task params must have at least one message");
        }
        RequestContext taskContext = this.taskManager.loadOrCreateContext(params);
        Task currentTask = taskContext.getTask();
        log.info("Task request context loaded: {}", (Object)taskContext.getTask());
        EventQueue eventQueue = this.queueManager.create(taskContext.getTaskId());
        return this.agentExecutor.execute(taskContext, eventQueue).thenMany((Publisher)eventQueue.asFlux().doOnNext(event -> {
            if (event instanceof TaskStatusUpdateEvent) {
                this.taskManager.applyStatusUpdate(currentTask, (TaskStatusUpdateEvent)event).block();
            } else if (event instanceof TaskArtifactUpdateEvent) {
                this.taskManager.applyArtifactUpdate(currentTask, (TaskArtifactUpdateEvent)event).block();
            }
        }).doOnComplete(() -> log.debug("Task {} updates stream completed via handleMessageStream.", (Object)taskContext.getTaskId())).doOnError(e -> log.error("Error in task {} updates stream via handleMessageStream: {}", new Object[]{taskContext.getTaskId(), e.getMessage(), e})).doOnTerminate(() -> {
            log.debug("Agent execution completed for task: {}", (Object)taskContext.getTaskId());
            this.queueManager.remove(taskContext.getTaskId());
        }));
    }

    @Override
    public Task getTask(String taskId) {
        log.info("Getting task with ID: {}", (Object)taskId);
        Task task = this.taskManager.getTask(taskId);
        if (task != null) {
            log.debug("Found task {}: {}", (Object)taskId, (Object)task);
        } else {
            log.warn("Task with ID {} not found.", (Object)taskId);
        }
        return task;
    }

    @Override
    public Task cancelTask(String taskId) {
        if (taskId == null) {
            throw new IllegalArgumentException("Cancel Task id must not be null");
        }
        log.info("Attempting to cancel task with ID: {}", (Object)taskId);
        Task cancelledTask = this.taskManager.getTask(taskId);
        if (cancelledTask == null) {
            log.warn("Task with ID {} not found for cancellation.", (Object)taskId);
            throw new IllegalArgumentException("Cancel Task id not found for cancellation.");
        }
        EventQueue eventQueue = this.queueManager.get(taskId);
        TaskStatus taskStatus = TaskStatus.builder().state(TaskState.CANCELED).timestamp(String.valueOf(Instant.now().toEpochMilli())).build();
        if (eventQueue != null) {
            TaskStatusUpdateEvent event = TaskStatusUpdateEvent.builder().taskId(taskId).status(taskStatus).isFinal(true).build();
            eventQueue.enqueueEvent(event);
            eventQueue.close();
        }
        this.agentExecutor.cancel(taskId).block();
        this.taskManager.applyTaskUpdate(cancelledTask, taskStatus).block();
        log.info("Task {} cancelled successfully.", (Object)taskId);
        return cancelledTask;
    }

    @Override
    public TaskPushNotificationConfig setTaskPushNotification(TaskPushNotificationConfig config) {
        log.info("Attempting to set push notification config for task: {}", (Object)(config != null ? config.getTaskId() : "null"));
        if (config == null || config.getTaskId() == null || config.getTaskId().isEmpty()) {
            log.warn("Failed to set push notification config: Invalid config provided.");
            return null;
        }
        this.taskManager.registerTaskNotification(config);
        log.info("Push notification config set for task {}.", (Object)config.getTaskId());
        return config;
    }

    @Override
    public TaskPushNotificationConfig getTaskPushNotification(String taskId) {
        log.info("Getting push notification config for task ID: {}", (Object)taskId);
        TaskPushNotificationConfig config = this.taskManager.getTaskNotification(taskId);
        if (config != null) {
            log.debug("Found push notification config for task {}: {}", (Object)taskId, (Object)config);
        } else {
            log.warn("Push notification config not found for task ID {}.", (Object)taskId);
        }
        return config;
    }

    @Override
    public Flux<SendStreamingMessageResponse> subscribeToTaskUpdates(String taskId) {
        log.info("Subscribing to task updates for ID: {}", (Object)taskId);
        Task task = this.taskManager.getTask(taskId);
        if (task == null) {
            log.warn("Task with ID {} not found for subscription.", (Object)taskId);
            return Flux.error((Throwable)new IllegalArgumentException("Task not found: " + taskId));
        }
        TaskState state = task.getStatus().getState();
        if (state == TaskState.COMPLETED || state == TaskState.FAILED || state == TaskState.CANCELED || state == TaskState.REJECTED) {
            log.info("Task {} is in final state {}, returning final status.", (Object)taskId, (Object)state);
            TaskStatusUpdateEvent finalEvent = TaskStatusUpdateEvent.builder().taskId(taskId).status(task.getStatus()).isFinal(true).build();
            return Flux.just((Object)finalEvent);
        }
        EventQueue eventQueue = this.queueManager.tap(taskId);
        if (eventQueue != null) {
            log.debug("Task {} is in progress, subscribing to updates via tapped queue.", (Object)taskId);
            return eventQueue.asFlux().doOnSubscribe(s -> log.debug("Subscriber attached to task {} updates via subscribeToTaskUpdates.", (Object)taskId)).doOnComplete(() -> log.debug("Task {} updates stream completed via subscribeToTaskUpdates.", (Object)taskId)).doOnError(e -> log.error("Error in task {} updates stream via subscribeToTaskUpdates: {}", new Object[]{taskId, e.getMessage(), e}));
        }
        log.warn("No active event queue found for task {}.", (Object)taskId);
        return Flux.empty();
    }

    @Override
    public AgentCard getSelfAgentCard() {
        log.info("Getting self agent card.");
        return this.a2aServerSelfCard;
    }
}

