/*
 * Decompiled with CFR 0.152.
 */
package org.datacleaner.spark;

import com.google.common.base.Strings;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.metamodel.util.Action;
import org.apache.metamodel.util.FileHelper;
import org.apache.metamodel.util.HdfsResource;
import org.apache.metamodel.util.MutableRef;
import org.apache.spark.launcher.SparkLauncher;
import org.datacleaner.spark.Main;
import org.datacleaner.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApplicationDriver {
    private static final Logger logger = LoggerFactory.getLogger(ApplicationDriver.class);
    private static final String PRIMARY_JAR_FILENAME_PREFIX = "DataCleaner-env-spark";
    private final String _hostname;
    private final int _port;
    private final String _jarDirectoryPath;
    private final String _sparkHome;

    public ApplicationDriver(String hostname, int port, String jarDirectoryPath) {
        this(hostname, port, jarDirectoryPath, ApplicationDriver.determineSparkHome());
    }

    private static String determineSparkHome() {
        String sparkHome = System.getProperty("SPARK_HOME");
        if (Strings.isNullOrEmpty((String)sparkHome)) {
            sparkHome = System.getenv("SPARK_HOME");
        }
        if (Strings.isNullOrEmpty((String)sparkHome)) {
            throw new IllegalStateException("Could not determine SPARK_HOME. Please set the environment variable, system property or provide it as a " + ApplicationDriver.class.getSimpleName() + " constructor argument");
        }
        return sparkHome;
    }

    public ApplicationDriver(String hostname, int port, String jarDirectoryPath, String sparkHome) {
        this._hostname = hostname;
        this._port = port;
        this._jarDirectoryPath = jarDirectoryPath;
        this._sparkHome = sparkHome;
    }

    public int launch(String configurationHdfsPath, String jobHdfsPath) throws Exception {
        File hadoopConfDir = this.createTemporaryHadoopConfDir();
        SparkLauncher sparkLauncher = this.createSparkLauncher(hadoopConfDir, configurationHdfsPath, jobHdfsPath);
        return this.launch(sparkLauncher);
    }

    public int launch(SparkLauncher sparkLauncher) throws Exception {
        Process process = sparkLauncher.launch();
        InputStream errorStream = process.getErrorStream();
        this.startLogger(errorStream);
        InputStream inputStream = process.getInputStream();
        this.startLogger(inputStream);
        return process.waitFor();
    }

    private void startLogger(final InputStream stream) {
        new Thread(){

            @Override
            public void run() {
                try (BufferedReader br = new BufferedReader(new InputStreamReader(stream));){
                    String line = br.readLine();
                    while (line != null) {
                        logger.info(line);
                        line = br.readLine();
                    }
                    br.close();
                }
                catch (Exception e) {
                    logger.warn("Logger thread failure: " + e.getMessage(), (Throwable)e);
                }
            }
        }.start();
    }

    public HdfsResource createResource(String hdfsPath) {
        return new HdfsResource(this._hostname, this._port, hdfsPath);
    }

    public SparkLauncher createSparkLauncher(File hadoopConfDir, String configurationHdfsPath, String jobHdfsPath) throws Exception {
        HashMap<String, String> env = new HashMap<String, String>();
        env.put("HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath());
        env.put("YARN_CONF_DIR", hadoopConfDir.getAbsolutePath());
        SparkLauncher sparkLauncher = new SparkLauncher(env);
        sparkLauncher.setSparkHome(this._sparkHome);
        sparkLauncher.setMaster("yarn-cluster");
        sparkLauncher.setAppName("DataCleaner");
        MutableRef primaryJar = new MutableRef();
        List<String> jars = this.buildJarFiles((MutableRef<String>)primaryJar);
        logger.info("Using JAR files: {}", jars);
        for (String jar : jars) {
            sparkLauncher.addJar(jar);
        }
        sparkLauncher.setMainClass(Main.class.getName());
        sparkLauncher.addAppArgs(new String[]{(String)primaryJar.get()});
        sparkLauncher.addAppArgs(new String[]{this.toHdfsPath(configurationHdfsPath)});
        sparkLauncher.addAppArgs(new String[]{this.toHdfsPath(jobHdfsPath)});
        return sparkLauncher;
    }

    private String toHdfsPath(String path) {
        if (path.startsWith("hdfs://")) {
            return path;
        }
        return "hdfs://" + this._hostname + ":" + this._port + path;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<String> buildJarFiles(MutableRef<String> primaryJarRef) throws IOException {
        ArrayList<String> list = new ArrayList<String>();
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://" + this._hostname + ":" + this._port);
        FileSystem fs = FileSystem.newInstance((Configuration)conf);
        try {
            Path directoryPath = new Path(this._jarDirectoryPath);
            RemoteIterator files = fs.listFiles(directoryPath, false);
            while (files.hasNext()) {
                LocatedFileStatus file = (LocatedFileStatus)files.next();
                Path path = file.getPath();
                String filename = path.getName();
                if (filename.startsWith(PRIMARY_JAR_FILENAME_PREFIX)) {
                    primaryJarRef.set((Object)path.toString());
                    continue;
                }
                list.add(path.toString());
            }
        }
        catch (Throwable throwable) {
            FileHelper.safeClose((Object[])new Object[]{fs});
            throw throwable;
        }
        FileHelper.safeClose((Object[])new Object[]{fs});
        if (primaryJarRef.get() == null) {
            throw new IllegalArgumentException("Failed to find primary jar (starting with 'DataCleaner-env-spark') in JAR file directory: " + this._jarDirectoryPath);
        }
        return list;
    }

    public File createTemporaryHadoopConfDir() throws IOException {
        File hadoopConfDir = new File(FileHelper.getTempDir(), "datacleaner_hadoop_conf_" + UUID.randomUUID().toString());
        hadoopConfDir.mkdirs();
        this.createTemporaryHadoopConfFile(hadoopConfDir, "core-site.xml", "core-site-template.xml");
        this.createTemporaryHadoopConfFile(hadoopConfDir, "yarn-site.xml", "yarn-site-template.xml");
        logger.debug("Created temporary Hadoop conf dir: {}", (Object)hadoopConfDir);
        return hadoopConfDir;
    }

    private void createTemporaryHadoopConfFile(File hadoopConfDir, String filename, String templateName) throws IOException {
        File coreSiteFile = new File(hadoopConfDir, filename);
        try (InputStream inputStream = this.getClass().getResourceAsStream(templateName);){
            BufferedReader reader = FileHelper.getBufferedReader((InputStream)inputStream, (String)"UTF-8");
            try (Writer writer = FileHelper.getWriter((File)coreSiteFile);){
                String line = reader.readLine();
                while (line != null) {
                    line = StringUtils.replaceAll((String)line, (String)"${HDFS_HOSTNAME}", (String)this._hostname);
                    line = StringUtils.replaceAll((String)line, (String)"${HDFS_PORT}", (String)(this._port + ""));
                    writer.write(line);
                    line = reader.readLine();
                }
                writer.flush();
            }
        }
    }

    public void copyFileToHdfs(File file, String hdfsPath) {
        this.copyFileToHdfs(file, hdfsPath, true);
    }

    public void copyFileToHdfs(final File file, String hdfsPath, boolean overwrite) {
        HdfsResource hdfsResource = this.createResource(hdfsPath);
        boolean exists = hdfsResource.isExists();
        if (!overwrite && exists) {
            logger.debug("Skipping file-copy to {} because file already exists", (Object)hdfsPath);
            return;
        }
        if (exists) {
            logger.info("Overwriting file on HDFS: {}", (Object)hdfsPath);
        } else {
            logger.debug("Copying file to HDFS: {}", (Object)hdfsPath);
        }
        hdfsResource.write((Action)new Action<OutputStream>(){

            public void run(OutputStream out) throws Exception {
                FileInputStream in = new FileInputStream(file);
                FileHelper.copy((InputStream)in, (OutputStream)out);
                in.close();
            }
        });
    }
}

