/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn;

import java.io.IOException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.TaskManagerOptionsInternal;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.webmonitor.history.HistoryServerUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.RegisterApplicationMasterResponseReflector;
import org.apache.flink.yarn.TaskExecutorProcessSpecContainerResourcePriorityAdapter;
import org.apache.flink.yarn.Utils;
import org.apache.flink.yarn.YarnNodeManagerClientFactory;
import org.apache.flink.yarn.YarnResourceManagerClientFactory;
import org.apache.flink.yarn.YarnTaskExecutorRunner;
import org.apache.flink.yarn.YarnWorkerNode;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnResourceManagerDriverConfiguration;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;

/**
 * Resource manager driver that acquires and releases TaskExecutor containers through YARN.
 *
 * <p>The driver talks to the YARN ResourceManager through an {@link AMRMClientAsync} and to the
 * YARN NodeManagers through an {@link NMClientAsync}. Callbacks from both clients are received by
 * {@link YarnContainerEventHandler}; those that touch driver state are re-dispatched onto the
 * executor returned by {@code getMainThreadExecutor()}.
 */
public class YarnResourceManagerDriver
extends AbstractResourceManagerDriver<YarnWorkerNode> {
    /** Name of the environment variable that carries the container host to the TaskExecutor. */
    static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";

    /** Message of the error reported to the resource event handler on a YARN shutdown request. */
    static final String ERROR_MESSAGE_ON_SHUTDOWN_REQUEST = "Received shutdown request from YARN ResourceManager.";

    /** YARN/Hadoop configuration derived from the Flink configuration. */
    private final YarnConfiguration yarnConfig;

    /** Driver specific settings (current directory, web interface URL, RPC address, YARN files, ...). */
    private final YarnResourceManagerDriverConfiguration configuration;

    /** Heartbeat interval used when the RM client is created and whenever no request is pending. */
    private final int yarnHeartbeatIntervalMillis;

    /** Heartbeat interval that is set on the RM client each time a container is requested. */
    private final int containerRequestHeartbeatIntervalMillis;

    /** Futures of requested but not yet allocated workers, queued per requested process spec. */
    private final Map<TaskExecutorProcessSpec, Queue<CompletableFuture<YarnWorkerNode>>> requestResourceFutures;

    private final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector;
    private final YarnResourceManagerClientFactory yarnResourceManagerClientFactory;
    private final YarnNodeManagerClientFactory yarnNodeManagerClientFactory;

    /** Client for the YARN ResourceManager; created in {@link #initializeInternal()}. */
    private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;

    /** Client for the YARN NodeManagers; created in {@link #initializeInternal()}. */
    private NMClientAsync nodeManagerClient;

    /**
     * Translates between process specs and YARN priority/resource pairs. It is assigned at the end
     * of the RM client start-up, so a non-null value doubles as the "initialized" marker checked by
     * {@link #checkInitialized()}.
     */
    private TaskExecutorProcessSpecContainerResourcePriorityAdapter taskExecutorProcessSpecContainerResourcePriorityAdapter;

    /**
     * Creates the driver. No YARN client is created or started here; that happens in
     * {@link #initializeInternal()}.
     *
     * @param flinkConfig Flink configuration; also the source of the YARN/Hadoop configuration
     * @param configuration driver specific configuration
     * @param yarnResourceManagerClientFactory factory for the ResourceManager client
     * @param yarnNodeManagerClientFactory factory for the NodeManager client
     */
    public YarnResourceManagerDriver(Configuration flinkConfig, YarnResourceManagerDriverConfiguration configuration, YarnResourceManagerClientFactory yarnResourceManagerClientFactory, YarnNodeManagerClientFactory yarnNodeManagerClientFactory) {
        super(flinkConfig, GlobalConfiguration.loadConfiguration(configuration.getCurrentDir()));
        this.yarnConfig = Utils.getYarnAndHadoopConfiguration(flinkConfig);
        this.requestResourceFutures = new HashMap<>();
        this.configuration = configuration;

        final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
        final long yarnExpiryIntervalMS = this.yarnConfig.getLong("yarn.am.liveness-monitor.expiry-interval-ms", 600000L);
        // Only a warning: the configured values are used as they are.
        if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
            this.log.warn("The heartbeat interval of the Flink Application master ({}) is greater than YARN's expiry interval ({}). The application is likely to be killed by YARN.", yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
        }

        this.yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
        this.containerRequestHeartbeatIntervalMillis = flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
        this.registerApplicationMasterResponseReflector = new RegisterApplicationMasterResponseReflector(this.log);
        this.yarnResourceManagerClientFactory = yarnResourceManagerClientFactory;
        this.yarnNodeManagerClientFactory = yarnNodeManagerClientFactory;
    }

    /**
     * Starts the ResourceManager client, registers the application master, reports the containers
     * of previous attempts, and finally starts the NodeManager client.
     *
     * @throws ResourceManagerException if any step of the ResourceManager client start-up fails
     * @throws Exception if creating or starting the NodeManager client fails
     */
    @Override
    protected void initializeInternal() throws Exception {
        final YarnContainerEventHandler yarnContainerEventHandler = new YarnContainerEventHandler();
        try {
            this.resourceManagerClient = this.yarnResourceManagerClientFactory.createResourceManagerClient(this.yarnHeartbeatIntervalMillis, yarnContainerEventHandler);
            this.resourceManagerClient.init(this.yarnConfig);
            this.resourceManagerClient.start();

            final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
            getContainersFromPreviousAttempts(registerApplicationMasterResponse);
            this.taskExecutorProcessSpecContainerResourcePriorityAdapter = new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
                    registerApplicationMasterResponse.getMaximumResourceCapability(),
                    ExternalResourceUtils.getExternalResourceConfigurationKeys(this.flinkConfig, "yarn.config-key"));
        }
        catch (Exception e) {
            throw new ResourceManagerException("Could not start resource manager client.", e);
        }

        this.nodeManagerClient = this.yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
        this.nodeManagerClient.init(this.yarnConfig);
        this.nodeManagerClient.start();
    }

    /**
     * Stops both YARN clients, if they were created. Both stops are attempted even when the first
     * one fails; the failures are combined via {@code ExceptionUtils.firstOrSuppressed}.
     *
     * @return an already completed future, exceptionally completed if stopping a client failed
     */
    @Override
    public CompletableFuture<Void> terminate() {
        Exception exception = null;

        if (this.resourceManagerClient != null) {
            try {
                this.resourceManagerClient.stop();
            }
            catch (Exception e) {
                exception = e;
            }
        }

        if (this.nodeManagerClient != null) {
            try {
                this.nodeManagerClient.stop();
            }
            catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
        }

        if (exception == null) {
            return FutureUtils.completedVoidFuture();
        }
        return FutureUtils.completedExceptionally(exception);
    }

    /**
     * Unregisters the application master from YARN and deletes the application files.
     *
     * <p>A failure to unregister is logged, not propagated; the application files are deleted in
     * either case. The tracking URL handed to YARN is the history server URL, or the empty string
     * if none is available.
     *
     * @param finalStatus final Flink application status, mapped to a YARN status
     * @param optionalDiagnostics optional diagnostics message forwarded to YARN
     */
    @Override
    public void deregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) {
        final FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
        this.log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus);

        final Optional<URL> historyServerURL = HistoryServerUtils.getHistoryServerURL(this.flinkConfig);
        final String appTrackingUrl = historyServerURL.map(URL::toString).orElse("");

        try {
            this.resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, appTrackingUrl);
        }
        catch (IOException | YarnException e) {
            this.log.error("Could not unregister the application master.", e);
        }

        Utils.deleteApplicationFiles(this.configuration.getYarnFiles());
    }

    /**
     * Requests a new container for the given process spec from YARN.
     *
     * @param taskExecutorProcessSpec resources of the TaskExecutor to start
     * @return future of the worker; it is already completed exceptionally with a
     *     {@link ResourceManagerException} if the spec cannot be mapped to a YARN resource
     * @throws IllegalStateException if the driver has not been initialized
     */
    @Override
    public CompletableFuture<YarnWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
        checkInitialized();

        final CompletableFuture<YarnWorkerNode> requestResourceFuture = new CompletableFuture<>();
        final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.PriorityAndResource> priorityAndResourceOpt =
                this.taskExecutorProcessSpecContainerResourcePriorityAdapter.getPriorityAndResource(taskExecutorProcessSpec);

        if (!priorityAndResourceOpt.isPresent()) {
            requestResourceFuture.completeExceptionally(new ResourceManagerException(String.format("Could not compute the container Resource from the given TaskExecutorProcessSpec %s. This usually indicates the requested resource is larger than Yarn's max container resource limit.", taskExecutorProcessSpec)));
            return requestResourceFuture;
        }

        final Priority priority = priorityAndResourceOpt.get().getPriority();
        final Resource resource = priorityAndResourceOpt.get().getResource();
        this.resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority));
        // Switch to the container-request heartbeat interval while a request is outstanding.
        this.resourceManagerClient.setHeartbeatInterval(this.containerRequestHeartbeatIntervalMillis);
        this.requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec, ignore -> new LinkedList<>()).add(requestResourceFuture);
        this.log.info("Requesting new TaskExecutor container with resource {}, priority {}.", taskExecutorProcessSpec, priority);
        return requestResourceFuture;
    }

    /**
     * Asks the NodeManager to stop the worker's container and releases it at the ResourceManager.
     *
     * @param workerNode worker whose container is given back
     */
    @Override
    public void releaseResource(YarnWorkerNode workerNode) {
        final Container container = workerNode.getContainer();
        this.log.info("Stopping container {}.", workerNode.getResourceID().getStringWithMetadata());
        this.nodeManagerClient.stopContainerAsync(container.getId(), container.getNodeId());
        this.resourceManagerClient.releaseAssignedContainer(container.getId());
    }

    /**
     * Matches newly allocated containers of one priority against the pending requests of the
     * corresponding process spec. Each matched container gets a TaskExecutor started in it and its
     * container request removed from the RM client; containers beyond the number of pending
     * requests are returned to YARN.
     *
     * @param priority priority shared by all given containers
     * @param containers containers allocated with that priority
     * @throws IllegalStateException if the priority is unknown to the adapter, or if the number of
     *     pending futures differs from the number of matching requests held by the RM client
     */
    private void onContainersOfPriorityAllocated(Priority priority, List<Container> containers) {
        final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.TaskExecutorProcessSpecAndResource> taskExecutorProcessSpecAndResourceOpt =
                this.taskExecutorProcessSpecContainerResourcePriorityAdapter.getTaskExecutorProcessSpecAndResource(priority);
        Preconditions.checkState(taskExecutorProcessSpecAndResourceOpt.isPresent(), "Receive %s containers with unrecognized priority %s. This should not happen.", containers.size(), priority.getPriority());

        final TaskExecutorProcessSpec taskExecutorProcessSpec = taskExecutorProcessSpecAndResourceOpt.get().getTaskExecutorProcessSpec();
        final Resource resource = taskExecutorProcessSpecAndResourceOpt.get().getResource();
        // The fallback queue is a throw-away instance; it is never stored in the map.
        final Queue<CompletableFuture<YarnWorkerNode>> pendingRequestResourceFutures =
                this.requestResourceFutures.getOrDefault(taskExecutorProcessSpec, new LinkedList<>());

        this.log.info("Received {} containers with priority {}, {} pending container requests.", containers.size(), priority, pendingRequestResourceFutures.size());

        final Iterator<Container> containerIterator = containers.iterator();
        final Iterator<AMRMClient.ContainerRequest> pendingContainerRequestIterator =
                getPendingRequestsAndCheckConsistency(priority, resource, pendingRequestResourceFutures.size()).iterator();

        int numAccepted = 0;
        while (containerIterator.hasNext() && pendingContainerRequestIterator.hasNext()) {
            final Container container = containerIterator.next();
            final AMRMClient.ContainerRequest pendingRequest = pendingContainerRequestIterator.next();
            final ResourceID resourceId = getContainerResourceId(container);

            final CompletableFuture<YarnWorkerNode> requestResourceFuture = pendingRequestResourceFutures.poll();
            Preconditions.checkState(requestResourceFuture != null);

            // Drop the map entry once the last pending future of this spec has been taken.
            if (pendingRequestResourceFutures.isEmpty()) {
                this.requestResourceFutures.remove(taskExecutorProcessSpec);
            }

            startTaskExecutorInContainerAsync(container, taskExecutorProcessSpec, resourceId, requestResourceFuture);
            removeContainerRequest(pendingRequest);
            ++numAccepted;
        }

        int numExcess = 0;
        while (containerIterator.hasNext()) {
            returnExcessContainer(containerIterator.next());
            ++numExcess;
        }

        this.log.info("Accepted {} requested containers, returned {} excess containers, {} pending container requests of resource {}.", numAccepted, numExcess, pendingRequestResourceFutures.size(), resource);
    }

    /** Returns the total number of pending request futures over all process specs. */
    private int getNumRequestedNotAllocatedWorkers() {
        return this.requestResourceFutures.values().stream().mapToInt(Collection::size).sum();
    }

    /** Logs and removes the given container request from the ResourceManager client. */
    private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) {
        this.log.info("Removing container request {}.", pendingContainerRequest);
        this.resourceManagerClient.removeContainerRequest(pendingContainerRequest);
    }

    /** Logs and releases a container for which no pending request exists. */
    private void returnExcessContainer(Container excessContainer) {
        this.log.info("Returning excess container {}.", excessContainer.getId());
        this.resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
    }

    /**
     * Builds the container launch context on the IO executor, then, on the main thread executor,
     * starts the container and completes the request future.
     *
     * <p>If building the launch context fails, the request future is completed exceptionally with
     * that failure and the container is not started.
     *
     * @param container allocated container to start the TaskExecutor in
     * @param taskExecutorProcessSpec resources of the TaskExecutor
     * @param resourceId resource id of the new worker
     * @param requestResourceFuture future of the resource request served by this container
     */
    private void startTaskExecutorInContainerAsync(Container container, TaskExecutorProcessSpec taskExecutorProcessSpec, ResourceID resourceId, CompletableFuture<YarnWorkerNode> requestResourceFuture) {
        final CompletableFuture<ContainerLaunchContext> containerLaunchContextFuture = FutureUtils.supplyAsync(
                () -> createTaskExecutorLaunchContext(resourceId, container.getNodeId().getHost(), taskExecutorProcessSpec),
                getIoExecutor());

        FutureUtils.assertNoException(
                containerLaunchContextFuture.handleAsync((context, exception) -> {
                    if (exception == null) {
                        this.nodeManagerClient.startContainerAsync(container, context);
                        requestResourceFuture.complete(new YarnWorkerNode(container, resourceId));
                    } else {
                        requestResourceFuture.completeExceptionally(exception);
                    }
                    return null;
                }, getMainThreadExecutor()));
    }

    /**
     * Fetches the container requests that the RM client holds for the given priority and resource
     * (any location) and verifies that their number equals {@code expectedNum}.
     *
     * @param priority priority of the requests
     * @param resource resource of the requests
     * @param expectedNum number of pending request futures tracked by this driver
     * @return the matching requests, flattened into a single collection
     * @throws IllegalStateException if the two counts differ
     */
    private Collection<AMRMClient.ContainerRequest> getPendingRequestsAndCheckConsistency(Priority priority, Resource resource, int expectedNum) {
        final List<AMRMClient.ContainerRequest> matchingRequests = this.resourceManagerClient
                .getMatchingRequests(priority, "*", resource)
                .stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toList());

        Preconditions.checkState(matchingRequests.size() == expectedNum, "The RMClient's and YarnResourceManagers internal state about the number of pending container requests for priority %s has diverged. Number client's pending container requests %s != Number RM's pending container requests %s.", priority.getPriority(), matchingRequests.size(), expectedNum);
        return matchingRequests;
    }

    /**
     * Creates the launch context for a TaskExecutor container.
     *
     * <p>The TaskManager configuration is a clone of the Flink configuration with the resource id
     * and its metadata set. The dynamic properties string is produced by
     * {@code BootstrapTools.getDynamicPropertiesAsString} from the client configuration and that
     * clone. The host is exported through {@link #ENV_FLINK_NODE_ID}.
     *
     * @param containerId resource id of the worker to start
     * @param host host of the node the container was allocated on
     * @param taskExecutorProcessSpec resources of the TaskExecutor
     * @return the launch context to hand to the NodeManager
     * @throws Exception if the launch context cannot be created
     */
    private ContainerLaunchContext createTaskExecutorLaunchContext(ResourceID containerId, String host, TaskExecutorProcessSpec taskExecutorProcessSpec) throws Exception {
        final String currDir = this.configuration.getCurrentDir();
        final ContaineredTaskManagerParameters taskManagerParameters = ContaineredTaskManagerParameters.create(this.flinkConfig, taskExecutorProcessSpec);

        this.log.info("TaskExecutor {} will be started on {} with {}.", containerId.getStringWithMetadata(), host, taskExecutorProcessSpec);

        final Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(this.flinkConfig);
        taskManagerConfig.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, containerId.getResourceIdString());
        taskManagerConfig.set(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA, containerId.getMetadata());

        final String taskManagerDynamicProperties = BootstrapTools.getDynamicPropertiesAsString(this.flinkClientConfig, taskManagerConfig);
        this.log.debug("TaskManager configuration: {}", taskManagerConfig);

        final ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(this.flinkConfig, this.yarnConfig, this.configuration, taskManagerParameters, taskManagerDynamicProperties, currDir, YarnTaskExecutorRunner.class, this.log);
        taskExecutorLaunchContext.getEnvironment().put(ENV_FLINK_NODE_ID, host);
        return taskExecutorLaunchContext;
    }

    /**
     * Returns the YARN resource that would be requested for the given process spec.
     *
     * @param taskExecutorProcessSpec resources of the TaskExecutor
     * @return the container resource, or empty if the adapter cannot map the spec
     */
    @VisibleForTesting
    Optional<Resource> getContainerResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
        return this.taskExecutorProcessSpecContainerResourcePriorityAdapter
                .getPriorityAndResource(taskExecutorProcessSpec)
                .map(TaskExecutorProcessSpecContainerResourcePriorityAdapter.PriorityAndResource::getResource);
    }

    /**
     * Registers the application master with the configured RPC address and web interface URL.
     *
     * <p>The REST port is the text after the last {@code ':'} of the web interface URL; it is
     * {@code -1} if the URL is {@code null} or contains no colon.
     *
     * @return the registration response of the YARN ResourceManager
     * @throws NumberFormatException if the text after the last colon is not an integer
     * @throws Exception if the registration fails
     */
    private RegisterApplicationMasterResponse registerApplicationMaster() throws Exception {
        final String webInterfaceUrl = this.configuration.getWebInterfaceUrl();
        final String rpcAddress = this.configuration.getRpcAddress();

        int restPort = -1;
        if (webInterfaceUrl != null) {
            final int lastColon = webInterfaceUrl.lastIndexOf(':');
            if (lastColon != -1) {
                restPort = Integer.parseInt(webInterfaceUrl.substring(lastColon + 1));
            }
        }

        return this.resourceManagerClient.registerApplicationMaster(rpcAddress, restPort, webInterfaceUrl);
    }

    /**
     * Wraps the containers of previous application attempts into worker nodes and reports them to
     * the resource event handler.
     *
     * @param registerApplicationMasterResponse response received on application master registration
     */
    private void getContainersFromPreviousAttempts(RegisterApplicationMasterResponse registerApplicationMasterResponse) {
        final List<Container> containersFromPreviousAttempts =
                this.registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
        final List<YarnWorkerNode> recoveredWorkers = new ArrayList<>();

        this.log.info("Recovered {} containers from previous attempts ({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);

        for (Container container : containersFromPreviousAttempts) {
            recoveredWorkers.add(new YarnWorkerNode(container, getContainerResourceId(container)));
        }

        getResourceEventHandler().onPreviousAttemptWorkersRecovered(recoveredWorkers);
    }

    /**
     * Maps a Flink application status to the YARN final application status.
     *
     * @param status Flink status, may be {@code null}
     * @return the matching YARN status; {@code UNDEFINED} for {@code null} and for any status
     *     other than SUCCEEDED, FAILED and CANCELED
     */
    private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
        if (status == null) {
            return FinalApplicationStatus.UNDEFINED;
        }
        switch (status) {
            case SUCCEEDED:
                return FinalApplicationStatus.SUCCEEDED;
            case FAILED:
                return FinalApplicationStatus.FAILED;
            case CANCELED:
                return FinalApplicationStatus.KILLED;
            default:
                return FinalApplicationStatus.UNDEFINED;
        }
    }

    /**
     * Creates a container request without node or rack constraints.
     *
     * @param containerResource resource of the requested container
     * @param priority priority of the request
     * @return the container request
     */
    @Nonnull
    @VisibleForTesting
    static AMRMClient.ContainerRequest getContainerRequest(Resource containerResource, Priority priority) {
        return new AMRMClient.ContainerRequest(containerResource, null, null, priority);
    }

    /** Builds the resource id from the container id, with the node id string as its metadata. */
    @VisibleForTesting
    private static ResourceID getContainerResourceId(Container container) {
        return new ResourceID(container.getId().toString(), container.getNodeId().toString());
    }

    /** Groups the given containers by their priority. */
    private Map<Priority, List<Container>> groupContainerByPriority(List<Container> containers) {
        return containers.stream().collect(Collectors.groupingBy(Container::getPriority));
    }

    /**
     * Verifies that the priority adapter has been created by {@link #initializeInternal()}.
     *
     * @throws IllegalStateException if it has not
     */
    private void checkInitialized() {
        Preconditions.checkState(this.taskExecutorProcessSpecContainerResourcePriorityAdapter != null, "Driver not initialized.");
    }

    /**
     * Receives the callbacks of both the ResourceManager client and the NodeManager client.
     *
     * <p>Callbacks that read or modify driver state are executed through
     * {@link #runAsyncWithFatalHandler(Runnable)}; the remaining ones only log, do nothing, or
     * forward to the resource event handler.
     */
    class YarnContainerEventHandler
    implements AMRMClientAsync.CallbackHandler,
    NMClientAsync.CallbackHandler {

        /** Reports every completed container as a terminated worker, with the YARN diagnostics. */
        @Override
        public void onContainersCompleted(List<ContainerStatus> statuses) {
            runAsyncWithFatalHandler(() -> {
                checkInitialized();
                log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses);
                for (ContainerStatus containerStatus : statuses) {
                    final String containerId = containerStatus.getContainerId().toString();
                    getResourceEventHandler().onWorkerTerminated(new ResourceID(containerId), containerStatus.getDiagnostics());
                }
            });
        }

        /**
         * Processes newly allocated containers grouped by priority and restores the regular
         * heartbeat interval once no request is pending any more.
         */
        @Override
        public void onContainersAllocated(List<Container> containers) {
            runAsyncWithFatalHandler(() -> {
                checkInitialized();
                log.info("Received {} containers.", containers.size());

                for (Map.Entry<Priority, List<Container>> entry : groupContainerByPriority(containers).entrySet()) {
                    onContainersOfPriorityAllocated(entry.getKey(), entry.getValue());
                }

                if (getNumRequestedNotAllocatedWorkers() <= 0) {
                    resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
                }
            });
        }

        /**
         * Runs the given action on the main thread executor and forwards anything it throws to
         * {@link #onError(Throwable)}.
         */
        private void runAsyncWithFatalHandler(Runnable runnable) {
            getMainThreadExecutor().execute(() -> {
                try {
                    runnable.run();
                }
                catch (Throwable t) {
                    onError(t);
                }
            });
        }

        /** Reports the shutdown request as an error, directly from the calling thread. */
        @Override
        public void onShutdownRequest() {
            getResourceEventHandler().onError(new ResourceManagerException(ERROR_MESSAGE_ON_SHUTDOWN_REQUEST));
        }

        /** Node updates are ignored. */
        @Override
        public void onNodesUpdated(List<NodeReport> list) {
        }

        /** Always reports a progress of {@code 1.0}. */
        @Override
        public float getProgress() {
            return 1.0f;
        }

        /** Forwards the error to the resource event handler. */
        @Override
        public void onError(Throwable throwable) {
            getResourceEventHandler().onError(throwable);
        }

        /** Only logs the successful start request. */
        @Override
        public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> map) {
            log.debug("Succeeded to call YARN Node Manager to start container {}.", containerId);
        }

        /** Container status reports are ignored. */
        @Override
        public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
        }

        /** Only logs the successful stop request. */
        @Override
        public void onContainerStopped(ContainerId containerId) {
            log.debug("Succeeded to call YARN Node Manager to stop container {}.", containerId);
        }

        /**
         * Releases the container that failed to start and reports the worker as terminated, using
         * the message of the failure as diagnostics.
         */
        @Override
        public void onStartContainerError(ContainerId containerId, Throwable throwable) {
            runAsyncWithFatalHandler(() -> {
                resourceManagerClient.releaseAssignedContainer(containerId);
                getResourceEventHandler().onWorkerTerminated(new ResourceID(containerId.toString()), throwable.getMessage());
            });
        }

        /** Errors while querying the container status are ignored. */
        @Override
        public void onGetContainerStatusError(ContainerId containerId, Throwable throwable) {
        }

        /** Only logs the failed stop request. */
        @Override
        public void onStopContainerError(ContainerId containerId, Throwable throwable) {
            log.warn("Error while calling YARN Node Manager to stop container {}.", containerId, throwable);
        }
    }
}

