package org.apache.seatunnel.engine.server;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.cluster.Address;
import com.hazelcast.internal.metrics.MetricDescriptorConstants;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;

/* loaded from: input_file:org/apache/seatunnel/engine/server/CoordinatorService.class */
public class CoordinatorService {
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private volatile ResourceManager resourceManager;
    private JobHistoryService jobHistoryService;
    private IMap<Long, JobInfo> runningJobInfoIMap;
    IMap<Object, Object> runningJobStateIMap;
    IMap<Object, Long[]> runningJobStateTimestampsIMap;
    private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;
    private IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap;
    private ExecutorService executorService;
    private final SeaTunnelServer seaTunnelServer;
    private final ScheduledExecutorService masterActiveListener;
    private final EngineConfig engineConfig;
    private Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap();
    private volatile boolean isActive = false;

    public CoordinatorService(@NonNull NodeEngineImpl nodeEngineImpl, @NonNull SeaTunnelServer seaTunnelServer, EngineConfig engineConfig) {
        if (nodeEngineImpl == null) {
            throw new NullPointerException("nodeEngine is marked non-null but is null");
        }
        if (seaTunnelServer == null) {
            throw new NullPointerException("seaTunnelServer is marked non-null but is null");
        }
        this.nodeEngine = nodeEngineImpl;
        this.logger = nodeEngineImpl.getLogger(getClass());
        this.executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("seatunnel-coordinator-service-%d").build());
        this.seaTunnelServer = seaTunnelServer;
        this.engineConfig = engineConfig;
        this.masterActiveListener = Executors.newSingleThreadScheduledExecutor();
        this.masterActiveListener.scheduleAtFixedRate(this::checkNewActiveMaster, 0L, 100L, TimeUnit.MILLISECONDS);
    }

    public JobHistoryService getJobHistoryService() {
        return this.jobHistoryService;
    }

    public JobMaster getJobMaster(Long l) {
        return this.runningJobMasterMap.get(l);
    }

    private void initCoordinatorService() {
        this.runningJobInfoIMap = this.nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
        this.runningJobStateIMap = this.nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
        this.runningJobStateTimestampsIMap = this.nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS);
        this.ownedSlotProfilesIMap = this.nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
        this.metricsImap = this.nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
        this.jobHistoryService = new JobHistoryService(this.runningJobStateIMap, this.logger, this.runningJobMasterMap, this.nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE), this.nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_METRICS), this.nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO));
        try {
            CompletableFuture.allOf((CompletableFuture[]) ((List) this.runningJobInfoIMap.entrySet().stream().map(entry -> {
                return CompletableFuture.runAsync(() -> {
                    this.logger.info(String.format("begin restore job (%s) from master active switch", entry.getKey()));
                    try {
                        restoreJobFromMasterActiveSwitch((Long) entry.getKey(), (JobInfo) entry.getValue());
                    } catch (Exception e) {
                        this.logger.severe(e);
                    }
                    this.logger.info(String.format("restore job (%s) from master active switch finished", entry.getKey()));
                }, this.executorService);
            }).collect(Collectors.toList())).toArray(new CompletableFuture[0])).get();
        } catch (Exception e) {
            throw new SeaTunnelEngineException(e);
        }
    }

    private void restoreJobFromMasterActiveSwitch(@NonNull Long l, @NonNull JobInfo jobInfo) {
        if (l == null) {
            throw new NullPointerException("jobId is marked non-null but is null");
        }
        if (jobInfo == null) {
            throw new NullPointerException("jobInfo is marked non-null but is null");
        }
        if (this.runningJobStateIMap.get(l) == null) {
            this.runningJobInfoIMap.remove(l);
            return;
        }
        JobStatus jobStatus = (JobStatus) this.runningJobStateIMap.get(l);
        JobMaster jobMaster = new JobMaster(jobInfo.getJobImmutableInformation(), this.nodeEngine, this.executorService, getResourceManager(), getJobHistoryService(), this.runningJobStateIMap, this.runningJobStateTimestampsIMap, this.ownedSlotProfilesIMap, this.runningJobInfoIMap, this.metricsImap, this.engineConfig);
        try {
            jobMaster.init(this.runningJobInfoIMap.get(l).getInitializationTimestamp().longValue(), true, !JobStatus.CANCELLING.equals(jobStatus));
            String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
            if (jobStatus.isEndState()) {
                this.logger.info(String.format("The restore %s is in an end state %s, store the job info to JobHistory and clear the job running time info", jobFullName, jobStatus));
                jobMaster.cleanJob();
                return;
            }
            if (jobStatus.ordinal() < JobStatus.RUNNING.ordinal()) {
                CompletableFuture.runAsync(() -> {
                    this.logger.info(String.format("The restore %s is state %s, cancel job and submit it again.", jobFullName, jobStatus));
                    jobMaster.cancelJob();
                    jobMaster.getJobMasterCompleteFuture().join();
                    submitJob(l.longValue(), jobInfo.getJobImmutableInformation()).join();
                }, this.executorService);
                return;
            }
            this.runningJobMasterMap.put(l, jobMaster);
            jobMaster.markRestore();
            if (JobStatus.CANCELLING.equals(jobStatus)) {
                this.logger.info(String.format("The restore %s is in %s state, cancel the job", jobFullName, jobStatus));
                CompletableFuture.runAsync(() -> {
                    try {
                        jobMaster.cancelJob();
                        jobMaster.run();
                    } finally {
                        if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
                            this.runningJobMasterMap.remove(l);
                        }
                    }
                }, this.executorService);
            } else if (JobStatus.RUNNING.equals(jobStatus)) {
                this.logger.info(String.format("The restore %s is in %s state, restore pipeline and take over this job running", jobFullName, jobStatus));
                CompletableFuture.runAsync(() -> {
                    try {
                        jobMaster.getPhysicalPlan().getPipelineList().forEach((v0) -> {
                            v0.restorePipelineState();
                        });
                        jobMaster.run();
                    } finally {
                        if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
                            this.runningJobMasterMap.remove(l);
                        }
                    }
                }, this.executorService);
            }
        } catch (Exception e) {
            throw new SeaTunnelEngineException(String.format("Job id %s init failed", l), e);
        }
    }

    private void checkNewActiveMaster() {
        try {
            if (!this.isActive && this.seaTunnelServer.isMasterNode()) {
                this.logger.info("This node become a new active master node, begin init coordinator service");
                if (this.executorService.isShutdown()) {
                    this.executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("seatunnel-coordinator-service-%d").build());
                }
                initCoordinatorService();
                this.isActive = true;
            } else if (this.isActive && !this.seaTunnelServer.isMasterNode()) {
                this.isActive = false;
                this.logger.info("This node become leave active master node, begin clear coordinator service");
                clearCoordinatorService();
            }
        } catch (Exception e) {
            this.isActive = false;
            this.logger.severe(ExceptionUtils.getMessage(e));
            throw new SeaTunnelEngineException("check new active master error, stop loop", e);
        }
    }

    private void clearCoordinatorService() {
        this.runningJobMasterMap.values().forEach((v0) -> {
            v0.interrupt();
        });
        this.executorService.shutdownNow();
        try {
            this.executorService.awaitTermination(20L, TimeUnit.SECONDS);
            if (this.resourceManager != null) {
                this.resourceManager.close();
            }
        } catch (InterruptedException e) {
            throw new SeaTunnelEngineException("wait clean executor service error", e);
        }
    }

    public ResourceManager getResourceManager() {
        if (this.resourceManager == null) {
            synchronized (this) {
                if (this.resourceManager == null) {
                    ResourceManager resourceManager = new ResourceManagerFactory(this.nodeEngine).getResourceManager();
                    resourceManager.init();
                    this.resourceManager = resourceManager;
                }
            }
        }
        return this.resourceManager;
    }

    public PassiveCompletableFuture<Void> submitJob(long j, Data data) {
        CompletableFuture completableFuture = new CompletableFuture();
        JobMaster jobMaster = new JobMaster(data, this.nodeEngine, this.executorService, getResourceManager(), getJobHistoryService(), this.runningJobStateIMap, this.runningJobStateTimestampsIMap, this.ownedSlotProfilesIMap, this.runningJobInfoIMap, this.metricsImap, this.engineConfig);
        this.executorService.submit(() -> {
            try {
                this.runningJobInfoIMap.put(Long.valueOf(j), new JobInfo(Long.valueOf(System.currentTimeMillis()), data));
                this.runningJobMasterMap.put(Long.valueOf(j), jobMaster);
                jobMaster.init(this.runningJobInfoIMap.get(Long.valueOf(j)).getInitializationTimestamp().longValue(), false, true);
                completableFuture.complete(null);
            } catch (Throwable th) {
                this.logger.severe(String.format("submit job %s error %s ", Long.valueOf(j), ExceptionUtils.getMessage(th)));
                completableFuture.completeExceptionally(th);
            }
            if (completableFuture.isCompletedExceptionally()) {
                this.runningJobInfoIMap.remove(Long.valueOf(j));
                this.runningJobMasterMap.remove(Long.valueOf(j));
                return;
            }
            try {
                jobMaster.run();
                if (jobMaster.getJobMasterCompleteFuture().isCancelled()) {
                    return;
                }
                this.runningJobMasterMap.remove(Long.valueOf(j));
            } catch (Throwable th2) {
                if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
                    this.runningJobMasterMap.remove(Long.valueOf(j));
                }
                throw th2;
            }
        });
        return new PassiveCompletableFuture<>(completableFuture);
    }

    public PassiveCompletableFuture<Void> savePoint(long j) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        if (this.runningJobMasterMap.containsKey(Long.valueOf(j))) {
            completableFuture = this.runningJobMasterMap.get(Long.valueOf(j)).savePoint();
        } else {
            Throwable th = new Throwable("The jobId: " + j + "of savePoint does not exist");
            this.logger.warning(th);
            completableFuture.completeExceptionally(th);
        }
        return new PassiveCompletableFuture<>(completableFuture);
    }

    public PassiveCompletableFuture<JobResult> waitForJobComplete(long j) {
        JobMaster jobMaster = this.runningJobMasterMap.get(Long.valueOf(j));
        if (jobMaster != null) {
            return new PassiveCompletableFuture<>(jobMaster.getJobMasterCompleteFuture());
        }
        JobStatus jobStatus = this.jobHistoryService.getJobDetailState(Long.valueOf(j)).getJobStatus();
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.complete(new JobResult(jobStatus, null));
        return new PassiveCompletableFuture<>(completableFuture);
    }

    public PassiveCompletableFuture<Void> cancelJob(long j) {
        JobMaster jobMaster = this.runningJobMasterMap.get(Long.valueOf(j));
        if (jobMaster != null) {
            return new PassiveCompletableFuture<>(CompletableFuture.supplyAsync(() -> {
                jobMaster.cancelJob();
                return null;
            }, this.executorService));
        }
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.complete(null);
        return new PassiveCompletableFuture<>(completableFuture);
    }

    public JobStatus getJobStatus(long j) {
        JobMaster jobMaster = this.runningJobMasterMap.get(Long.valueOf(j));
        if (jobMaster != null) {
            return jobMaster.getJobStatus();
        }
        JobHistoryService.JobState jobDetailState = this.jobHistoryService.getJobDetailState(Long.valueOf(j));
        if (null == jobDetailState) {
            return null;
        }
        return jobDetailState.getJobStatus();
    }

    public JobMetrics getJobMetrics(long j) {
        JobMaster jobMaster = this.runningJobMasterMap.get(Long.valueOf(j));
        if (jobMaster == null) {
            return this.jobHistoryService.getJobMetrics(Long.valueOf(j));
        }
        JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(jobMaster.getCurrJobMetrics());
        JobMetrics jobMetrics2 = this.jobHistoryService.getJobMetrics(Long.valueOf(j));
        return jobMetrics2 != null ? jobMetrics2.merge(jobMetrics) : jobMetrics;
    }

    public JobDAGInfo getJobInfo(long j) {
        JobDAGInfo jobDAGInfo = this.jobHistoryService.getJobDAGInfo(Long.valueOf(j));
        return jobDAGInfo != null ? jobDAGInfo : this.runningJobMasterMap.get(Long.valueOf(j)).getJobDAGInfo();
    }

    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
        this.logger.info(String.format("Received task end from execution %s, state %s", taskExecutionState.getTaskGroupLocation(), taskExecutionState.getExecutionState()));
        TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();
        JobMaster jobMaster = this.runningJobMasterMap.get(Long.valueOf(taskGroupLocation.getJobId()));
        if (jobMaster == null) {
            throw new JobNotFoundException(String.format("Job %s not running", Long.valueOf(taskGroupLocation.getJobId())));
        }
        jobMaster.updateTaskExecutionState(taskExecutionState);
    }

    public void shutdown() {
        if (this.masterActiveListener != null) {
            this.masterActiveListener.shutdownNow();
        }
        clearCoordinatorService();
    }

    public boolean isCoordinatorActive() {
        return this.isActive;
    }

    public void failedTaskOnMemberRemoved(MembershipServiceEvent membershipServiceEvent) {
        Address address = membershipServiceEvent.getMember().getAddress();
        this.runningJobMasterMap.forEach((l, jobMaster) -> {
            jobMaster.getPhysicalPlan().getPipelineList().forEach(subPlan -> {
                makeTasksFailed(subPlan.getCoordinatorVertexList(), address);
                makeTasksFailed(subPlan.getPhysicalVertexList(), address);
            });
        });
    }

    private void makeTasksFailed(@NonNull List<PhysicalVertex> list, @NonNull Address address) {
        if (list == null) {
            throw new NullPointerException("physicalVertexList is marked non-null but is null");
        }
        if (address == null) {
            throw new NullPointerException("lostAddress is marked non-null but is null");
        }
        list.forEach(physicalVertex -> {
            Address currentExecutionAddress = physicalVertex.getCurrentExecutionAddress();
            ExecutionState executionState = physicalVertex.getExecutionState();
            if (null == currentExecutionAddress || !currentExecutionAddress.equals(address)) {
                return;
            }
            if (executionState.equals(ExecutionState.DEPLOYING) || executionState.equals(ExecutionState.RUNNING) || executionState.equals(ExecutionState.CANCELING)) {
                TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation();
                physicalVertex.updateTaskExecutionState(new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, new JobException(String.format("The taskGroup(%s) deployed node(%s) offline", taskGroupLocation, address))));
            }
        });
    }

    public void memberRemoved(MembershipServiceEvent membershipServiceEvent) {
        if (isCoordinatorActive()) {
            getResourceManager().memberRemoved(membershipServiceEvent);
        }
        failedTaskOnMemberRemoved(membershipServiceEvent);
    }

    public void printExecutionInfo() {
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) this.executorService;
        this.logger.info(StringFormatUtils.formatTable("CoordinatorService Thread Pool Status", MetricDescriptorConstants.TCP_METRIC_ENDPOINT_MANAGER_ACTIVE_COUNT, Integer.valueOf(threadPoolExecutor.getActiveCount()), "corePoolSize", Integer.valueOf(threadPoolExecutor.getCorePoolSize()), MetricDescriptorConstants.EXECUTOR_METRIC_MANAGED_EXECUTOR_SERVICE_MAXIMUM_POOL_SIZE, Integer.valueOf(threadPoolExecutor.getMaximumPoolSize()), MetricDescriptorConstants.EXECUTOR_METRIC_MANAGED_EXECUTOR_SERVICE_POOL_SIZE, Integer.valueOf(threadPoolExecutor.getPoolSize()), MetricDescriptorConstants.NETWORKING_METRIC_NIO_THREAD_COMPLETED_TASK_COUNT, Long.valueOf(threadPoolExecutor.getCompletedTaskCount()), "taskCount", Long.valueOf(threadPoolExecutor.getTaskCount())));
    }

    public void printJobDetailInfo() {
        AtomicLong atomicLong = new AtomicLong();
        AtomicLong atomicLong2 = new AtomicLong();
        AtomicLong atomicLong3 = new AtomicLong();
        AtomicLong atomicLong4 = new AtomicLong();
        AtomicLong atomicLong5 = new AtomicLong();
        AtomicLong atomicLong6 = new AtomicLong();
        AtomicLong atomicLong7 = new AtomicLong();
        AtomicLong atomicLong8 = new AtomicLong();
        AtomicLong atomicLong9 = new AtomicLong();
        AtomicLong atomicLong10 = new AtomicLong();
        AtomicLong atomicLong11 = new AtomicLong();
        if (this.runningJobInfoIMap != null) {
            this.runningJobInfoIMap.keySet().forEach(l -> {
                if (this.runningJobStateIMap.get(l) != null) {
                    switch ((JobStatus) this.runningJobStateIMap.get(l)) {
                        case CREATED:
                            atomicLong.addAndGet(1L);
                            return;
                        case SCHEDULED:
                            atomicLong2.addAndGet(1L);
                            return;
                        case RUNNING:
                            atomicLong3.addAndGet(1L);
                            return;
                        case FAILING:
                            atomicLong4.addAndGet(1L);
                            return;
                        case FAILED:
                            atomicLong5.addAndGet(1L);
                            return;
                        case CANCELLING:
                            atomicLong6.addAndGet(1L);
                            return;
                        case CANCELED:
                            atomicLong7.addAndGet(1L);
                            return;
                        case FINISHED:
                            atomicLong8.addAndGet(1L);
                            return;
                        case RESTARTING:
                            atomicLong9.addAndGet(1L);
                            return;
                        case SUSPENDED:
                            atomicLong10.addAndGet(1L);
                            return;
                        case RECONCILING:
                            atomicLong11.addAndGet(1L);
                            return;
                        default:
                            return;
                    }
                }
            });
        }
        this.logger.info(StringFormatUtils.formatTable("Job info detail", "createdJobCount", atomicLong, "scheduledJobCount", atomicLong2, "runningJobCount", atomicLong3, "failingJobCount", atomicLong4, "failedJobCount", atomicLong5, "cancellingJobCount", atomicLong6, "canceledJobCount", atomicLong7, "finishedJobCount", atomicLong8, "restartingJobCount", atomicLong9, "suspendedJobCount", atomicLong10, "reconcilingJobCount", atomicLong11));
    }
}
