/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.server.tasks;

import io.a2a.server.events.EventConsumer;
import io.a2a.server.tasks.TaskManager;
import io.a2a.server.util.async.AsyncUtils;
import io.a2a.spec.A2AServerException;
import io.a2a.spec.Event;
import io.a2a.spec.EventKind;
import io.a2a.spec.JSONRPCError;
import io.a2a.spec.Message;
import io.a2a.spec.Task;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.util.Utils;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class ResultAggregator {
    private final TaskManager taskManager;
    private volatile Message message;

    public ResultAggregator(TaskManager taskManager, Message message) {
        this.taskManager = taskManager;
        this.message = message;
    }

    public EventKind getCurrentResult() {
        if (this.message != null) {
            return this.message;
        }
        return this.taskManager.getTask();
    }

    public Flow.Publisher<Event> consumeAndEmit(EventConsumer consumer) {
        Flow.Publisher<Event> all = consumer.consumeAll();
        return AsyncUtils.processor(AsyncUtils.createTubeConfig(), all, (errorConsumer, event) -> {
            this.callTaskManagerProcess((Event)event);
            return true;
        });
    }

    public EventKind consumeAll(EventConsumer consumer) {
        AtomicReference returnedEvent = new AtomicReference();
        Flow.Publisher<Event> all = consumer.consumeAll();
        AtomicReference error = new AtomicReference();
        AsyncUtils.consumer(AsyncUtils.createTubeConfig(), all, event -> {
            if (event instanceof Message) {
                Message msg;
                this.message = msg = (Message)event;
                if (returnedEvent.get() == null) {
                    returnedEvent.set(msg);
                    return false;
                }
            }
            this.callTaskManagerProcess((Event)event);
            return true;
        }, error::set);
        if (returnedEvent.get() != null) {
            return (EventKind)returnedEvent.get();
        }
        return this.taskManager.getTask();
    }

    public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer) throws JSONRPCError {
        Flow.Publisher<Event> all = consumer.consumeAll();
        AtomicReference message = new AtomicReference();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        AtomicReference errorRef = new AtomicReference();
        AsyncUtils.consumer(AsyncUtils.createTubeConfig(), all, event -> {
            TaskStatusUpdateEvent tsue;
            Task task;
            if (event instanceof Throwable) {
                Throwable t = (Throwable)event;
                errorRef.set(t);
                return false;
            }
            if (event instanceof Message) {
                Message msg;
                this.message = msg = (Message)event;
                message.set(msg);
                return false;
            }
            this.callTaskManagerProcess((Event)event);
            if (event instanceof Task && (task = (Task)event).getStatus().state() == TaskState.AUTH_REQUIRED || event instanceof TaskStatusUpdateEvent && (tsue = (TaskStatusUpdateEvent)event).getStatus().state() == TaskState.AUTH_REQUIRED) {
                this.continueConsuming(all);
                interrupted.set(true);
                return false;
            }
            return true;
        }, errorRef::set);
        Throwable error = (Throwable)errorRef.get();
        if (error != null) {
            Utils.rethrow((Throwable)error);
        }
        return new EventTypeAndInterrupt((EventKind)(message.get() != null ? (EventKind)message.get() : this.taskManager.getTask()), interrupted.get());
    }

    private void continueConsuming(Flow.Publisher<Event> all) {
        AsyncUtils.consumer(AsyncUtils.createTubeConfig(), all, event -> {
            this.callTaskManagerProcess((Event)event);
            return true;
        }, t -> {});
    }

    private void callTaskManagerProcess(Event event) {
        try {
            this.taskManager.process(event);
        }
        catch (A2AServerException e) {
            e.printStackTrace();
        }
    }

    public record EventTypeAndInterrupt(EventKind eventType, boolean interrupted) {
    }
}

