package org.apache.flink.kubernetes;

import java.util.Optional;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.artifact.KubernetesArtifactUploader;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint;
import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint;
import org.apache.flink.kubernetes.kubeclient.Endpoint;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/KubernetesClusterDescriptor.class */
public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesClusterDescriptor.class);
    private static final String CLUSTER_DESCRIPTION = "Kubernetes cluster";
    private final Configuration flinkConfig;
    private final FlinkKubeClientFactory clientFactory;
    private final FlinkKubeClient client;
    private final KubernetesArtifactUploader artifactUploader;
    private final String clusterId;

    public KubernetesClusterDescriptor(Configuration configuration, FlinkKubeClientFactory flinkKubeClientFactory, KubernetesArtifactUploader kubernetesArtifactUploader) {
        this.flinkConfig = configuration;
        this.clientFactory = flinkKubeClientFactory;
        this.artifactUploader = kubernetesArtifactUploader;
        this.client = flinkKubeClientFactory.fromConfiguration(configuration, "client");
        this.clusterId = (String) Preconditions.checkNotNull((String) configuration.get(KubernetesConfigOptions.CLUSTER_ID), "ClusterId must be specified!");
    }

    public String getClusterDescription() {
        return CLUSTER_DESCRIPTION;
    }

    private ClusterClientProvider<String> createClusterClientProvider(String str) {
        return () -> {
            Configuration configuration = new Configuration(this.flinkConfig);
            FlinkKubeClient fromConfiguration = this.clientFactory.fromConfiguration(configuration, "client");
            try {
                Optional<Endpoint> restEndpoint = fromConfiguration.getRestEndpoint(str);
                if (fromConfiguration != null) {
                    fromConfiguration.close();
                }
                if (!restEndpoint.isPresent()) {
                    throw new RuntimeException((Throwable) new ClusterRetrieveException("Could not get the rest endpoint of " + str));
                }
                configuration.set(RestOptions.ADDRESS, restEndpoint.get().getAddress());
                configuration.set(RestOptions.PORT, Integer.valueOf(restEndpoint.get().getPort()));
                try {
                    return new RestClusterClient(configuration, str, (configuration2, fatalErrorHandler) -> {
                        return new StandaloneClientHAServices(getWebMonitorAddress(configuration2));
                    });
                } catch (Exception e) {
                    throw new RuntimeException((Throwable) new ClusterRetrieveException("Could not create the RestClusterClient.", e));
                }
            } catch (Throwable th) {
                if (fromConfiguration != null) {
                    try {
                        fromConfiguration.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        };
    }

    private String getWebMonitorAddress(Configuration configuration) throws Exception {
        AddressResolution addressResolution = AddressResolution.TRY_ADDRESS_RESOLUTION;
        KubernetesConfigOptions.ServiceExposedType serviceExposedType = (KubernetesConfigOptions.ServiceExposedType) configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
        if (serviceExposedType.isClusterIP()) {
            addressResolution = AddressResolution.NO_ADDRESS_RESOLUTION;
            LOG.warn("Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since '{}' has been set to {}.", KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(), serviceExposedType);
        }
        return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, addressResolution);
    }

    public ClusterClientProvider<String> retrieve(String str) {
        ClusterClientProvider<String> createClusterClientProvider = createClusterClientProvider(str);
        ClusterClient clusterClient = createClusterClientProvider.getClusterClient();
        try {
            LOG.info("Retrieve flink cluster {} successfully, JobManager Web Interface: {}", str, clusterClient.getWebInterfaceURL());
            if (clusterClient != null) {
                clusterClient.close();
            }
            return createClusterClientProvider;
        } catch (Throwable th) {
            if (clusterClient != null) {
                try {
                    clusterClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public ClusterClientProvider<String> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
        ClusterClientProvider<String> deployClusterInternal = deployClusterInternal(KubernetesSessionClusterEntrypoint.class.getName(), clusterSpecification, false);
        ClusterClient clusterClient = deployClusterInternal.getClusterClient();
        try {
            LOG.info("Create flink session cluster {} successfully, JobManager Web Interface: {}", this.clusterId, clusterClient.getWebInterfaceURL());
            if (clusterClient != null) {
                clusterClient.close();
            }
            return deployClusterInternal;
        } catch (Throwable th) {
            if (clusterClient != null) {
                try {
                    clusterClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public ClusterClientProvider<String> deployApplicationCluster(ClusterSpecification clusterSpecification, ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException {
        if (this.client.getService(ExternalServiceDecorator.getExternalServiceName(this.clusterId)).isPresent()) {
            throw new ClusterDeploymentException("The Flink cluster " + this.clusterId + " already exists.");
        }
        Preconditions.checkNotNull(clusterSpecification);
        Preconditions.checkNotNull(applicationConfiguration);
        KubernetesDeploymentTarget fromConfig = KubernetesDeploymentTarget.fromConfig(this.flinkConfig);
        if (KubernetesDeploymentTarget.APPLICATION != fromConfig) {
            throw new ClusterDeploymentException("Couldn't deploy Kubernetes Application Cluster. Expected deployment.target=" + KubernetesDeploymentTarget.APPLICATION.getName() + " but actual one was \"" + fromConfig + "\"");
        }
        applicationConfiguration.applyToConfiguration(this.flinkConfig);
        if (!PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName()).booleanValue() && !PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments())) {
            Preconditions.checkArgument(KubernetesUtils.checkJarFileForApplicationMode(this.flinkConfig).size() <= 1, "Should only have at most one jar.");
        }
        try {
            this.artifactUploader.uploadAll(this.flinkConfig);
            ClusterClientProvider<String> deployClusterInternal = deployClusterInternal(KubernetesApplicationClusterEntrypoint.class.getName(), clusterSpecification, false);
            ClusterClient clusterClient = deployClusterInternal.getClusterClient();
            try {
                LOG.info("Create flink application cluster {} successfully, JobManager Web Interface: {}", this.clusterId, clusterClient.getWebInterfaceURL());
                if (clusterClient != null) {
                    clusterClient.close();
                }
                return deployClusterInternal;
            } catch (Throwable th) {
                if (clusterClient != null) {
                    try {
                        clusterClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Exception e) {
            throw new ClusterDeploymentException(e);
        }
    }

    private ClusterClientProvider<String> deployClusterInternal(String str, ClusterSpecification clusterSpecification, boolean z) throws ClusterDeploymentException {
        this.flinkConfig.set(ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, (z ? ClusterEntrypoint.ExecutionMode.DETACHED : ClusterEntrypoint.ExecutionMode.NORMAL).toString());
        this.flinkConfig.set(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, str);
        KubernetesUtils.checkAndUpdatePortConfigOption(this.flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
        KubernetesUtils.checkAndUpdatePortConfigOption(this.flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
        KubernetesUtils.checkAndUpdatePortConfigOption(this.flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);
        if (HighAvailabilityMode.isHighAvailabilityModeActivated(this.flinkConfig)) {
            this.flinkConfig.set(HighAvailabilityOptions.HA_CLUSTER_ID, this.clusterId);
            KubernetesUtils.checkAndUpdatePortConfigOption(this.flinkConfig, HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE, ((Integer) this.flinkConfig.get(JobManagerOptions.PORT)).intValue());
        }
        try {
            KubernetesJobManagerParameters kubernetesJobManagerParameters = new KubernetesJobManagerParameters(this.flinkConfig, clusterSpecification);
            this.client.createJobManagerComponent(KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification((FlinkPod) kubernetesJobManagerParameters.getPodTemplateFilePath().map(file -> {
                return KubernetesUtils.loadPodFromTemplateFile(this.client, file, Constants.MAIN_CONTAINER_NAME);
            }).orElse(new FlinkPod.Builder().build()), kubernetesJobManagerParameters));
            return createClusterClientProvider(this.clusterId);
        } catch (Exception e) {
            try {
                LOG.warn("Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.", this.clusterId);
                this.client.stopAndCleanupCluster(this.clusterId);
            } catch (Exception e2) {
                LOG.info("Failed to stop and clean up the Kubernetes cluster \"{}\".", this.clusterId, e2);
            }
            throw new ClusterDeploymentException("Could not create Kubernetes cluster \"" + this.clusterId + "\".", e);
        }
    }

    public void killCluster(String str) throws FlinkException {
        try {
            this.client.stopAndCleanupCluster(str);
        } catch (Exception e) {
            throw new FlinkException("Could not kill Kubernetes cluster " + str);
        }
    }

    public void close() {
        try {
            this.client.close();
        } catch (Exception e) {
            LOG.error("failed to close client, exception {}", e.toString());
        }
    }
}
