package org.apache.flink.client.deployment.application;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutorServiceLoader;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.DispatcherBootstrap;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.class */
public class ApplicationDispatcherBootstrap implements DispatcherBootstrap {
    private static final Logger LOG = LoggerFactory.getLogger(ApplicationDispatcherBootstrap.class);
    public static final JobID ZERO_JOB_ID = new JobID(0, 0);
    private final PackagedProgram application;
    private final Collection<JobID> recoveredJobIds;
    private final Configuration configuration;
    private final FatalErrorHandler errorHandler;
    private final CompletableFuture<Void> applicationCompletionFuture;
    private final CompletableFuture<Acknowledge> clusterShutdownFuture;
    private ScheduledFuture<?> applicationExecutionTask;

    public ApplicationDispatcherBootstrap(PackagedProgram packagedProgram, Collection<JobID> collection, Configuration configuration, DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor, FatalErrorHandler fatalErrorHandler) {
        this.configuration = (Configuration) Preconditions.checkNotNull(configuration);
        this.recoveredJobIds = (Collection) Preconditions.checkNotNull(collection);
        this.application = (PackagedProgram) Preconditions.checkNotNull(packagedProgram);
        this.errorHandler = (FatalErrorHandler) Preconditions.checkNotNull(fatalErrorHandler);
        this.applicationCompletionFuture = fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor);
        this.clusterShutdownFuture = runApplicationAndShutdownClusterAsync(dispatcherGateway);
    }

    public void stop() {
        if (this.applicationExecutionTask != null) {
            this.applicationExecutionTask.cancel(true);
        }
        if (this.applicationCompletionFuture != null) {
            this.applicationCompletionFuture.cancel(true);
        }
    }

    @VisibleForTesting
    ScheduledFuture<?> getApplicationExecutionFuture() {
        return this.applicationExecutionTask;
    }

    @VisibleForTesting
    CompletableFuture<Void> getApplicationCompletionFuture() {
        return this.applicationCompletionFuture;
    }

    @VisibleForTesting
    CompletableFuture<Acknowledge> getClusterShutdownFuture() {
        return this.clusterShutdownFuture;
    }

    private CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(DispatcherGateway dispatcherGateway) {
        return this.applicationCompletionFuture.handle((r8, th) -> {
            ApplicationStatus status;
            if (th == null) {
                LOG.info("Application completed SUCCESSFULLY");
                return dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
            }
            Optional findThrowable = ExceptionUtils.findThrowable(th, UnsuccessfulExecutionException.class);
            if (findThrowable.isPresent() && ((status = ((UnsuccessfulExecutionException) findThrowable.get()).getStatus()) == ApplicationStatus.CANCELED || status == ApplicationStatus.FAILED)) {
                LOG.info("Application {}: ", status, th);
                return dispatcherGateway.shutDownCluster(status);
            }
            LOG.warn("Application failed unexpectedly: ", th);
            this.errorHandler.onFatalError(new FlinkException("Application failed unexpectedly.", th));
            return FutureUtils.completedExceptionally(th);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) Function.identity());
    }

    private CompletableFuture<Void> fixJobIdAndRunApplicationAsync(DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor) {
        Optional optional = this.configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
        if (!HighAvailabilityMode.isHighAvailabilityModeActivated(this.configuration) && !optional.isPresent()) {
            return runApplicationAsync(dispatcherGateway, scheduledExecutor, false);
        }
        if (!optional.isPresent()) {
            this.configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, ZERO_JOB_ID.toHexString());
        }
        return runApplicationAsync(dispatcherGateway, scheduledExecutor, true);
    }

    private CompletableFuture<Void> runApplicationAsync(DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor, boolean z) {
        CompletableFuture completableFuture = new CompletableFuture();
        this.applicationExecutionTask = scheduledExecutor.schedule(() -> {
            runApplicationEntryPoint(completableFuture, dispatcherGateway, scheduledExecutor, z);
        }, 0L, TimeUnit.MILLISECONDS);
        return completableFuture.thenCompose(list -> {
            return getApplicationResult(dispatcherGateway, list, scheduledExecutor);
        });
    }

    private void runApplicationEntryPoint(CompletableFuture<List<JobID>> completableFuture, DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor, boolean z) {
        try {
            ArrayList arrayList = new ArrayList(this.recoveredJobIds);
            ClientUtils.executeProgram(new EmbeddedExecutorServiceLoader(arrayList, dispatcherGateway, scheduledExecutor), this.configuration, this.application, z, true);
            if (arrayList.isEmpty()) {
                completableFuture.completeExceptionally(new ApplicationExecutionException("The application contains no execute() calls."));
            } else {
                completableFuture.complete(arrayList);
            }
        } catch (Throwable th) {
            completableFuture.completeExceptionally(new ApplicationExecutionException("Could not execute application.", th));
        }
    }

    private CompletableFuture<Void> getApplicationResult(DispatcherGateway dispatcherGateway, Collection<JobID> collection, ScheduledExecutor scheduledExecutor) {
        return FutureUtils.waitForAll((List) collection.stream().map(jobID -> {
            return unwrapJobResultException(getJobResult(dispatcherGateway, jobID, scheduledExecutor));
        }).collect(Collectors.toList()));
    }

    private CompletableFuture<JobResult> getJobResult(DispatcherGateway dispatcherGateway, JobID jobID, ScheduledExecutor scheduledExecutor) {
        return JobStatusPollingUtils.getJobResult(dispatcherGateway, jobID, scheduledExecutor, Time.milliseconds(((Duration) this.configuration.get(ClientOptions.CLIENT_TIMEOUT)).toMillis()), Time.milliseconds(((Duration) this.configuration.get(ClientOptions.CLIENT_RETRY_PERIOD)).toMillis()));
    }

    private CompletableFuture<JobResult> unwrapJobResultException(CompletableFuture<JobResult> completableFuture) {
        return completableFuture.thenApply(jobResult -> {
            if (jobResult.isSuccess()) {
                return jobResult;
            }
            throw new CompletionException((Throwable) UnsuccessfulExecutionException.fromJobResult(jobResult, this.application.getUserCodeClassLoader()));
        });
    }
}
