package org.apache.seatunnel.core.starter.seatunnel.command;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import java.nio.file.Path;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.enums.MasterType;
import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
import org.apache.seatunnel.core.starter.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.JobMetricsRunner;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.class */
public class ClientExecuteCommand implements Command<ClientCommandArgs> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ClientExecuteCommand.class);
    private final ClientCommandArgs clientCommandArgs;
    private JobStatus jobStatus;
    private SeaTunnelClient engineClient;
    private HazelcastInstance instance;
    private ScheduledExecutorService executorService;

    public ClientExecuteCommand(ClientCommandArgs clientCommandArgs) {
        this.clientCommandArgs = clientCommandArgs;
    }

    @Override // org.apache.seatunnel.core.starter.command.Command
    public void execute() throws CommandExecuteException {
        JobMetricsRunner.JobMetricsSummary jobMetricsSummary = null;
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime now2 = LocalDateTime.now();
        SeaTunnelConfig locateAndGetSeaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        try {
            try {
                String clusterName = this.clientCommandArgs.getClusterName();
                if (this.clientCommandArgs.getMasterType().equals(MasterType.LOCAL)) {
                    clusterName = creatRandomClusterName(StringUtils.isNotEmpty(clusterName) ? clusterName : "seatunnel");
                    this.instance = createServerInLocal(clusterName, locateAndGetSeaTunnelConfig);
                }
                if (StringUtils.isNotEmpty(clusterName)) {
                    locateAndGetSeaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
                }
                ClientConfig locateAndGetClientConfig = ConfigProvider.locateAndGetClientConfig();
                if (StringUtils.isNotEmpty(clusterName)) {
                    locateAndGetClientConfig.setClusterName(clusterName);
                }
                this.engineClient = new SeaTunnelClient(locateAndGetClientConfig);
                if (this.clientCommandArgs.isListJob()) {
                    System.out.println(this.engineClient.getJobClient().listJobStatus(true));
                } else if (null != this.clientCommandArgs.getJobId()) {
                    System.out.println(this.engineClient.getJobClient().getJobDetailStatus(Long.valueOf(Long.parseLong(this.clientCommandArgs.getJobId()))));
                } else if (null != this.clientCommandArgs.getCancelJobId()) {
                    this.engineClient.getJobClient().cancelJob(Long.valueOf(Long.parseLong(this.clientCommandArgs.getCancelJobId())));
                } else if (null != this.clientCommandArgs.getMetricsJobId()) {
                    System.out.println(this.engineClient.getJobClient().getJobMetrics(Long.valueOf(Long.parseLong(this.clientCommandArgs.getMetricsJobId()))));
                } else if (null != this.clientCommandArgs.getSavePointJobId()) {
                    this.engineClient.getJobClient().savePointJob(Long.valueOf(Long.parseLong(this.clientCommandArgs.getSavePointJobId())));
                } else {
                    Path configPath = FileUtils.getConfigPath(this.clientCommandArgs);
                    FileUtils.checkConfigExist(configPath);
                    JobConfig jobConfig = new JobConfig();
                    jobConfig.setName(this.clientCommandArgs.getJobName());
                    JobExecutionEnvironment restoreExecutionContext = null != this.clientCommandArgs.getRestoreJobId() ? this.engineClient.restoreExecutionContext(configPath.toString(), jobConfig, Long.valueOf(Long.parseLong(this.clientCommandArgs.getRestoreJobId()))) : this.engineClient.createExecutionContext(configPath.toString(), jobConfig);
                    now = LocalDateTime.now();
                    ClientJobProxy execute = restoreExecutionContext.execute();
                    if (this.clientCommandArgs.isAsync()) {
                        if (!this.clientCommandArgs.getMasterType().equals(MasterType.LOCAL)) {
                            if (0 != 0) {
                                log.info(StringFormatUtils.formatTable("Job Statistic Information", "Start Time", DateTimeUtils.toString(now, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS), "End Time", DateTimeUtils.toString(now2, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS), "Total Time(s)", Long.valueOf(Duration.between(now, now2).getSeconds()), "Total Read Count", Long.valueOf(jobMetricsSummary.getSourceReadCount()), "Total Write Count", Long.valueOf(jobMetricsSummary.getSinkWriteCount()), "Total Failed Count", Long.valueOf(jobMetricsSummary.getSourceReadCount() - jobMetricsSummary.getSinkWriteCount())));
                            }
                            closeClient();
                            return;
                        }
                        log.warn("The job is running in local mode, can not use async mode.");
                    }
                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                        try {
                            CompletableFuture.runAsync(() -> {
                                log.info("run shutdown hook because get close signal");
                                shutdownHook(execute);
                            }).get(15L, TimeUnit.SECONDS);
                        } catch (Exception e) {
                            log.error("Cancel job failed.", (Throwable) e);
                        }
                    }));
                    long jobId = execute.getJobId();
                    JobMetricsRunner jobMetricsRunner = new JobMetricsRunner(this.engineClient, Long.valueOf(jobId));
                    this.executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("job-metrics-runner-%d").setDaemon(true).build());
                    this.executorService.scheduleAtFixedRate(jobMetricsRunner, 0L, locateAndGetSeaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(), TimeUnit.SECONDS);
                    this.jobStatus = execute.waitForJobComplete();
                    now2 = LocalDateTime.now();
                    jobMetricsSummary = this.engineClient.getJobMetricsSummary(Long.valueOf(jobId));
                }
                if (jobMetricsSummary != null) {
                    log.info(StringFormatUtils.formatTable("Job Statistic Information", "Start Time", DateTimeUtils.toString(now, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS), "End Time", DateTimeUtils.toString(now2, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS), "Total Time(s)", Long.valueOf(Duration.between(now, now2).getSeconds()), "Total Read Count", Long.valueOf(jobMetricsSummary.getSourceReadCount()), "Total Write Count", Long.valueOf(jobMetricsSummary.getSinkWriteCount()), "Total Failed Count", Long.valueOf(jobMetricsSummary.getSourceReadCount() - jobMetricsSummary.getSinkWriteCount())));
                }
                closeClient();
            } catch (Exception e) {
                throw new CommandExecuteException("SeaTunnel job executed failed", e);
            }
        } catch (Throwable th) {
            if (0 != 0) {
                log.info(StringFormatUtils.formatTable("Job Statistic Information", "Start Time", DateTimeUtils.toString(now, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS), "End Time", DateTimeUtils.toString(now2, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS), "Total Time(s)", Long.valueOf(Duration.between(now, now2).getSeconds()), "Total Read Count", Long.valueOf(jobMetricsSummary.getSourceReadCount()), "Total Write Count", Long.valueOf(jobMetricsSummary.getSinkWriteCount()), "Total Failed Count", Long.valueOf(jobMetricsSummary.getSourceReadCount() - jobMetricsSummary.getSinkWriteCount())));
            }
            closeClient();
            throw th;
        }
    }

    private void closeClient() {
        if (this.engineClient != null) {
            this.engineClient.close();
            log.info("Closed SeaTunnel client......");
        }
        if (this.instance != null) {
            this.instance.shutdown();
            log.info("Closed HazelcastInstance ......");
        }
        if (this.executorService != null) {
            this.executorService.shutdownNow();
            log.info("Closed metrics executor service ......");
        }
    }

    private HazelcastInstance createServerInLocal(String str, SeaTunnelConfig seaTunnelConfig) {
        seaTunnelConfig.getHazelcastConfig().setClusterName(str);
        seaTunnelConfig.getHazelcastConfig().getNetworkConfig().setPortAutoIncrement(true);
        return HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(), Thread.currentThread().getName(), new SeaTunnelNodeContext(seaTunnelConfig));
    }

    private String creatRandomClusterName(String str) {
        return str + "-" + new Random().nextInt(1000000);
    }

    private void shutdownHook(ClientJobProxy clientJobProxy) {
        if (this.clientCommandArgs.isCloseJob() && clientJobProxy.getJobResultCache() == null) {
            if (this.jobStatus == null || !this.jobStatus.isEndState()) {
                log.warn("Task will be closed due to client shutdown.");
                clientJobProxy.cancelJob();
            }
        }
    }
}
