package org.apache.flink.runtime.akka;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import scala.Option;
import scala.reflect.ClassTag$;

/* loaded from: input_file:org/apache/flink/runtime/akka/AkkaJobManagerGateway.class */
/**
 * {@link JobManagerGateway} implementation that talks to the JobManager through an
 * {@link ActorGateway}.
 *
 * <p>Most requests are turned into a message, sent with {@code ActorGateway.ask} using the
 * caller supplied timeout, and the reply is exposed as a {@link CompletableFuture}. Failure
 * replies are surfaced by completing the returned future exceptionally with a
 * {@link CompletionException} wrapping a {@link FlinkException}
 * (or a {@link FlinkJobNotFoundException} for {@link #requestJob}).
 */
public class AkkaJobManagerGateway implements JobManagerGateway {

    /** Host name reported when the actor path of the gateway carries no host. */
    private static final String DEFAULT_HOSTNAME = "localhost";

    private final ActorGateway jobManagerGateway;
    private final String hostname;

    /**
     * Creates a gateway on top of the given actor gateway.
     *
     * @param actorGateway gateway used for all requests; must not be {@code null}
     */
    public AkkaJobManagerGateway(ActorGateway actorGateway) {
        this.jobManagerGateway = Preconditions.checkNotNull(actorGateway);
        final Option<String> host = actorGateway.actor().path().address().host();
        this.hostname = host.isDefined() ? host.get() : DEFAULT_HOSTNAME;
    }

    /** Returns the path of the underlying actor gateway. */
    @Override // org.apache.flink.runtime.rpc.RpcGateway
    public String getAddress() {
        return this.jobManagerGateway.path();
    }

    /** Returns the host taken from the actor address, or {@code "localhost"} if it had none. */
    @Override // org.apache.flink.runtime.rpc.RpcGateway
    public String getHostname() {
        return this.hostname;
    }

