package org.apache.flink.kubernetes.artifact;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.client.cli.ArtifactFetchOptions;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/artifact/DefaultKubernetesArtifactUploader.class */
public class DefaultKubernetesArtifactUploader implements KubernetesArtifactUploader {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultKubernetesArtifactUploader.class);

    @Override // org.apache.flink.kubernetes.artifact.KubernetesArtifactUploader
    public void uploadAll(Configuration configuration) throws Exception {
        if (!((Boolean) configuration.get(KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED)).booleanValue()) {
            LOG.info("Local artifact uploading is disabled. Set '{}' to enable.", KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED.key());
            return;
        }
        updateConfig(configuration, PipelineOptions.JARS, Collections.singletonList(upload(configuration, getJobUri(configuration))));
        updateConfig(configuration, ArtifactFetchOptions.ARTIFACT_LIST, (List) ((List) configuration.getOptional(ArtifactFetchOptions.ARTIFACT_LIST).orElse(Collections.emptyList())).stream().map(FunctionUtils.uncheckedFunction(str -> {
            return upload(configuration, str);
        })).collect(Collectors.toList()));
    }

    @VisibleForTesting
    String upload(Configuration configuration, String str) throws IOException, URISyntaxException {
        URI resolveURI = PackagedProgramUtils.resolveURI(str);
        if (!"local".equals(resolveURI.getScheme())) {
            return str;
        }
        String str2 = (String) configuration.get(KubernetesConfigOptions.LOCAL_UPLOAD_TARGET);
        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(str2), String.format("Setting '%s' to a valid remote path is required.", KubernetesConfigOptions.LOCAL_UPLOAD_TARGET.key()));
        FileSystem.WriteMode writeMode = ((Boolean) configuration.get(KubernetesConfigOptions.LOCAL_UPLOAD_OVERWRITE)).booleanValue() ? FileSystem.WriteMode.OVERWRITE : FileSystem.WriteMode.NO_OVERWRITE;
        File file = new File(resolveURI.getPath());
        Path path = new Path(str2, file.getName());
        if (path.getFileSystem().exists(path) && writeMode == FileSystem.WriteMode.NO_OVERWRITE) {
            LOG.info("Skip uploading artifact '{}', as it already exists. To overwrite existing artifacts, please set the '{}' config option.", path, KubernetesConfigOptions.LOCAL_UPLOAD_OVERWRITE.key());
        } else {
            long currentTimeMillis = System.currentTimeMillis();
            FSDataOutputStream create = path.getFileSystem().create(path, writeMode);
            try {
                FileUtils.copyFile(file, create);
                if (create != null) {
                    create.close();
                }
                LOG.debug("Copied file from {} to {}, cost {} ms", new Object[]{file, path, Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
            } catch (Throwable th) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        return path.toString();
    }

    @VisibleForTesting
    void updateConfig(Configuration configuration, ConfigOption<List<String>> configOption, List<String> list) {
        if (hasLocal((List) configuration.getOptional(configOption).orElse(Collections.emptyList()))) {
            LOG.info("Updating configuration '{}' after to replace local artifact: '{}'", configOption.key(), list);
            configuration.set(configOption, list);
        }
    }

    private String getJobUri(Configuration configuration) {
        Preconditions.checkArgument(((List) configuration.get(PipelineOptions.JARS)).size() == 1, String.format("The '%s' config must contain one JAR.", PipelineOptions.JARS.key()));
        return (String) ((List) configuration.get(PipelineOptions.JARS)).get(0);
    }

    private boolean hasLocal(List<String> list) {
        return list.stream().anyMatch(str -> {
            return str.contains("local:/");
        });
    }
}
