/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.deployment.executors;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.client.program.PerJobMiniClusterFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class LocalExecutor
implements PipelineExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(LocalExecutor.class);
    private final ExecutorService executorService = Executors.newFixedThreadPool(1, (ThreadFactory)new ExecutorThreadFactory("Flink-LocalExecutor-IO"));
    public static final String NAME = "local";
    private final Configuration configuration;
    private final Function<MiniClusterConfiguration, MiniCluster> miniClusterFactory;
    private final List<JobStatusChangedListener> jobStatusChangedListeners;

    public static LocalExecutor create(Configuration configuration) {
        return new LocalExecutor(configuration, MiniCluster::new);
    }

    public static LocalExecutor createWithFactory(Configuration configuration, Function<MiniClusterConfiguration, MiniCluster> miniClusterFactory) {
        return new LocalExecutor(configuration, miniClusterFactory);
    }

    private LocalExecutor(Configuration configuration, Function<MiniClusterConfiguration, MiniCluster> miniClusterFactory) {
        this.configuration = configuration;
        this.miniClusterFactory = miniClusterFactory;
        this.jobStatusChangedListeners = JobStatusChangedListenerUtils.createJobStatusChangedListeners((ClassLoader)Thread.currentThread().getContextClassLoader(), (Configuration)configuration, (Executor)this.executorService);
    }

    public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
        Preconditions.checkNotNull((Object)pipeline);
        Preconditions.checkNotNull((Object)configuration);
        Configuration effectiveConfig = new Configuration();
        effectiveConfig.addAll(this.configuration);
        effectiveConfig.addAll(configuration);
        Preconditions.checkState((boolean)((Boolean)configuration.get(DeploymentOptions.ATTACHED)));
        StreamGraph streamGraph = PipelineExecutorUtils.getStreamGraph(pipeline, configuration);
        streamGraph.serializeUserDefinedInstances();
        return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, this.miniClusterFactory).submitJob((ExecutionPlan)streamGraph, userCodeClassloader).whenComplete((ignored, throwable) -> {
            if (throwable == null) {
                PipelineExecutorUtils.notifyJobStatusListeners(pipeline, (ExecutionPlan)streamGraph, this.jobStatusChangedListeners);
            } else {
                LOG.error("Failed to submit job graph to local mini cluster.", throwable);
            }
        });
    }
}

