package org.apache.seatunnel.engine.server.dag.physical;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.map.IMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.master.JobMaster;

/* loaded from: input_file:org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.class */
public class PhysicalPlan {
    private static final ILogger LOGGER = Logger.getLogger(PhysicalPlan.class);
    private final List<SubPlan> pipelineList;
    private final JobImmutableInformation jobImmutableInformation;
    private final IMap<Object, Object> runningJobStateIMap;
    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
    private CompletableFuture<JobResult> jobEndFuture;
    private final String jobFullName;
    private final long jobId;
    private JobMaster jobMaster;
    private AtomicInteger finishedPipelineNum = new AtomicInteger(0);
    private AtomicInteger canceledPipelineNum = new AtomicInteger(0);
    private AtomicInteger failedPipelineNum = new AtomicInteger(0);
    private final AtomicReference<String> errorBySubPlan = new AtomicReference<>();
    private boolean makeJobEndWhenPipelineEnded = true;

    public PhysicalPlan(@NonNull List<SubPlan> list, @NonNull ExecutorService executorService, @NonNull JobImmutableInformation jobImmutableInformation, long j, @NonNull IMap iMap, @NonNull IMap iMap2) {
        if (list == null) {
            throw new NullPointerException("pipelineList is marked non-null but is null");
        }
        if (executorService == null) {
            throw new NullPointerException("executorService is marked non-null but is null");
        }
        if (jobImmutableInformation == null) {
            throw new NullPointerException("jobImmutableInformation is marked non-null but is null");
        }
        if (iMap == null) {
            throw new NullPointerException("runningJobStateIMap is marked non-null but is null");
        }
        if (iMap2 == null) {
            throw new NullPointerException("runningJobStateTimestampsIMap is marked non-null but is null");
        }
        this.jobImmutableInformation = jobImmutableInformation;
        this.jobId = jobImmutableInformation.getJobId();
        Long[] lArr = new Long[JobStatus.values().length];
        if (iMap2.get(Long.valueOf(this.jobId)) == null) {
            lArr[JobStatus.INITIALIZING.ordinal()] = Long.valueOf(j);
            iMap2.put(Long.valueOf(this.jobId), lArr);
        }
        if (iMap.get(Long.valueOf(this.jobId)) == null) {
            lArr[JobStatus.CREATED.ordinal()] = Long.valueOf(System.currentTimeMillis());
            iMap2.put(Long.valueOf(this.jobId), lArr);
            iMap.put(Long.valueOf(this.jobId), JobStatus.CREATED);
        }
        this.pipelineList = list;
        if (list.isEmpty()) {
            throw new UnknownPhysicalPlanException("The physical plan didn't have any can execute pipeline");
        }
        this.jobFullName = String.format("Job %s (%s)", jobImmutableInformation.getJobConfig().getName(), Long.valueOf(jobImmutableInformation.getJobId()));
        this.runningJobStateIMap = iMap;
        this.runningJobStateTimestampsIMap = iMap2;
    }

    public void setJobMaster(JobMaster jobMaster) {
        this.jobMaster = jobMaster;
        this.pipelineList.forEach(subPlan -> {
            subPlan.setJobMaster(jobMaster);
        });
    }

    public PassiveCompletableFuture<JobResult> initStateFuture() {
        this.jobEndFuture = new CompletableFuture<>();
        this.pipelineList.forEach(this::addPipelineEndCallback);
        return new PassiveCompletableFuture<>(this.jobEndFuture);
    }

    public void addPipelineEndCallback(SubPlan subPlan) {
        subPlan.initStateFuture().thenAcceptAsync(pipelineExecutionState -> {
            JobStatus jobStatus;
            try {
                if (PipelineStatus.CANCELED.equals(pipelineExecutionState.getPipelineStatus())) {
                    this.canceledPipelineNum.incrementAndGet();
                    if (this.makeJobEndWhenPipelineEnded) {
                        LOGGER.info(String.format("cancel job %s because makeJobEndWhenPipelineEnded is true", this.jobFullName));
                        cancelJob();
                    }
                } else if (PipelineStatus.FAILED.equals(pipelineExecutionState.getPipelineStatus())) {
                    this.failedPipelineNum.incrementAndGet();
                    this.errorBySubPlan.compareAndSet(null, pipelineExecutionState.getThrowableMsg());
                    if (this.makeJobEndWhenPipelineEnded) {
                        LOGGER.info(String.format("cancel job %s because makeJobEndWhenPipelineEnded is true", this.jobFullName));
                        cancelJob();
                    }
                }
                if (this.finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
                    if (this.failedPipelineNum.get() > 0) {
                        jobStatus = JobStatus.FAILING;
                        updateJobState(jobStatus);
                    } else if (this.canceledPipelineNum.get() > 0) {
                        jobStatus = JobStatus.CANCELED;
                        turnToEndState(jobStatus);
                    } else {
                        jobStatus = JobStatus.FINISHED;
                        turnToEndState(jobStatus);
                    }
                    this.jobEndFuture.complete(new JobResult(jobStatus, this.errorBySubPlan.get()));
                }
            } catch (Throwable th) {
                LOGGER.severe(ExceptionUtils.getMessage(th));
            }
        }, (Executor) this.jobMaster.getExecutorService());
    }