    /**
     * Asks for the blob manager port.
     *
     * @param time timeout of the ask operation
     * @return future holding the port
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerGateway
    public CompletableFuture<Integer> requestBlobServerPort(Time time) {
        return ask(JobManagerMessages.getRequestBlobManagerPort(), time, Integer.class);
    }

    /**
     * Submits the given job graph.
     *
     * @param jobGraph job to submit
     * @param listeningBehaviour listening behaviour forwarded with the submission
     * @param time timeout of the ask operation
     * @return future completed with an {@link Acknowledge} when the reply is a
     *     {@code JobSubmitSuccess} for the id of {@code jobGraph}; completed exceptionally
     *     for a {@code JobResultFailure}, a success for a different job id, or any other reply
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerGateway
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time time) {
        final CompletableFuture<Object> responseFuture =
                ask(new JobManagerMessages.SubmitJob(jobGraph, listeningBehaviour), time);

        return responseFuture.thenApply(response -> {
            if (response instanceof JobManagerMessages.JobSubmitSuccess) {
                final JobManagerMessages.JobSubmitSuccess success = (JobManagerMessages.JobSubmitSuccess) response;
                if (Objects.equals(success.jobId(), jobGraph.getJobID())) {
                    return Acknowledge.get();
                }
                throw new CompletionException(new FlinkException("JobManager responded for wrong Job. This Job: " + jobGraph.getJobID() + ", response: " + success.jobId()));
            }
            if (response instanceof JobManagerMessages.JobResultFailure) {
                final JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) response;
                throw new CompletionException(new FlinkException("Job submission failed.", failure.cause()));
            }
            throw new CompletionException(new FlinkException("Unknown response to SubmitJob message: " + response + '.'));
        });
    }

    /**
     * Cancels a job and triggers a savepoint.
     *
     * @param jobID id of the job to cancel
     * @param savepointPath value forwarded unchanged to the {@code CancelJobWithSavepoint} message
     * @param time timeout of the ask operation
     * @return future holding the savepoint path reported by a {@code CancellationSuccess};
     *     otherwise completed exceptionally with the cause of the {@code CancellationFailure}
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerGateway
    public CompletableFuture<String> cancelJobWithSavepoint(JobID jobID, String savepointPath, Time time) {
        final CompletableFuture<JobManagerMessages.CancellationResponse> responseFuture =
                ask(new JobManagerMessages.CancelJobWithSavepoint(jobID, savepointPath), time, JobManagerMessages.CancellationResponse.class);

        return responseFuture.thenApply(response -> {
            if (response instanceof JobManagerMessages.CancellationSuccess) {
                return ((JobManagerMessages.CancellationSuccess) response).savepointPath();
            }
            throw new CompletionException(new FlinkException("Cancel with savepoint failed.", ((JobManagerMessages.CancellationFailure) response).cause()));
        });
    }

    /**
     * Cancels a job.
     *
     * @param jobID id of the job to cancel
     * @param time timeout of the ask operation
     * @return future completed with an {@link Acknowledge} on {@code CancellationSuccess};
     *     otherwise completed exceptionally with the cause of the {@code CancellationFailure}
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerGateway
    public CompletableFuture<Acknowledge> cancelJob(JobID jobID, Time time) {
        final CompletableFuture<JobManagerMessages.CancellationResponse> responseFuture =
                ask(new JobManagerMessages.CancelJob(jobID), time, JobManagerMessages.CancellationResponse.class);

        return responseFuture.thenApply(response -> {
            if (response instanceof JobManagerMessages.CancellationSuccess) {
                return Acknowledge.get();
            }
            throw new CompletionException(new FlinkException("Cancel job failed " + jobID + '.', ((JobManagerMessages.CancellationFailure) response).cause()));
        });
    }

    /**
     * Stops a job.
     *
     * @param jobID id of the job to stop
     * @param time timeout of the ask operation
     * @return future completed with an {@link Acknowledge} on {@code StoppingSuccess};
     *     otherwise completed exceptionally with the cause of the {@code StoppingFailure}
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerGateway
    public CompletableFuture<Acknowledge> stopJob(JobID jobID, Time time) {
        final CompletableFuture<JobManagerMessages.StoppingResponse> responseFuture =
                ask(new JobManagerMessages.StopJob(jobID), time, JobManagerMessages.StoppingResponse.class);

        return responseFuture.thenApply(response -> {
            if (response instanceof JobManagerMessages.StoppingSuccess) {
                return Acknowledge.get();
            }
            throw new CompletionException(new FlinkException("Stop job failed " + jobID + '.', ((JobManagerMessages.StoppingFailure) response).cause()));
        });
    }

    /**
     * Looks up a single task manager instance.
     *
     * @param resourceID id sent with the {@code RequestTaskManagerInstance} message
     * @param time timeout of the ask operation
     * @return future holding the instance from the reply, or an empty {@link Optional} when the
     *     reply carries none
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerGateway
    public CompletableFuture<Optional<Instance>> requestTaskManagerInstance(ResourceID resourceID, Time time) {
        final CompletableFuture<JobManagerMessages.TaskManagerInstance> responseFuture =
                ask(new JobManagerMessages.RequestTaskManagerInstance(resourceID), time, JobManagerMessages.TaskManagerInstance.class);

        return responseFuture.thenApply(response -> {
            final Option<Instance> instance = response.instance();
            if (instance.isDefined()) {
                return Optional.of(instance.get());
            }
            return Optional.<Instance>empty();
        });
    }

    /**
     * Requests the registered task managers.
     *
     * @param time timeout of the ask operation
     * @return future holding the instances contained in the {@code RegisteredTaskManagers} reply
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerGateway
    public CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time time) {
        final CompletableFuture<JobManagerMessages.RegisteredTaskManagers> responseFuture =
                ask(JobManagerMessages.getRequestRegisteredTaskManagers(), time, JobManagerMessages.RegisteredTaskManagers.class);

        return responseFuture.thenApply(JobManagerMessages.RegisteredTaskManagers::asJavaCollection);
    }

    /**
     * Requests the class loading properties of a job.
     *
     * @param jobID id of the job
     * @param time timeout of the ask operation
     * @return future holding the properties, or an empty {@link Optional} when the reply is
     *     {@code JobNotFound}; completed exceptionally for any other reply
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerGateway
    public CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobID, Time time) {
        final CompletableFuture<Object> responseFuture =
                ask(new JobManagerMessages.RequestClassloadingProps(jobID), time);

        return responseFuture.thenApply(response -> {
            if (response instanceof JobManagerMessages.ClassloadingProps) {
                return Optional.of((JobManagerMessages.ClassloadingProps) response);
            }
            if (response instanceof JobManagerMessages.JobNotFound) {
                return Optional.<JobManagerMessages.ClassloadingProps>empty();
            }
            throw new CompletionException(new FlinkException("Unknown response: " + response + '.'));
        });
    }

    /**
     * Requests the job details overview.
     *
     * <p>Both flags are forwarded, in order, to the {@link RequestJobDetails} message. They were
     * previously ignored and the request was always sent with {@code (true, true)}.
     *
     * @param includeRunning first flag of {@link RequestJobDetails}
     * @param includeFinished second flag of {@link RequestJobDetails}
     * @param time timeout of the ask operation
     * @return future holding the job details
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean includeRunning, boolean includeFinished, Time time) {
        return ask(new RequestJobDetails(includeRunning, includeFinished), time, MultipleJobsDetails.class);
    }

    /**
     * Requests the execution graph of a job.
     *
     * @param jobID id of the job
     * @param time timeout of the ask operation
     * @return future holding the execution graph of a {@code JobFound} reply; completed
     *     exceptionally with a {@link FlinkJobNotFoundException} for any other reply
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobID, Time time) {
        final CompletableFuture<JobManagerMessages.JobResponse> responseFuture =
                ask(new JobManagerMessages.RequestJob(jobID), time, JobManagerMessages.JobResponse.class);

        return responseFuture.thenApply(response -> {
            if (response instanceof JobManagerMessages.JobFound) {
                return ((JobManagerMessages.JobFound) response).executionGraph();
            }
            throw new CompletionException(new FlinkJobNotFoundException(jobID));
        });
    }

    /**
     * Requests the cluster status overview.
     *
     * @param time timeout of the ask operation
     * @return future holding the overview
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<ClusterOverview> requestClusterOverview(Time time) {
        return ask(RequestStatusOverview.getInstance(), time, ClusterOverview.class);
    }

    /**
     * Builds the metric query service path belonging to this gateway.
     *
     * <p>No message is sent: the path is derived locally from {@link #getAddress()} by replacing
     * everything after the last {@code '/'} with the metric query service name.
     *
     * @param time unused
     * @return already completed future holding a single path
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time time) {
        final String path = parentPath(getAddress()) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
        return CompletableFuture.completedFuture(Collections.singleton(path));
    }

    /**
     * Builds the metric query service path of every registered task manager.
     *
     * <p>Each path is derived from the address of the task manager gateway by replacing
     * everything after the last {@code '/'} with the metric query service name, followed by
     * {@code '_'} and the resource id string of the task manager.
     *
     * @param time timeout used for requesting the task manager instances
     * @return future holding one (resource id, path) pair per task manager instance
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time time) {
        return requestTaskManagerInstances(time).thenApply(instances ->
                instances.stream()
                        .map(instance -> {
                            final ResourceID resourceId = instance.getTaskManagerID();
                            final String address = instance.getTaskManagerGateway().getAddress();
                            final String path = parentPath(address) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + resourceId.getResourceIdString();
                            return Tuple2.of(resourceId, path);
                        })
                        .collect(Collectors.toList()));
    }

    /**
     * Requests the overview of jobs with their ids.
     *
     * @param time timeout of the ask operation
     * @return future holding the overview
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerGateway
    public CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time time) {
        return ask(RequestJobsWithIDsOverview.getInstance(), time, JobsWithIDsOverview.class);
    }

    /**
     * Requests the REST address.
     *
     * @param time timeout of the ask operation
     * @return future holding the address
     */
    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<String> requestRestAddress(Time time) {
        return ask(JobManagerMessages.getRequestRestAddress(), time, String.class);
    }

    /** Sends {@code message} to the JobManager and returns the untyped reply as a Java future. */
    private CompletableFuture<Object> ask(Object message, Time time) {
        return FutureUtils.toJava(this.jobManagerGateway.ask(message, FutureUtils.toFiniteDuration(time)));
    }

    /** Sends {@code message} to the JobManager and maps the reply to {@code responseType}. */
    private <T> CompletableFuture<T> ask(Object message, Time time, Class<T> responseType) {
        return FutureUtils.toJava(
                this.jobManagerGateway
                        .ask(message, FutureUtils.toFiniteDuration(time))
                        .mapTo(ClassTag$.MODULE$.<T>apply(responseType)));
    }

    /** Returns {@code address} up to and including its last {@code '/'} (empty if there is none). */
    private static String parentPath(String address) {
        return address.substring(0, address.lastIndexOf('/') + 1);
    }
}
