/*
 * Decompiled with CFR 0.152.
 */
package tech.powerjob.worker.core.tracker.task;

import akka.actor.ActorSelection;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Stopwatch;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.request.WorkerQueryExecutorClusterReq;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.SegmentLock;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.common.utils.WorkflowContextUtils;
import tech.powerjob.worker.core.ha.ProcessorTrackerStatusHolder;
import tech.powerjob.worker.core.tracker.task.CommonTaskTracker;
import tech.powerjob.worker.core.tracker.task.FrequentTaskTracker;
import tech.powerjob.worker.core.tracker.task.TaskTrackerPool;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.persistence.TaskPersistenceService;
import tech.powerjob.worker.pojo.model.InstanceInfo;
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import tech.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq;

public abstract class TaskTracker {
    private static final Logger log = LoggerFactory.getLogger(TaskTracker.class);
    protected final long createTime = System.currentTimeMillis();
    protected final WorkerRuntime workerRuntime;
    protected final long instanceId;
    protected final InstanceInfo instanceInfo;
    protected final ProcessorTrackerStatusHolder ptStatusHolder;
    protected final TaskPersistenceService taskPersistenceService;
    protected ScheduledExecutorService scheduledPool;
    protected final AtomicBoolean finished;
    protected final Map<String, String> appendedWfContext;
    private final Cache<String, TaskBriefInfo> taskId2BriefInfo;
    private final SegmentLock segmentLock;
    private static final int UPDATE_CONCURRENCY = 4;