    public void cancelJob() {
        this.jobMaster.neverNeedRestore();
        if (getJobStatus().isEndState()) {
            LOGGER.warning(String.format("%s is in end state %s, can not be cancel", this.jobFullName, getJobStatus()));
        } else if (JobStatus.CANCELLING.equals(getJobStatus())) {
            cancelJobPipelines();
        } else {
            updateJobState((JobStatus) this.runningJobStateIMap.get(Long.valueOf(this.jobId)), JobStatus.CANCELLING);
            cancelJobPipelines();
        }
    }

    private void cancelJobPipelines() {
        try {
            CompletableFuture.allOf((CompletableFuture[]) ((List) this.pipelineList.stream().map(subPlan -> {
                subPlan.getClass();
                return CompletableFuture.runAsync(subPlan::cancelPipeline, this.jobMaster.getExecutorService());
            }).collect(Collectors.toList())).toArray(new CompletableFuture[0])).join();
        } catch (Exception e) {
            LOGGER.severe(String.format("%s cancel error with exception: %s", this.jobFullName, ExceptionUtils.getMessage(e)));
        }
    }

    public List<SubPlan> getPipelineList() {
        return this.pipelineList;
    }

    private void turnToEndState(@NonNull JobStatus jobStatus) {
        if (jobStatus == null) {
            throw new NullPointerException("endState is marked non-null but is null");
        }
        synchronized (this) {
            JobStatus jobStatus2 = (JobStatus) this.runningJobStateIMap.get(Long.valueOf(this.jobId));
            if (jobStatus2.isEndState()) {
                String str = "Job is trying to leave terminal state " + jobStatus2;
                LOGGER.severe(str);
                throw new IllegalStateException(str);
            }
            if (!jobStatus.isEndState()) {
                String str2 = "Need a end state, not " + jobStatus;
                LOGGER.severe(str2);
                throw new IllegalStateException(str2);
            }
            this.jobMaster.getCheckpointManager().shutdown(jobStatus);
            LOGGER.info(String.format("%s end with state %s", getJobFullName(), jobStatus));
            updateStateTimestamps(jobStatus);
            this.runningJobStateIMap.put(Long.valueOf(this.jobId), jobStatus);
        }
    }

    private void updateStateTimestamps(@NonNull JobStatus jobStatus) {
        if (jobStatus == null) {
            throw new NullPointerException("targetState is marked non-null but is null");
        }
        Long[] lArr = this.runningJobStateTimestampsIMap.get(Long.valueOf(this.jobId));
        lArr[jobStatus.ordinal()] = Long.valueOf(System.currentTimeMillis());
        this.runningJobStateTimestampsIMap.set(Long.valueOf(this.jobId), lArr);
    }

    public boolean updateJobState(@NonNull JobStatus jobStatus) {
        boolean updateJobState;
        if (jobStatus == null) {
            throw new NullPointerException("targetState is marked non-null but is null");
        }
        synchronized (this) {
            updateJobState = updateJobState((JobStatus) this.runningJobStateIMap.get(Long.valueOf(this.jobId)), jobStatus);
        }
        return updateJobState;
    }

    public boolean updateJobState(@NonNull JobStatus jobStatus, @NonNull JobStatus jobStatus2) {
        if (jobStatus == null) {
            throw new NullPointerException("current is marked non-null but is null");
        }
        if (jobStatus2 == null) {
            throw new NullPointerException("targetState is marked non-null but is null");
        }
        synchronized (this) {
            if (jobStatus.isEndState()) {
                String str = "Job is trying to leave terminal state " + jobStatus;
                LOGGER.severe(str);
                throw new IllegalStateException(str);
            }
            if (!jobStatus.equals(this.runningJobStateIMap.get(Long.valueOf(this.jobId)))) {
                return false;
            }
            LOGGER.info(String.format("Job %s (%s) turn from state %s to %s.", this.jobImmutableInformation.getJobConfig().getName(), Long.valueOf(this.jobId), jobStatus, jobStatus2));
            updateStateTimestamps(jobStatus2);
            this.runningJobStateIMap.set(Long.valueOf(this.jobId), jobStatus2);
            return true;
        }
    }

    public JobImmutableInformation getJobImmutableInformation() {
        return this.jobImmutableInformation;
    }

    public JobStatus getJobStatus() {
        return (JobStatus) this.runningJobStateIMap.get(Long.valueOf(this.jobId));
    }

    public String getJobFullName() {
        return this.jobFullName;
    }
}
