/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherBootstrapFactory;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherServices;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

public class MiniDispatcher
extends Dispatcher {
    private final ClusterEntrypoint.ExecutionMode executionMode;
    private boolean jobCancelled = false;

    public MiniDispatcher(RpcService rpcService, DispatcherId fencingToken, DispatcherServices dispatcherServices, JobGraph jobGraph, DispatcherBootstrapFactory dispatcherBootstrapFactory, ClusterEntrypoint.ExecutionMode executionMode) throws Exception {
        super(rpcService, fencingToken, Collections.singleton(jobGraph), dispatcherBootstrapFactory, dispatcherServices);
        this.executionMode = (ClusterEntrypoint.ExecutionMode)((Object)Preconditions.checkNotNull((Object)((Object)executionMode)));
    }

    @Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
        CompletableFuture<Acknowledge> acknowledgeCompletableFuture = super.submitJob(jobGraph, timeout);
        acknowledgeCompletableFuture.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                this.onFatalError(new FlinkException("Failed to submit job " + jobGraph.getJobID() + " in job mode.", throwable));
            }
        });
        return acknowledgeCompletableFuture;
    }

    @Override
    public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
        CompletableFuture<JobResult> jobResultFuture = super.requestJobResult(jobId, timeout);
        if (this.executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
            jobResultFuture.thenAccept(result -> {
                ApplicationStatus status = result.getSerializedThrowable().isPresent() ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
                this.log.info("Shutting down cluster because someone retrieved the job result.");
                this.shutDownFuture.complete(status);
            });
        } else {
            this.log.info("Not shutting down cluster after someone retrieved the job result.");
        }
        return jobResultFuture;
    }

    @Override
    public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
        this.jobCancelled = true;
        return super.cancelJob(jobId, timeout);
    }

    @Override
    protected Dispatcher.CleanupJobState jobReachedTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
        Dispatcher.CleanupJobState cleanupHAState = super.jobReachedTerminalState(archivedExecutionGraph);
        if (this.jobCancelled || this.executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
            this.log.info("Shutting down cluster with state {}, jobCancelled: {}, executionMode: {}", new Object[]{archivedExecutionGraph.getState(), this.jobCancelled, this.executionMode});
            this.shutDownFuture.complete(ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
        }
        return cleanupHAState;
    }
}

