package org.apache.seatunnel.engine.server.scheduler;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.exception.SchedulerNotAllowException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskDeployState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.class */
public class PipelineBaseScheduler implements JobScheduler {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PipelineBaseScheduler.class);
    private final PhysicalPlan physicalPlan;
    private final long jobId;
    private final JobMaster jobMaster;
    private final ResourceManager resourceManager;

    /**
     * Creates a scheduler for the given physical plan, owned by the given job master.
     *
     * <p>The resource manager is obtained from {@code jobMaster} and the job id from the plan's
     * immutable job information.
     *
     * @param physicalPlan the plan whose pipelines this scheduler drives; must not be {@code null}
     * @param jobMaster the job master that owns the plan; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public PipelineBaseScheduler(@NonNull PhysicalPlan physicalPlan, @NonNull JobMaster jobMaster) {
        this.physicalPlan = Objects.requireNonNull(physicalPlan, "physicalPlan is marked non-null but is null");
        this.jobMaster = Objects.requireNonNull(jobMaster, "jobMaster is marked non-null but is null");
        this.resourceManager = jobMaster.getResourceManager();
        this.jobId = physicalPlan.getJobImmutableInformation().getJobId();
    }

    /**
     * Starts scheduling the job.
     *
     * <p>Tries to move the job from {@code CREATED} to {@code SCHEDULED}. On success every
     * pipeline of the plan is handed to {@link #schedulerPipeline(SubPlan)}, the calling thread
     * blocks until all returned futures have completed, and the job is then moved from
     * {@code SCHEDULED} to {@code RUNNING}. If the initial transition is refused and the job is
     * already {@code CANCELED}, the method returns without doing anything.
     *
     * @throws JobException if the initial transition is refused and the job is not
     *     {@code CANCELED}
     * @throws RuntimeException wrapping any exception raised while scheduling or waiting; when
     *     the wait is interrupted the thread's interrupt status is restored before throwing
     */
    @Override
    public void startScheduling() {
        if (!this.physicalPlan.updateJobState(JobStatus.CREATED, JobStatus.SCHEDULED)) {
            if (!JobStatus.CANCELED.equals(this.physicalPlan.getJobStatus())) {
                throw new JobException(String.format("%s turn to a unexpected state: %s", this.physicalPlan.getJobFullName(), this.physicalPlan.getJobStatus()));
            }
            return;
        }
        try {
            List<CompletableFuture<Void>> pipelineFutures = this.physicalPlan.getPipelineList().stream()
                    .map(this::schedulerPipeline)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
            CompletableFuture.allOf(pipelineFutures.toArray(new CompletableFuture[0])).get();
            this.physicalPlan.updateJobState(JobStatus.SCHEDULED, JobStatus.RUNNING);
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack; the exception type
            // seen by callers is unchanged.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Schedules one pipeline and starts its deployment asynchronously.
     *
     * <p>The pipeline is moved from {@code CREATED} to {@code SCHEDULED}, slot profiles are
     * looked up or requested for it and recorded on the job master, and deployment is submitted
     * to the job master's executor service.
     *
     * @param subPlan the pipeline to schedule
     * @return a future that completes when the asynchronous deployment call returns, or an
     *     already completed future when scheduling was skipped or failed
     */
    private CompletableFuture<Void> schedulerPipeline(SubPlan subPlan) {
        try {
            boolean scheduled = subPlan.updatePipelineState(PipelineStatus.CREATED, PipelineStatus.SCHEDULED);
            if (!scheduled) {
                // Always throws; the exception type selects which catch block below runs.
                handlePipelineStateTurnError(subPlan, PipelineStatus.SCHEDULED);
            }
            Map<TaskGroupLocation, SlotProfile> previousSlots = this.jobMaster.getOwnedSlotProfiles(subPlan.getPipelineLocation());
            Map<TaskGroupLocation, SlotProfile> slotProfiles = getOrApplyResourceForPipeline(subPlan, previousSlots);
            log.debug("slotProfiles: {}, PipelineLocation: {}", slotProfiles, subPlan.getPipelineLocation());
            this.jobMaster.setOwnedSlotProfiles(subPlan.getPipelineLocation(), slotProfiles);
            return CompletableFuture.runAsync(() -> deployPipeline(subPlan, slotProfiles), this.jobMaster.getExecutorService());
        } catch (SchedulerNotAllowException e) {
            // The pipeline is being or has been cancelled: stop quietly without cancelling again.
            log.error(String.format("scheduler %s stop. Because %s", subPlan.getPipelineFullName(), ExceptionUtils.getMessage(e)));
            return CompletableFuture.completedFuture(null);
        } catch (Exception e) {
            log.error(String.format("scheduler %s error and cancel pipeline. The error is %s", subPlan.getPipelineFullName(), ExceptionUtils.getMessage(e)));
            subPlan.cancelPipeline();
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Resolves a slot profile for every task group of the pipeline, reusing previously owned
     * slots where possible.
     *
     * <p>When no previously owned slots are supplied, resources are requested for the whole
     * pipeline via {@link #applyResourceForPipeline(SubPlan)}. Otherwise each coordinator and
     * physical vertex is resolved individually through
     * {@link #getOrApplyResourceForTask(PhysicalVertex, Map)}.
     *
     * <p>Previously the per-vertex callbacks were empty, so the returned map was always empty
     * and deployment received {@code null} slot profiles; they now populate the result.
     *
     * @param subPlan the pipeline to resolve slots for; must not be {@code null}
     * @param map slot profiles the pipeline owned before, keyed by task group location; may be
     *     {@code null} or empty
     * @return the slot profile to use for each task group location of the pipeline
     * @throws NullPointerException if {@code subPlan} is {@code null}
     * @throws SchedulerNotAllowException if a task cannot obtain a resource in its current state
     */
    private Map<TaskGroupLocation, SlotProfile> getOrApplyResourceForPipeline(@NonNull SubPlan subPlan, Map<TaskGroupLocation, SlotProfile> map) {
        if (subPlan == null) {
            throw new NullPointerException("pipeline is marked non-null but is null");
        }
        if (map == null || map.isEmpty()) {
            return applyResourceForPipeline(subPlan);
        }
        // NOTE(review): ConcurrentHashMap rejects null values, so this relies on
        // getOrApplyResourceForTask never yielding null — confirm against ResourceManager.
        Map<TaskGroupLocation, SlotProfile> currentSlots = new ConcurrentHashMap<>();
        subPlan.getCoordinatorVertexList().forEach(coordinator ->
                currentSlots.put(coordinator.getTaskGroupLocation(), getOrApplyResourceForTask(coordinator, map)));
        subPlan.getPhysicalVertexList().forEach(task ->
                currentSlots.put(task.getTaskGroupLocation(), getOrApplyResourceForTask(task, map)));
        return currentSlots;
    }

    /**
     * Returns the slot profile to use for one task, preferring a previously owned slot that the
     * resource manager still reports as active.
     *
     * <p>When the old slot is reused the task is moved from {@code CREATED} to
     * {@code SCHEDULED} here; otherwise a new slot is requested and awaited.
     *
     * @param physicalVertex the task to resolve a slot for; must not be {@code null}
     * @param map previously owned slot profiles keyed by task group location; may be
     *     {@code null} or empty
     * @return the reused or newly obtained slot profile
     * @throws NullPointerException if {@code physicalVertex} is {@code null}
     * @throws SchedulerNotAllowException if no resource request could be issued for the task
     */
    private SlotProfile getOrApplyResourceForTask(@NonNull PhysicalVertex physicalVertex, Map<TaskGroupLocation, SlotProfile> map) {
        Objects.requireNonNull(physicalVertex, "task is marked non-null but is null");

        SlotProfile oldProfile = null;
        if (map != null && !map.isEmpty()) {
            oldProfile = map.get(physicalVertex.getTaskGroupLocation());
        }

        boolean reusable = oldProfile != null && this.resourceManager.slotActiveCheck(oldProfile);
        if (!reusable) {
            CompletableFuture<SlotProfile> pendingProfile = applyResourceForTask(physicalVertex);
            if (pendingProfile == null) {
                throw new SchedulerNotAllowException(String.format("The task [%s] state is [%s] and the resource can not be retrieved", physicalVertex.getTaskFullName(), physicalVertex.getExecutionState()));
            }
            SlotProfile newProfile = pendingProfile.join();
            log.info(String.format("use new profile: %s to replace not active profile: %s for task %s", newProfile, oldProfile, physicalVertex.getTaskFullName()));
            return newProfile;
        }

        log.info(String.format("use active old profile: %s for task %s", oldProfile, physicalVertex.getTaskFullName()));
        physicalVertex.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
        return oldProfile;
    }

    /**
     * Requests a fresh slot for every coordinator and physical vertex of the pipeline and waits
     * for all requests to finish.
     *
     * <p>All requests are issued first and joined afterwards, so they can proceed concurrently.
     * A task for which no request could be issued is mapped to {@code null}.
     *
     * <p>Previously the per-vertex callbacks were empty, so no resource was ever requested and
     * the result was always empty; they now issue the requests.
     *
     * @param subPlan the pipeline to request slots for; must not be {@code null}
     * @return the obtained slot profile (or {@code null}) for each task group location
     * @throws NullPointerException if {@code subPlan} is {@code null}
     */
    private Map<TaskGroupLocation, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) {
        if (subPlan == null) {
            throw new NullPointerException("subPlan is marked non-null but is null");
        }
        Map<TaskGroupLocation, CompletableFuture<SlotProfile>> pendingProfiles = new HashMap<>();
        subPlan.getCoordinatorVertexList().forEach(coordinator ->
                pendingProfiles.put(coordinator.getTaskGroupLocation(), applyResourceForTask(coordinator)));
        subPlan.getPhysicalVertexList().forEach(task ->
                pendingProfiles.put(task.getTaskGroupLocation(), applyResourceForTask(task)));

        Map<TaskGroupLocation, SlotProfile> slotProfiles = new HashMap<>();
        for (Map.Entry<TaskGroupLocation, CompletableFuture<SlotProfile>> entry : pendingProfiles.entrySet()) {
            CompletableFuture<SlotProfile> pending = entry.getValue();
            // applyResourceForTask returns null when the task was cancelled or failed.
            slotProfiles.put(entry.getKey(), pending == null ? null : pending.join());
        }
        return slotProfiles;
    }

    /**
     * Moves a task from {@code CREATED} to {@code SCHEDULED} and asks the resource manager for
     * a slot.
     *
     * @param physicalVertex the task that needs a slot
     * @return the pending resource request, or {@code null} when the task is being or has been
     *     cancelled, is in an unexpected state, or the request itself threw; in the latter two
     *     cases the task is reported as failed
     */
    private CompletableFuture<SlotProfile> applyResourceForTask(PhysicalVertex physicalVertex) {
        try {
            boolean scheduled = physicalVertex.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
            if (scheduled) {
                return this.resourceManager.applyResource(this.jobId, new ResourceProfile());
            }

            boolean cancelled = ExecutionState.CANCELING.equals(physicalVertex.getExecutionState())
                    || ExecutionState.CANCELED.equals(physicalVertex.getExecutionState());
            if (!cancelled) {
                TaskGroupLocation location = physicalVertex.getTaskGroupLocation();
                String reason = String.format("%s turn to a unexpected state: %s, stop scheduler job.", physicalVertex.getTaskFullName(), physicalVertex.getExecutionState());
                makeTaskFailed(location, new JobException(reason));
            } else {
                log.info("{} be canceled, skip {} this task.", physicalVertex.getTaskFullName(), ExecutionState.SCHEDULED);
            }
            return null;
        } catch (Throwable failure) {
            makeTaskFailed(physicalVertex.getTaskGroupLocation(), failure);
            return null;
        }
    }

    /**
     * Moves a task from {@code SCHEDULED} to {@code DEPLOYING} and deploys it to its slot on
     * the job master's executor service.
     *
     * @param physicalVertex the task to deploy
     * @param slotProfile the slot the task is deployed to
     * @return a future for the asynchronous deployment, or {@code null} when the task was not
     *     moved to {@code DEPLOYING} (it is then either skipped as cancelled or reported as
     *     failed)
     */
    private CompletableFuture<Void> deployTask(PhysicalVertex physicalVertex, SlotProfile slotProfile) {
        boolean deploying = physicalVertex.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        if (!deploying) {
            boolean cancelled = ExecutionState.CANCELING.equals(physicalVertex.getExecutionState())
                    || ExecutionState.CANCELED.equals(physicalVertex.getExecutionState());
            if (cancelled) {
                log.info("{} be canceled, skip {} this task.", physicalVertex.getTaskFullName(), ExecutionState.DEPLOYING);
            } else {
                this.jobMaster.updateTaskExecutionState(
                        new TaskExecutionState(
                                physicalVertex.getTaskGroupLocation(),
                                ExecutionState.FAILED,
                                new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job.", physicalVertex.getTaskFullName(), physicalVertex.getExecutionState()))));
            }
            return null;
        }

        return CompletableFuture.runAsync(
                () -> {
                    try {
                        TaskDeployState deployState = physicalVertex.deploy(slotProfile);
                        if (deployState.isSuccess()) {
                            return;
                        }
                        // An unsuccessful deployment is reported as a task failure, not thrown.
                        this.jobMaster.updateTaskExecutionState(
                                new TaskExecutionState(physicalVertex.getTaskGroupLocation(), ExecutionState.FAILED, deployState.getThrowableMsg()));
                    } catch (Exception cause) {
                        throw new SeaTunnelEngineException(cause);
                    }
                },
                this.jobMaster.getExecutorService());
    }

    /**
     * Deploys every coordinator and physical vertex of a pipeline and waits for the deployments
     * to finish.
     *
     * <p>The pipeline is first moved from {@code SCHEDULED} to {@code DEPLOYING}. If that
     * transition throws, the pipeline is cancelled. If the transition did not happen, the
     * pipeline is skipped when it is being or has been cancelled, and otherwise all of its
     * tasks are reported as failed. After all task deployments have completed the pipeline is
     * moved from {@code DEPLOYING} to {@code RUNNING}.
     *
     * <p>Any exception raised while deploying or waiting fails all tasks of the pipeline. If
     * the wait is interrupted, the thread's interrupt status is restored afterwards.
     *
     * @param subPlan the pipeline to deploy; must not be {@code null}
     * @param map the slot profile to use for each task group location
     * @throws NullPointerException if {@code subPlan} is {@code null}
     */
    private void deployPipeline(@NonNull SubPlan subPlan, Map<TaskGroupLocation, SlotProfile> map) {
        if (subPlan == null) {
            throw new NullPointerException("pipeline is marked non-null but is null");
        }
        boolean deploying = false;
        try {
            deploying = subPlan.updatePipelineState(PipelineStatus.SCHEDULED, PipelineStatus.DEPLOYING);
        } catch (Exception e) {
            // Pass the exception as the trailing argument so its stack trace is logged too.
            log.warn("{} turn to state {} failed, cancel pipeline", subPlan.getPipelineFullName(), PipelineStatus.DEPLOYING, e);
            subPlan.cancelPipeline();
        }
        if (!deploying) {
            if (PipelineStatus.CANCELING.equals(subPlan.getPipelineState()) || PipelineStatus.CANCELED.equals(subPlan.getPipelineState())) {
                log.info("{} turn to state {}, skip {} this pipeline.", subPlan.getPipelineFullName(), subPlan.getPipelineState(), PipelineStatus.DEPLOYING);
            } else {
                makePipelineFailed(subPlan, new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job", subPlan.getPipelineFullName(), subPlan.getPipelineState())));
            }
            return;
        }
        try {
            // Coordinators are submitted before the regular tasks; deployTask returns null for
            // tasks that were skipped or failed, and those are filtered out.
            List<CompletableFuture<Void>> deployFutures = subPlan.getCoordinatorVertexList().stream()
                    .map(coordinator -> deployTask(coordinator, map.get(coordinator.getTaskGroupLocation())))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
            deployFutures.addAll(subPlan.getPhysicalVertexList().stream()
                    .map(task -> deployTask(task, map.get(task.getTaskGroupLocation())))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList()));
            CompletableFuture.allOf(deployFutures.toArray(new CompletableFuture[0])).get();
            if (!subPlan.updatePipelineState(PipelineStatus.DEPLOYING, PipelineStatus.RUNNING)) {
                log.info("{} turn to state {}, skip the running state.", subPlan.getPipelineFullName(), subPlan.getPipelineState());
            }
        } catch (InterruptedException e) {
            try {
                makePipelineFailed(subPlan, e);
            } finally {
                // Restore the flag only after the failure has been reported, so the report
                // itself is not cut short by the pending interrupt.
                Thread.currentThread().interrupt();
            }
        } catch (Exception e) {
            makePipelineFailed(subPlan, e);
        }
    }

    /**
     * Schedules the given pipeline again by delegating to the regular pipeline scheduling path.
     *
     * @param subPlan the pipeline to schedule again; must not be {@code null}
     * @return the future returned by the pipeline scheduling path
     * @throws NullPointerException if {@code subPlan} is {@code null}
     */
    @Override
    public CompletableFuture<Void> reSchedulerPipeline(@NonNull SubPlan subPlan) {
        Objects.requireNonNull(subPlan, "subPlan is marked non-null but is null");
        return schedulerPipeline(subPlan);
    }

    /**
     * Reports a refused pipeline state transition; this method never returns normally.
     *
     * @param subPlan the pipeline whose transition was refused
     * @param pipelineStatus the state the pipeline was supposed to enter
     * @throws SchedulerNotAllowException if the pipeline is {@code CANCELING} or
     *     {@code CANCELED}
     * @throws JobException if the pipeline is in any other state
     */
    private void handlePipelineStateTurnError(SubPlan subPlan, PipelineStatus pipelineStatus) {
        boolean cancelled = PipelineStatus.CANCELING.equals(subPlan.getPipelineState())
                || PipelineStatus.CANCELED.equals(subPlan.getPipelineState());
        if (cancelled) {
            throw new SchedulerNotAllowException(String.format("%s turn to state %s, skip %s this pipeline.", subPlan.getPipelineFullName(), subPlan.getPipelineState(), pipelineStatus));
        }
        throw new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job", subPlan.getPipelineFullName(), subPlan.getPipelineState()));
    }

    /**
     * Reports every coordinator and physical vertex of the pipeline as failed with the given
     * cause, coordinators first.
     *
     * @param subPlan the pipeline to fail; must not be {@code null}
     * @param th the cause attached to each task failure
     * @throws NullPointerException if {@code subPlan} is {@code null}
     */
    private void makePipelineFailed(@NonNull SubPlan subPlan, Throwable th) {
        Objects.requireNonNull(subPlan, "pipeline is marked non-null but is null");
        for (PhysicalVertex coordinator : subPlan.getCoordinatorVertexList()) {
            makeTaskFailed(coordinator.getTaskGroupLocation(), th);
        }
        for (PhysicalVertex task : subPlan.getPhysicalVertexList()) {
            makeTaskFailed(task.getTaskGroupLocation(), th);
        }
    }

    /**
     * Reports a single task group as {@code FAILED} to the job master.
     *
     * @param taskGroupLocation the location of the failed task group; must not be {@code null}
     * @param th the cause of the failure
     * @throws NullPointerException if {@code taskGroupLocation} is {@code null}
     */
    private void makeTaskFailed(@NonNull TaskGroupLocation taskGroupLocation, Throwable th) {
        Objects.requireNonNull(taskGroupLocation, "taskGroupLocation is marked non-null but is null");
        TaskExecutionState failedState = new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, th);
        this.jobMaster.updateTaskExecutionState(failedState);
    }
}
