/*
 * Decompiled with CFR 0.152.
 */
package tech.powerjob.worker.actors;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.japi.Creator;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.LinkedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerQueryInstanceStatusReq;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.ServerStopInstanceReq;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.core.tracker.task.TaskTracker;
import tech.powerjob.worker.core.tracker.task.TaskTrackerPool;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest;
import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;

public class TaskTrackerActor
extends AbstractActor {
    private static final Logger log = LoggerFactory.getLogger(TaskTrackerActor.class);
    private final WorkerRuntime workerRuntime;

    public static Props props(WorkerRuntime workerRuntime) {
        return Props.create(TaskTrackerActor.class, (Creator & Serializable)() -> new TaskTrackerActor(workerRuntime));
    }

    public AbstractActor.Receive createReceive() {
        return this.receiveBuilder().match(ProcessorReportTaskStatusReq.class, this::onReceiveProcessorReportTaskStatusReq).match(ServerScheduleJobReq.class, this::onReceiveServerScheduleJobReq).match(ProcessorMapTaskRequest.class, this::onReceiveProcessorMapTaskRequest).match(ProcessorTrackerStatusReportReq.class, this::onReceiveProcessorTrackerStatusReportReq).match(ServerStopInstanceReq.class, this::onReceiveServerStopInstanceReq).match(ServerQueryInstanceStatusReq.class, this::onReceiveServerQueryInstanceStatusReq).matchAny(obj -> log.warn("[ServerRequestActor] receive unknown request: {}.", obj)).build();
    }

    private void onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) {
        int taskStatus = req.getStatus();
        TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
        if (TaskStatus.finishedStatus.contains(taskStatus)) {
            AskResponse askResponse = AskResponse.succeed(null);
            this.getSender().tell((Object)askResponse, this.getSelf());
        }
        if (taskTracker == null) {
            log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", (Object)req);
            return;
        }
        if (ProcessorReportTaskStatusReq.BROADCAST.equals(req.getCmd())) {
            taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult());
        }
        taskTracker.updateTaskStatus(req.getSubInstanceId(), req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());
        taskTracker.updateAppendedWfContext(req.getAppendedWfContext());
    }

    private void onReceiveProcessorMapTaskRequest(ProcessorMapTaskRequest req) {
        TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
        if (taskTracker == null) {
            log.warn("[TaskTrackerActor] receive ProcessorMapTaskRequest({}) but system can't find TaskTracker.", (Object)req);
            return;
        }
        boolean success = false;
        LinkedList subTaskList = Lists.newLinkedList();
        try {
            req.getSubTasks().forEach(originSubTask -> {
                TaskDO subTask = new TaskDO();
                subTask.setTaskName(req.getTaskName());
                subTask.setSubInstanceId(req.getSubInstanceId());
                subTask.setTaskId(originSubTask.getTaskId());
                subTask.setTaskContent(originSubTask.getTaskContent());
                subTaskList.add(subTask);
            });
            success = taskTracker.submitTask(subTaskList);
        }
        catch (Exception e) {
            log.warn("[TaskTrackerActor] process map task(instanceId={}) failed.", (Object)req.getInstanceId(), (Object)e);
        }
        AskResponse response = new AskResponse();
        response.setSuccess(success);
        this.getSender().tell((Object)response, this.getSelf());
    }

    private void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) {
        Long instanceId = req.getInstanceId();
        TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(instanceId);
        if (taskTracker != null) {
            log.warn("[TaskTrackerActor] TaskTracker({}) for instance(id={}) already exists.", (Object)taskTracker, (Object)instanceId);
            return;
        }
        log.debug("[TaskTrackerActor] server schedule job by request: {}.", (Object)req);
        TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> TaskTracker.create(req, this.workerRuntime));
    }

    private void onReceiveProcessorTrackerStatusReportReq(ProcessorTrackerStatusReportReq req) {
        TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
        if (taskTracker == null) {
            log.warn("[TaskTrackerActor] receive ProcessorTrackerStatusReportReq({}) but system can't find TaskTracker.", (Object)req);
            return;
        }
        taskTracker.receiveProcessorTrackerHeartbeat(req);
    }

    private void onReceiveServerStopInstanceReq(ServerStopInstanceReq req) {
        TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
        if (taskTracker == null) {
            log.warn("[TaskTrackerActor] receive ServerStopInstanceReq({}) but system can't find TaskTracker.", (Object)req);
            return;
        }
        taskTracker.destroy();
    }

    private void onReceiveServerQueryInstanceStatusReq(ServerQueryInstanceStatusReq req) {
        AskResponse askResponse;
        TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
        if (taskTracker == null) {
            log.warn("[TaskTrackerActor] receive ServerQueryInstanceStatusReq({}) but system can't find TaskTracker.", (Object)req);
            askResponse = AskResponse.failed((String)"can't find TaskTracker");
        } else {
            InstanceDetail instanceDetail = taskTracker.fetchRunningStatus();
            askResponse = AskResponse.succeed((Object)instanceDetail);
        }
        this.getSender().tell((Object)askResponse, this.getSelf());
    }

    public TaskTrackerActor(WorkerRuntime workerRuntime) {
        this.workerRuntime = workerRuntime;
    }
}

