/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.deployment.application.executors;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.application.executors.EmbeddedJobClientCreator;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class EmbeddedExecutor
implements PipelineExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedExecutor.class);
    private final ExecutorService executorService = Executors.newFixedThreadPool(1, (ThreadFactory)new ExecutorThreadFactory("Flink-EmbeddedClusterExecutor-IO"));
    public static final String NAME = "embedded";
    private final Collection<JobID> submittedJobIds;
    private final DispatcherGateway dispatcherGateway;
    private final EmbeddedJobClientCreator jobClientCreator;
    private final List<JobStatusChangedListener> jobStatusChangedListeners;

    public EmbeddedExecutor(Collection<JobID> submittedJobIds, DispatcherGateway dispatcherGateway, Configuration configuration, EmbeddedJobClientCreator jobClientCreator) {
        this.submittedJobIds = (Collection)Preconditions.checkNotNull(submittedJobIds);
        this.dispatcherGateway = (DispatcherGateway)Preconditions.checkNotNull((Object)dispatcherGateway);
        this.jobClientCreator = (EmbeddedJobClientCreator)Preconditions.checkNotNull((Object)jobClientCreator);
        this.jobStatusChangedListeners = JobStatusChangedListenerUtils.createJobStatusChangedListeners((ClassLoader)Thread.currentThread().getContextClassLoader(), (Configuration)configuration, (Executor)this.executorService);
    }

    public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
        Preconditions.checkNotNull((Object)pipeline);
        Preconditions.checkNotNull((Object)configuration);
        Optional<JobID> optJobId = configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID).map(JobID::fromHexString);
        if (optJobId.isPresent() && this.submittedJobIds.contains(optJobId.get())) {
            return this.getJobClientFuture(optJobId.get(), userCodeClassloader);
        }
        return this.submitAndGetJobClientFuture(pipeline, configuration, userCodeClassloader);
    }

    private CompletableFuture<JobClient> getJobClientFuture(JobID jobId, ClassLoader userCodeClassloader) {
        LOG.info("Job {} was recovered successfully.", (Object)jobId);
        return CompletableFuture.completedFuture(this.jobClientCreator.getJobClient(jobId, userCodeClassloader));
    }

    private CompletableFuture<JobClient> submitAndGetJobClientFuture(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
        Duration timeout = (Duration)configuration.get(ClientOptions.CLIENT_TIMEOUT);
        StreamGraph streamGraph = PipelineExecutorUtils.getStreamGraph(pipeline, configuration);
        JobID actualJobId = streamGraph.getJobID();
        this.submittedJobIds.add(actualJobId);
        LOG.info("Job {} is submitted.", (Object)actualJobId);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Effective Configuration: {}", (Object)configuration);
        }
        CompletableFuture<JobID> jobSubmissionFuture = EmbeddedExecutor.submitJob(configuration, this.dispatcherGateway, streamGraph, timeout);
        return ((CompletableFuture)((CompletableFuture)jobSubmissionFuture.thenApplyAsync(FunctionUtils.uncheckedFunction(jobId -> {
            ClientUtils.waitUntilJobInitializationFinished((SupplierWithException<JobStatus, Exception>)((SupplierWithException)() -> (JobStatus)this.dispatcherGateway.requestJobStatus(jobId, timeout).get()), (SupplierWithException<JobResult, Exception>)((SupplierWithException)() -> (JobResult)this.dispatcherGateway.requestJobResult(jobId, timeout).get()), userCodeClassloader);
            return jobId;
        }))).thenApplyAsync(jobID -> this.jobClientCreator.getJobClient(actualJobId, userCodeClassloader))).whenCompleteAsync((jobClient, throwable) -> {
            if (throwable == null) {
                PipelineExecutorUtils.notifyJobStatusListeners(pipeline, (ExecutionPlan)streamGraph, this.jobStatusChangedListeners);
            } else {
                LOG.error("Failed to submit job graph to application cluster", throwable);
            }
        });
    }

    private static CompletableFuture<JobID> submitJob(Configuration configuration, DispatcherGateway dispatcherGateway, StreamGraph streamGraph, Duration rpcTimeout) {
        Preconditions.checkNotNull((Object)streamGraph);
        LOG.info("Submitting Job with JobId={}.", (Object)streamGraph.getJobID());
        return ((CompletableFuture)((CompletableFuture)dispatcherGateway.getBlobServerPort(rpcTimeout).thenCombine((CompletionStage)dispatcherGateway.getBlobServerAddress(rpcTimeout), (blobServerPort, blobServerAddress) -> new InetSocketAddress(blobServerAddress.getHostName(), (int)blobServerPort))).thenCompose(blobServerAddress -> {
            try {
                org.apache.flink.runtime.client.ClientUtils.extractAndUploadExecutionPlanFiles((ExecutionPlan)streamGraph, () -> new BlobClient(blobServerAddress, configuration));
                streamGraph.serializeUserDefinedInstances();
            }
            catch (Exception e) {
                throw new CompletionException(e);
            }
            return dispatcherGateway.submitJob((ExecutionPlan)streamGraph, rpcTimeout);
        })).thenApply(ack -> streamGraph.getJobID());
    }
}