    protected TaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
        this.workerRuntime = workerRuntime;
        this.instanceId = req.getInstanceId();
        this.instanceInfo = new InstanceInfo();
        BeanUtils.copyProperties((Object)req, (Object)this.instanceInfo);
        if (this.instanceInfo.getInstanceTimeoutMS() <= 0L) {
            this.instanceInfo.setInstanceTimeoutMS(Integer.MAX_VALUE);
        }
        this.instanceInfo.setTimeExpressionType(TimeExpressionType.valueOf((String)req.getTimeExpressionType()).getV());
        this.instanceInfo.setThreadConcurrency(Math.max(1, this.instanceInfo.getThreadConcurrency()));
        this.ptStatusHolder = new ProcessorTrackerStatusHolder(this.instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress());
        this.taskPersistenceService = workerRuntime.getTaskPersistenceService();
        this.finished = new AtomicBoolean(false);
        this.appendedWfContext = req.getWfInstanceId() == null ? Collections.emptyMap() : Maps.newConcurrentMap();
        this.taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024L).build();
        this.segmentLock = new SegmentLock(4);
        this.initTaskTracker(req);
        log.info("[TaskTracker-{}] create TaskTracker successfully.", (Object)this.instanceId);
    }

    public static TaskTracker create(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
        try {
            TimeExpressionType timeExpressionType = TimeExpressionType.valueOf((String)req.getTimeExpressionType());
            switch (timeExpressionType) {
                case FIXED_RATE: 
                case FIXED_DELAY: {
                    return new FrequentTaskTracker(req, workerRuntime);
                }
            }
            return new CommonTaskTracker(req, workerRuntime);
        }
        catch (Exception e) {
            log.warn("[TaskTracker-{}] create TaskTracker from request({}) failed.", new Object[]{req.getInstanceId(), req, e});
            TaskTrackerReportInstanceStatusReq response = new TaskTrackerReportInstanceStatusReq();
            BeanUtils.copyProperties((Object)req, (Object)response);
            response.setInstanceStatus(InstanceStatus.FAILED.getV());
            response.setResult(String.format("init TaskTracker failed, reason: %s", e.toString()));
            response.setReportTime(System.currentTimeMillis());
            response.setStartTime(System.currentTimeMillis());
            response.setSourceAddress(workerRuntime.getWorkerAddress());
            String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
            ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath);
            serverActor.tell((Object)response, null);
            return null;
        }
    }

    public void updateAppendedWfContext(Map<String, String> newAppendedWfContext) {
        if (this.instanceInfo.getWfInstanceId() == null || CollectionUtils.isEmpty(newAppendedWfContext)) {
            return;
        }
        if (WorkflowContextUtils.isExceededLengthLimit(this.appendedWfContext, this.workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength())) {
            log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!", (Object)this.instanceInfo.getInstanceId(), (Object)this.workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength());
            return;
        }
        for (Map.Entry<String, String> entry : newAppendedWfContext.entrySet()) {
            String originValue = this.appendedWfContext.put(entry.getKey(), entry.getValue());
            log.info("[TaskTracker-{}] update appended workflow context data {} : {} -> {}", new Object[]{this.instanceInfo.getInstanceId(), entry.getKey(), originValue, entry.getValue()});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {
        if (this.finished.get()) {
            return;
        }
        TaskStatus nTaskStatus = TaskStatus.of(newStatus);
        int lockId = taskId.hashCode();
        try {
            int failedCnt;
            Optional<TaskDO> taskOpt;
            this.segmentLock.lockInterruptible(lockId);
            TaskBriefInfo taskBriefInfo = (TaskBriefInfo)this.taskId2BriefInfo.getIfPresent((Object)taskId);
            if (taskBriefInfo == null) {
                Optional<TaskDO> taskOpt2 = this.taskPersistenceService.getTask(this.instanceId, taskId);
                if (taskOpt2.isPresent()) {
                    TaskDO taskDO = taskOpt2.get();
                    taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.of(taskDO.getStatus()), taskDO.getLastReportTime());
                } else {
                    log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", new Object[]{this.instanceId, subInstanceId, taskId});
                    taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.WAITING_DISPATCH, -1L);
                }
                this.taskId2BriefInfo.put((Object)taskId, (Object)taskBriefInfo);
            }
            if (taskBriefInfo.getLastReportTime() > reportTime) {
                log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.", new Object[]{this.instanceId, subInstanceId, taskBriefInfo.getLastReportTime(), reportTime, taskId, newStatus});
                return;
            }
            if (nTaskStatus.getValue() < taskBriefInfo.getStatus().getValue()) {
                log.warn("[TaskTracker-{}-{}] receive invalid task status report(taskId={},currentStatus={},newStatus={}), TaskTracker will drop this report.", new Object[]{this.instanceId, subInstanceId, taskId, taskBriefInfo.getStatus().getValue(), newStatus});
                return;
            }
            taskBriefInfo.setLastReportTime(reportTime);
            taskBriefInfo.setStatus(nTaskStatus);
            int configTaskRetryNum = this.instanceInfo.getTaskRetryNum();
            if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum >= 1 && (taskOpt = this.taskPersistenceService.getTask(this.instanceId, taskId)).isPresent() && (failedCnt = taskOpt.get().getFailedCnt().intValue()) < configTaskRetryNum) {
                TaskDO updateEntity = new TaskDO();
                updateEntity.setFailedCnt(failedCnt + 1);
                String taskName = taskOpt.get().getTaskName();
                ExecuteType executeType = ExecuteType.valueOf((String)this.instanceInfo.getExecuteType());
                if (!taskName.equals("OMS_ROOT_TASK") && !taskName.equals("OMS_LAST_TASK") && executeType != ExecuteType.BROADCAST) {
                    updateEntity.setAddress("N/A");
                }
                updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                updateEntity.setLastReportTime(reportTime);
                boolean retryTask = this.taskPersistenceService.updateTask(this.instanceId, taskId, updateEntity);
                if (retryTask) {
                    log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", new Object[]{this.instanceId, subInstanceId, taskId});
                    return;
                }
            }
            result = result == null ? "" : result;
            boolean updateResult = this.taskPersistenceService.updateTaskStatus(this.instanceId, taskId, newStatus, reportTime, result);
            if (!updateResult) {
                log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", new Object[]{this.instanceId, subInstanceId, taskId});
            }
        }
        catch (InterruptedException taskBriefInfo) {
        }
        catch (Exception e) {
            log.warn("[TaskTracker-{}-{}] update task status failed.", new Object[]{this.instanceId, subInstanceId, e});
        }
        finally {
            this.segmentLock.unlock(lockId);
        }
    }

    public boolean submitTask(List<TaskDO> newTaskList) {
        if (this.finished.get()) {
            return true;
        }
        if (CollectionUtils.isEmpty(newTaskList)) {
            return true;
        }
        newTaskList.forEach(task -> {
            task.setInstanceId(this.instanceId);
            task.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
            task.setFailedCnt(0);
            task.setLastModifiedTime(System.currentTimeMillis());
            task.setCreatedTime(System.currentTimeMillis());
            task.setLastReportTime(-1L);
        });
        log.debug("[TaskTracker-{}] receive new tasks: {}", (Object)this.instanceId, newTaskList);
        return this.taskPersistenceService.batchSave(newTaskList);
    }

    public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
        log.debug("[TaskTracker-{}] receive heartbeat: {}", (Object)this.instanceId, (Object)heartbeatReq);
        this.ptStatusHolder.updateStatus(heartbeatReq);
        if (heartbeatReq.getType() == 1) {
            String idlePtAddress = heartbeatReq.getAddress();
            this.ptStatusHolder.getProcessorTrackerStatus(idlePtAddress).setDispatched(false);
            List<TaskDO> unfinishedTask = this.taskPersistenceService.getAllUnFinishedTaskByAddress(this.instanceId, idlePtAddress);
            if (!CollectionUtils.isEmpty(unfinishedTask)) {
                log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", new Object[]{this.instanceId, idlePtAddress, unfinishedTask});
                unfinishedTask.forEach(task -> this.updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
            }
        }
    }

    public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, String result) {
        if (this.finished.get()) {
            return;
        }
        log.info("[TaskTracker-{}-{}] finished broadcast's preProcess, preExecuteSuccess:{},preTaskId:{},result:{}", new Object[]{this.instanceId, subInstanceId, preExecuteSuccess, preTaskId, result});
        if (preExecuteSuccess) {
            List<String> allWorkerAddress = this.ptStatusHolder.getAllProcessorTrackers();
            LinkedList subTaskList = Lists.newLinkedList();
            for (int i = 0; i < allWorkerAddress.size(); ++i) {
                TaskDO subTask = new TaskDO();
                subTask.setSubInstanceId(subInstanceId);
                subTask.setTaskName("OMS_BROADCAST_TASK");
                subTask.setTaskId(preTaskId + "." + i);
                subTask.setAddress(allWorkerAddress.get(i));
                subTaskList.add(subTask);
            }
            this.submitTask(subTaskList);
        } else {
            log.warn("[TaskTracker-{}-{}] BroadcastTask failed because of preProcess failed, preProcess result={}.", new Object[]{this.instanceId, subInstanceId, result});
        }
    }

    public void destroy() {
        this.finished.set(true);
        Stopwatch sw = Stopwatch.createStarted();
        this.scheduledPool.shutdown();
        TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq();
        stopRequest.setInstanceId(this.instanceId);
        this.ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> {
            String ptPath = AkkaUtils.getAkkaWorkerPath(ptIP, "processor_tracker");
            ActorSelection ptActor = this.workerRuntime.getActorSystem().actorSelection(ptPath);
            ptActor.tell((Object)stopRequest, null);
        });
        boolean dbSuccess = this.taskPersistenceService.deleteAllTasks(this.instanceId);
        if (!dbSuccess) {
            log.error("[TaskTracker-{}] delete tasks from database failed.", (Object)this.instanceId);
        } else {
            log.debug("[TaskTracker-{}] delete all tasks from database successfully.", (Object)this.instanceId);
        }
        TaskTrackerPool.remove(this.instanceId);
        log.info("[TaskTracker-{}] TaskTracker has left the world(using {}), bye~", (Object)this.instanceId, (Object)sw.stop());
        if (!this.scheduledPool.isTerminated()) {
            CommonUtils.executeIgnoreException(() -> this.scheduledPool.shutdownNow());
        }
    }

    protected void dispatchTask(TaskDO task, String processorTrackerAddress) {
        TaskDO updateEntity = new TaskDO();
        updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());
        updateEntity.setAddress(processorTrackerAddress);
        boolean success = this.taskPersistenceService.updateTask(this.instanceId, task.getTaskId(), updateEntity);
        if (!success) {
            log.warn("[TaskTracker-{}] dispatch task(taskId={},taskName={}) failed due to update task status failed.", new Object[]{this.instanceId, task.getTaskId(), task.getTaskName()});
            return;
        }
        this.ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true);
        this.taskId2BriefInfo.put((Object)task.getTaskId(), (Object)new TaskBriefInfo(task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, -1L));
        TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(this.instanceInfo, task, this.workerRuntime.getWorkerAddress());
        String ptActorPath = AkkaUtils.getAkkaWorkerPath(processorTrackerAddress, "processor_tracker");
        ActorSelection ptActor = this.workerRuntime.getActorSystem().actorSelection(ptActorPath);
        ptActor.tell((Object)startTaskReq, null);
        log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", new Object[]{this.instanceId, task.getTaskId(), task.getTaskName()});
    }

    protected InstanceStatisticsHolder getInstanceStatisticsHolder(long subInstanceId) {
        Map<TaskStatus, Long> status2Num = this.taskPersistenceService.getTaskStatusStatistics(this.instanceId, subInstanceId);
        InstanceStatisticsHolder holder = new InstanceStatisticsHolder();
        holder.waitingDispatchNum = status2Num.getOrDefault((Object)TaskStatus.WAITING_DISPATCH, 0L);
        holder.workerUnreceivedNum = status2Num.getOrDefault((Object)TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L);
        holder.receivedNum = status2Num.getOrDefault((Object)TaskStatus.WORKER_RECEIVED, 0L);
        holder.runningNum = status2Num.getOrDefault((Object)TaskStatus.WORKER_PROCESSING, 0L);
        holder.failedNum = status2Num.getOrDefault((Object)TaskStatus.WORKER_PROCESS_FAILED, 0L);
        holder.succeedNum = status2Num.getOrDefault((Object)TaskStatus.WORKER_PROCESS_SUCCESS, 0L);
        return holder;
    }

    protected abstract void initTaskTracker(ServerScheduleJobReq var1);

    public abstract InstanceDetail fetchRunningStatus();

    protected static class InstanceStatisticsHolder {
        protected long waitingDispatchNum;
        protected long workerUnreceivedNum;
        protected long receivedNum;
        protected long runningNum;
        protected long failedNum;
        protected long succeedNum;

        public long getTotalTaskNum() {
            return this.waitingDispatchNum + this.workerUnreceivedNum + this.receivedNum + this.runningNum + this.failedNum + this.succeedNum;
        }

        public long getWaitingDispatchNum() {
            return this.waitingDispatchNum;
        }

        public long getWorkerUnreceivedNum() {
            return this.workerUnreceivedNum;
        }

        public long getReceivedNum() {
            return this.receivedNum;
        }

        public long getRunningNum() {
            return this.runningNum;
        }

        public long getFailedNum() {
            return this.failedNum;
        }

        public long getSucceedNum() {
            return this.succeedNum;
        }

        public void setWaitingDispatchNum(long waitingDispatchNum) {
            this.waitingDispatchNum = waitingDispatchNum;
        }

        public void setWorkerUnreceivedNum(long workerUnreceivedNum) {
            this.workerUnreceivedNum = workerUnreceivedNum;
        }

        public void setReceivedNum(long receivedNum) {
            this.receivedNum = receivedNum;
        }

        public void setRunningNum(long runningNum) {
            this.runningNum = runningNum;
        }

        public void setFailedNum(long failedNum) {
            this.failedNum = failedNum;
        }

        public void setSucceedNum(long succeedNum) {
            this.succeedNum = succeedNum;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof InstanceStatisticsHolder)) {
                return false;
            }
            InstanceStatisticsHolder other = (InstanceStatisticsHolder)o;
            if (!other.canEqual(this)) {
                return false;
            }
            if (this.getWaitingDispatchNum() != other.getWaitingDispatchNum()) {
                return false;
            }
            if (this.getWorkerUnreceivedNum() != other.getWorkerUnreceivedNum()) {
                return false;
            }
            if (this.getReceivedNum() != other.getReceivedNum()) {
                return false;
            }
            if (this.getRunningNum() != other.getRunningNum()) {
                return false;
            }
            if (this.getFailedNum() != other.getFailedNum()) {
                return false;
            }
            return this.getSucceedNum() == other.getSucceedNum();
        }

        protected boolean canEqual(Object other) {
            return other instanceof InstanceStatisticsHolder;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            long $waitingDispatchNum = this.getWaitingDispatchNum();
            result = result * 59 + (int)($waitingDispatchNum >>> 32 ^ $waitingDispatchNum);
            long $workerUnreceivedNum = this.getWorkerUnreceivedNum();
            result = result * 59 + (int)($workerUnreceivedNum >>> 32 ^ $workerUnreceivedNum);
            long $receivedNum = this.getReceivedNum();
            result = result * 59 + (int)($receivedNum >>> 32 ^ $receivedNum);
            long $runningNum = this.getRunningNum();
            result = result * 59 + (int)($runningNum >>> 32 ^ $runningNum);
            long $failedNum = this.getFailedNum();
            result = result * 59 + (int)($failedNum >>> 32 ^ $failedNum);
            long $succeedNum = this.getSucceedNum();
            result = result * 59 + (int)($succeedNum >>> 32 ^ $succeedNum);
            return result;
        }

        public String toString() {
            return "TaskTracker.InstanceStatisticsHolder(waitingDispatchNum=" + this.getWaitingDispatchNum() + ", workerUnreceivedNum=" + this.getWorkerUnreceivedNum() + ", receivedNum=" + this.getReceivedNum() + ", runningNum=" + this.getRunningNum() + ", failedNum=" + this.getFailedNum() + ", succeedNum=" + this.getSucceedNum() + ")";
        }
    }

    protected static class TaskBriefInfo {
        private String id;
        private TaskStatus status;
        private Long lastReportTime;

        public String getId() {
            return this.id;
        }

        public TaskStatus getStatus() {
            return this.status;
        }

        public Long getLastReportTime() {
            return this.lastReportTime;
        }

        public void setId(String id) {
            this.id = id;
        }

        public void setStatus(TaskStatus status) {
            this.status = status;
        }

        public void setLastReportTime(Long lastReportTime) {
            this.lastReportTime = lastReportTime;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof TaskBriefInfo)) {
                return false;
            }
            TaskBriefInfo other = (TaskBriefInfo)o;
            if (!other.canEqual(this)) {
                return false;
            }
            String this$id = this.getId();
            String other$id = other.getId();
            if (this$id == null ? other$id != null : !this$id.equals(other$id)) {
                return false;
            }
            TaskStatus this$status = this.getStatus();
            TaskStatus other$status = other.getStatus();
            if (this$status == null ? other$status != null : !((Object)((Object)this$status)).equals((Object)other$status)) {
                return false;
            }
            Long this$lastReportTime = this.getLastReportTime();
            Long other$lastReportTime = other.getLastReportTime();
            return !(this$lastReportTime == null ? other$lastReportTime != null : !((Object)this$lastReportTime).equals(other$lastReportTime));
        }

        protected boolean canEqual(Object other) {
            return other instanceof TaskBriefInfo;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $id = this.getId();
            result = result * 59 + ($id == null ? 43 : $id.hashCode());
            TaskStatus $status = this.getStatus();
            result = result * 59 + ($status == null ? 43 : ((Object)((Object)$status)).hashCode());
            Long $lastReportTime = this.getLastReportTime();
            result = result * 59 + ($lastReportTime == null ? 43 : ((Object)$lastReportTime).hashCode());
            return result;
        }

        public String toString() {
            return "TaskTracker.TaskBriefInfo(id=" + this.getId() + ", status=" + (Object)((Object)this.getStatus()) + ", lastReportTime=" + this.getLastReportTime() + ")";
        }

        public TaskBriefInfo(String id, TaskStatus status, Long lastReportTime) {
            this.id = id;
            this.status = status;
            this.lastReportTime = lastReportTime;
        }
    }

    protected class WorkerDetector
    implements Runnable {
        protected WorkerDetector() {
        }

        @Override
        public void run() {
            boolean needMoreWorker = TaskTracker.this.ptStatusHolder.checkNeedMoreWorker();
            log.info("[TaskTracker-{}] checkNeedMoreWorker: {}", (Object)TaskTracker.this.instanceId, (Object)needMoreWorker);
            if (!needMoreWorker) {
                return;
            }
            String serverPath = AkkaUtils.getServerActorPath(TaskTracker.this.workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
            if (StringUtils.isEmpty((Object)serverPath)) {
                log.warn("[TaskTracker-{}] no server available, won't start worker detective!", (Object)TaskTracker.this.instanceId);
                return;
            }
            WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(TaskTracker.this.workerRuntime.getAppId(), TaskTracker.this.instanceInfo.getJobId());
            AskResponse response = AkkaUtils.easyAsk(TaskTracker.this.workerRuntime.getActorSystem().actorSelection(serverPath), req);
            if (!response.isSuccess()) {
                log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", (Object)TaskTracker.this.instanceId, (Object)response.getMessage());
                return;
            }
            try {
                List workerList = (List)JsonUtils.parseObject((byte[])response.getData(), (TypeReference)new TypeReference<List<String>>(){});
                TaskTracker.this.ptStatusHolder.register(workerList);
            }
            catch (Exception e) {
                log.warn("[TaskTracker-{}] detective failed!", (Object)TaskTracker.this.instanceId, (Object)e);
            }
        }
    }

    protected class Dispatcher
    implements Runnable {
        private static final int DB_QUERY_LIMIT = 100;

        protected Dispatcher() {
        }

        @Override
        public void run() {
            long currentDispatchNum;
            List<TaskDO> needDispatchTasks;
            if (TaskTracker.this.finished.get()) {
                return;
            }
            Stopwatch stopwatch = Stopwatch.createStarted();
            List<String> availablePtIps = TaskTracker.this.ptStatusHolder.getAvailableProcessorTrackers();
            if (availablePtIps.isEmpty()) {
                log.debug("[TaskTracker-{}] no available ProcessorTracker now.", (Object)TaskTracker.this.instanceId);
                return;
            }
            long maxDispatchNum = (long)(availablePtIps.size() * TaskTracker.this.instanceInfo.getThreadConcurrency()) * 2L;
            AtomicInteger index = new AtomicInteger(0);
            for (currentDispatchNum = 0L; maxDispatchNum > currentDispatchNum; currentDispatchNum += (long)needDispatchTasks.size()) {
                int dbQueryLimit = Math.min(100, (int)maxDispatchNum);
                needDispatchTasks = TaskTracker.this.taskPersistenceService.getTaskByStatus(TaskTracker.this.instanceId, TaskStatus.WAITING_DISPATCH, dbQueryLimit);
                needDispatchTasks.forEach(task -> {
                    String ptAddress = task.getAddress();
                    if (StringUtils.isEmpty((Object)ptAddress) || "N/A".equals(ptAddress)) {
                        ptAddress = (String)availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
                    }
                    TaskTracker.this.dispatchTask((TaskDO)task, ptAddress);
                });
                if (needDispatchTasks.size() >= dbQueryLimit) continue;
                break;
            }
            log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", new Object[]{TaskTracker.this.instanceId, currentDispatchNum, stopwatch.stop()});
        }
    }
}

