/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.util;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Stream;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.WritableConfig;
import org.apache.flink.core.execution.CacheSupportedPipelineExecutor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterJobClient;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PipelineExecutorServiceLoader} whose executors submit pipelines to one specific,
 * already constructed {@link MiniCluster}.
 *
 * <p>The loader advertises a single executor name, {@link #NAME}, and always returns a factory
 * bound to the {@link MiniCluster} passed to the constructor.
 */
public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecutorServiceLoader {
    private static final Logger LOG =
            LoggerFactory.getLogger(MiniClusterPipelineExecutorServiceLoader.class);

    /** Executor name reported by this loader and written to {@link DeploymentOptions#TARGET}. */
    public static final String NAME = "minicluster";

    private final MiniCluster miniCluster;

    /**
     * Creates a loader bound to the given cluster.
     *
     * @param miniCluster cluster that every executor created through this loader submits to
     */
    public MiniClusterPipelineExecutorServiceLoader(MiniCluster miniCluster) {
        this.miniCluster = miniCluster;
    }

    /**
     * Writes the MiniCluster-specific deployment settings into {@code config}.
     *
     * <p>The options {@link PipelineOptions#JARS}, {@link PipelineOptions#CLASSPATHS}, {@link
     * DeploymentOptions#TARGET} and {@link DeploymentOptions#ATTACHED} are overwritten; a warning
     * is logged for each of them that {@code config} already contains.
     *
     * @param config configuration to modify in place
     * @param jarFiles jar paths, stored as fully qualified URL strings
     * @param classPaths classpath entries, stored via {@link URL#toString()}
     * @return the same {@code config} instance that was passed in
     * @throws RuntimeException if a jar path's file system cannot be obtained or the path cannot
     *     be converted to a URL
     */
    public static Configuration updateConfigurationForMiniCluster(
            Configuration config, Collection<Path> jarFiles, Collection<URL> classPaths) {
        checkOverridesOption(config, PipelineOptions.JARS);
        checkOverridesOption(config, PipelineOptions.CLASSPATHS);
        checkOverridesOption(config, DeploymentOptions.TARGET);
        checkOverridesOption(config, DeploymentOptions.ATTACHED);

        ConfigUtils.encodeCollectionToConfig(
                config,
                PipelineOptions.JARS,
                jarFiles,
                MiniClusterPipelineExecutorServiceLoader::getAbsoluteURL);
        ConfigUtils.encodeCollectionToConfig(
                config, PipelineOptions.CLASSPATHS, classPaths, URL::toString);
        config.set(DeploymentOptions.TARGET, NAME);
        config.set(DeploymentOptions.ATTACHED, true);
        return config;
    }

    /** Logs a warning when {@code config} already contains {@code option}. */
    private static void checkOverridesOption(Configuration config, ConfigOption<?> option) {
        if (config.contains(option)) {
            LOG.warn("Overriding config setting '{}' for MiniCluster.", option.key());
        }
    }

    /**
     * Qualifies {@code path} against its own file system and renders the result as a URL string.
     *
     * @throws RuntimeException wrapping the {@link IOException} or {@link MalformedURLException}
     *     raised while resolving the file system or building the URL
     */
    private static String getAbsoluteURL(Path path) {
        final FileSystem fs;
        try {
            fs = path.getFileSystem();
        } catch (IOException e) {
            throw new RuntimeException(
                    String.format("Could not get FileSystem from %s", path), e);
        }
        try {
            return path.makeQualified(fs).toUri().toURL().toString();
        } catch (MalformedURLException e) {
            throw new RuntimeException(String.format("Could not get URL from %s", path), e);
        }
    }

    /**
     * Returns a new factory bound to this loader's {@link MiniCluster}.
     *
     * @param configuration not consulted by this implementation
     */
    @Override
    public PipelineExecutorFactory getExecutorFactory(Configuration configuration) {
        return new MiniClusterPipelineExecutorFactory(this.miniCluster);
    }

    /** Returns a stream containing only {@link #NAME}. */
    @Override
    public Stream<String> getExecutorNames() {
        return Stream.of(NAME);
    }

    /** Executor that submits pipelines to the wrapped {@link MiniCluster}. */
    private static class MiniClusterExecutor implements CacheSupportedPipelineExecutor {
        // Single-threaded pool handed to the job status listeners as their executor.
        // NOTE(review): nothing in this class shuts this pool down, and a new one is created for
        // every executor instance - confirm that this is acceptable for callers.
        private final ExecutorService executorService =
                Executors.newFixedThreadPool(
                        1, new ExecutorThreadFactory("Flink-MiniClusterExecutor-IO"));
        private final MiniCluster miniCluster;
        private final List<JobStatusChangedListener> jobStatusChangedListeners;

        /**
         * Creates the executor and its job status listeners, using the current thread's context
         * class loader and the cluster's own configuration.
         */
        public MiniClusterExecutor(MiniCluster miniCluster) {
            this.miniCluster = miniCluster;
            this.jobStatusChangedListeners =
                    JobStatusChangedListenerUtils.createJobStatusChangedListeners(
                            Thread.currentThread().getContextClassLoader(),
                            miniCluster.getConfiguration(),
                            this.executorService);
        }

        /**
         * Submits {@code pipeline} to the MiniCluster.
         *
         * <p>After a successful submission the job status listeners are notified; a failed
         * submission is logged. In both cases the outcome is propagated through the returned
         * future.
         *
         * @param pipeline pipeline to run; it is cast to {@link StreamGraph} unconditionally, so
         *     any other type fails with a {@link ClassCastException}
         * @param configuration configuration passed to {@link
         *     PipelineExecutorUtils#getStreamGraph}
         * @param userCodeClassLoader class loader handed to the resulting job client
         * @return future completing with a {@link MiniClusterJobClient} for the submitted job
         */
        @Override
        public CompletableFuture<JobClient> execute(
                Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassLoader)
                throws Exception {
            // Read the pipeline's own restore settings BEFORE getStreamGraph runs.
            // NOTE(review): presumably getStreamGraph may replace them with values derived from
            // the configuration - confirm against PipelineExecutorUtils.
            final SavepointRestoreSettings savepointRestoreSettings =
                    ((StreamGraph) pipeline).getSavepointRestoreSettings();
            final StreamGraph streamGraph =
                    PipelineExecutorUtils.getStreamGraph(pipeline, configuration);
            // Reference comparison kept as in the original.
            // NOTE(review): relies on none() handing back one shared instance - confirm.
            if (streamGraph.getSavepointRestoreSettings() == SavepointRestoreSettings.none()) {
                streamGraph.setSavepointRestoreSettings(savepointRestoreSettings);
            }

            // No raw CompletableFuture cast here: the element type of the submission future must
            // stay visible so that result.getJobID() resolves in the final stage.
            return this.miniCluster
                    .submitJob(streamGraph)
                    .whenComplete(
                            (ignored, throwable) -> {
                                if (throwable == null) {
                                    PipelineExecutorUtils.notifyJobStatusListeners(
                                            pipeline, streamGraph, this.jobStatusChangedListeners);
                                } else {
                                    LOG.error(
                                            "Failed to submit job graph to mini cluster.",
                                            throwable);
                                }
                            })
                    .thenApply(
                            result ->
                                    new MiniClusterJobClient(
                                            result.getJobID(),
                                            this.miniCluster,
                                            userCodeClassLoader,
                                            MiniClusterJobClient.JobFinalizationBehavior.NOTHING));
        }

        /**
         * Delegates to {@link MiniCluster#listCompletedClusterDatasetIds()}; both parameters are
         * unused.
         */
        @Override
        public CompletableFuture<Set<AbstractID>> listCompletedClusterDatasetIds(
                Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
            return this.miniCluster.listCompletedClusterDatasetIds();
        }

        /**
         * Delegates to {@link MiniCluster#invalidateClusterDataset(AbstractID)}; {@code
         * configuration} and {@code userCodeClassloader} are unused.
         */
        @Override
        public CompletableFuture<Void> invalidateClusterDataset(
                AbstractID clusterDatasetId,
                Configuration configuration,
                ClassLoader userCodeClassloader)
                throws Exception {
            return this.miniCluster.invalidateClusterDataset(clusterDatasetId);
        }
    }

    /** Factory producing {@link MiniClusterExecutor} instances for one {@link MiniCluster}. */
    private static class MiniClusterPipelineExecutorFactory implements PipelineExecutorFactory {
        private final MiniCluster miniCluster;

        public MiniClusterPipelineExecutorFactory(MiniCluster miniCluster) {
            this.miniCluster = miniCluster;
        }

        /** Returns {@link MiniClusterPipelineExecutorServiceLoader#NAME}. */
        @Override
        public String getName() {
            return MiniClusterPipelineExecutorServiceLoader.NAME;
        }

        /** Always returns {@code true}; the configuration is not inspected. */
        @Override
        public boolean isCompatibleWith(Configuration configuration) {
            return true;
        }

        /** Creates a new executor on every call; the configuration is not inspected. */
        @Override
        public PipelineExecutor getExecutor(Configuration configuration) {
            return new MiniClusterExecutor(this.miniCluster);
        }
    }
}

