package org.apache.flink.runtime.highavailability;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Executor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperClientHAServices;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperLeaderElectionHaServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.rpc.RpcSystemUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

/* loaded from: input_file:org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.class */
/**
 * Static helpers that create {@link HighAvailabilityServices} and
 * {@link ClientHighAvailabilityServices} instances according to the
 * {@link HighAvailabilityMode} derived from a {@link Configuration}, plus a few
 * address/path helpers used while doing so.
 */
public class HighAvailabilityServicesUtils {

    /** Fully qualified name of the factory that is loaded reflectively for the KUBERNETES mode. */
    private static final String KUBERNETES_HA_SERVICES_FACTORY =
            "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory";

    /**
     * Creates the HA services for the configured mode, using {@link EmbeddedHaServices} when the
     * mode is {@code NONE}.
     *
     * @param configuration configuration the HA mode is read from
     * @param executor executor handed to the created services
     * @param fatalErrorHandler handler passed on when starting the ZooKeeper client
     * @return the HA services for the configured mode
     * @throws Exception if the mode is not supported or the services cannot be created
     */
    public static HighAvailabilityServices createAvailableOrEmbeddedServices(
            Configuration configuration, Executor executor, FatalErrorHandler fatalErrorHandler)
            throws Exception {
        final HighAvailabilityMode haMode = HighAvailabilityMode.fromConfig(configuration);
        switch (haMode) {
            case NONE:
                return new EmbeddedHaServices(executor);
            case ZOOKEEPER:
                return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);
            case KUBERNETES:
                return createCustomHAServices(
                        KUBERNETES_HA_SERVICES_FACTORY, configuration, executor);
            case FACTORY_CLASS:
                return createCustomHAServices(configuration, executor);
            default:
                throw new Exception("High availability mode " + haMode + " is not supported.");
        }
    }

    /**
     * Starts a Curator framework from the configuration and wraps it, together with a blob store
     * created from the same configuration, in {@link ZooKeeperLeaderElectionHaServices}.
     *
     * <p>NOTE(review): the Curator framework is started before the blob store is created (Java
     * evaluates arguments left to right); if the blob store creation or the constructor fails,
     * nothing in this method closes the started framework - confirm whether that matters.
     */
    private static HighAvailabilityServices createZooKeeperHaServices(
            Configuration configuration, Executor executor, FatalErrorHandler fatalErrorHandler)
            throws Exception {
        return new ZooKeeperLeaderElectionHaServices(
                ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler),
                configuration,
                executor,
                BlobUtils.createBlobStoreFromConfig(configuration));
    }

    /**
     * Creates the HA services for the configured mode. For mode {@code NONE} this builds
     * {@link StandaloneHaServices} from the RPC URLs of the resource manager and dispatcher
     * (both derived from the configured JobManager address) and the web monitor address.
     *
     * @param configuration configuration the HA mode and addresses are read from
     * @param executor executor handed to the created services
     * @param addressResolution whether host names should be resolved while building addresses
     * @param rpcSystemUtils used to build the RPC URLs in the {@code NONE} mode
     * @param fatalErrorHandler handler passed on when starting the ZooKeeper client
     * @return the HA services for the configured mode
     * @throws Exception if the mode is not supported or the services cannot be created
     */
    public static HighAvailabilityServices createHighAvailabilityServices(
            Configuration configuration,
            Executor executor,
            AddressResolution addressResolution,
            RpcSystemUtils rpcSystemUtils,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {
        final HighAvailabilityMode haMode = HighAvailabilityMode.fromConfig(configuration);
        switch (haMode) {
            case NONE:
                final Tuple2<String, Integer> hostAndPort = getJobManagerAddress(configuration);
                final String resourceManagerRpcUrl =
                        rpcSystemUtils.getRpcUrl(
                                hostAndPort.f0,
                                hostAndPort.f1,
                                RpcServiceUtils.createWildcardName(
                                        ResourceManager.RESOURCE_MANAGER_NAME),
                                addressResolution,
                                configuration);
                final String dispatcherRpcUrl =
                        rpcSystemUtils.getRpcUrl(
                                hostAndPort.f0,
                                hostAndPort.f1,
                                RpcServiceUtils.createWildcardName(Dispatcher.DISPATCHER_NAME),
                                addressResolution,
                                configuration);
                final String webMonitorAddress =
                        getWebMonitorAddress(configuration, addressResolution);
                return new StandaloneHaServices(
                        resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
            case ZOOKEEPER:
                return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);
            case KUBERNETES:
                return createCustomHAServices(
                        KUBERNETES_HA_SERVICES_FACTORY, configuration, executor);
            case FACTORY_CLASS:
                return createCustomHAServices(configuration, executor);
            default:
                throw new Exception("Recovery mode " + haMode + " is not supported.");
        }
    }

    /**
     * Creates the client-side HA services for the configured mode.
     *
     * @param configuration configuration the HA mode is read from
     * @param fatalErrorHandler handler passed on when starting the ZooKeeper client
     * @return the client HA services for the configured mode
     * @throws Exception if the mode is not supported or the services cannot be created
     */
    public static ClientHighAvailabilityServices createClientHAService(
            Configuration configuration, FatalErrorHandler fatalErrorHandler) throws Exception {
        final HighAvailabilityMode haMode = HighAvailabilityMode.fromConfig(configuration);
        switch (haMode) {
            case NONE:
                return new StandaloneClientHAServices(
                        getWebMonitorAddress(
                                configuration, AddressResolution.TRY_ADDRESS_RESOLUTION));
            case ZOOKEEPER:
                return new ZooKeeperClientHAServices(
                        ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler),
                        configuration);
            case KUBERNETES:
                return createCustomClientHAServices(KUBERNETES_HA_SERVICES_FACTORY, configuration);
            case FACTORY_CLASS:
                return createCustomClientHAServices(configuration);
            default:
                throw new Exception("Recovery mode " + haMode + " is not supported.");
        }
    }

    /**
     * Reads the JobManager host name and port from the configuration and validates them.
     *
     * @param configuration configuration holding {@link JobManagerOptions#ADDRESS} and
     *     {@link JobManagerOptions#PORT}
     * @return tuple of (host name, port)
     * @throws ConfigurationException if the address is missing or the port is outside 1..65535
     */
    public static Tuple2<String, Integer> getJobManagerAddress(Configuration configuration)
            throws ConfigurationException {
        final String hostname = configuration.get(JobManagerOptions.ADDRESS);
        final int port = configuration.get(JobManagerOptions.PORT);

        if (hostname == null) {
            throw new ConfigurationException(
                    "Config parameter '"
                            + JobManagerOptions.ADDRESS
                            + "' is missing (hostname/address of JobManager to connect to).");
        }
        if (port <= 0 || port >= 65536) {
            throw new ConfigurationException(
                    "Invalid value for '"
                            + JobManagerOptions.PORT
                            + "' (port of the JobManager actor system) : "
                            + port
                            + ".  it must be greater than 0 and less than 65536.");
        }
        return Tuple2.of(hostname, port);
    }

    /**
     * Builds the web monitor (REST) address as {@code <protocol><address>:<port>}, where the
     * protocol is {@code https://} if REST SSL is enabled and {@code http://} otherwise.
     *
     * @param configuration configuration holding {@link RestOptions#ADDRESS} and
     *     {@link RestOptions#PORT}
     * @param addressResolution if {@link AddressResolution#TRY_ADDRESS_RESOLUTION}, the address
     *     is looked up first so that an unresolvable host fails early
     * @return the web monitor address
     * @throws UnknownHostException if resolution was requested and the host cannot be resolved
     */
    public static String getWebMonitorAddress(
            Configuration configuration, AddressResolution addressResolution)
            throws UnknownHostException {
        final String address =
                Preconditions.checkNotNull(
                        configuration.get(RestOptions.ADDRESS),
                        "%s must be set",
                        RestOptions.ADDRESS.key());

        if (addressResolution == AddressResolution.TRY_ADDRESS_RESOLUTION) {
            // The result is intentionally discarded: the lookup is done only for its
            // UnknownHostException side effect.
            InetAddress.getByName(address);
        }

        final String protocol =
                SecurityOptions.isRestSSLEnabled(configuration) ? "https://" : "http://";
        final int port = configuration.get(RestOptions.PORT);
        return String.format("%s%s:%s", protocol, address, port);
    }

    /**
     * Returns the cluster-specific HA storage directory, i.e. the configured HA storage path
     * with the configured cluster id appended as a child.
     *
     * <p>The base path and the cluster path are validated in two independent steps so that an
     * invalid cluster id is reported as such instead of being re-wrapped as an invalid storage
     * path.
     *
     * @param configuration configuration holding {@link HighAvailabilityOptions#HA_STORAGE_PATH}
     *     and {@link HighAvailabilityOptions#HA_CLUSTER_ID}
     * @return the cluster's HA storage path
     * @throws IllegalConfigurationException if the storage path is missing or invalid, or the
     *     cluster id does not yield a valid path
     */
    public static Path getClusterHighAvailableStoragePath(Configuration configuration) {
        final String storagePath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
        if (StringUtils.isNullOrWhitespaceOnly(storagePath)) {
            throw new IllegalConfigurationException(
                    "Configuration is missing the mandatory parameter: "
                            + HighAvailabilityOptions.HA_STORAGE_PATH);
        }

        final Path basePath;
        try {
            basePath = new Path(storagePath);
        } catch (Exception e) {
            throw new IllegalConfigurationException(
                    "Invalid path for highly available storage ("
                            + HighAvailabilityOptions.HA_STORAGE_PATH.key()
                            + ')',
                    e);
        }

        final String clusterId = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
        try {
            return new Path(basePath, clusterId);
        } catch (Exception e) {
            throw new IllegalConfigurationException(
                    String.format(
                            "Cannot create cluster high available storage path '%s/%s'. This indicates that an invalid cluster id (%s) has been specified.",
                            storagePath,
                            clusterId,
                            HighAvailabilityOptions.HA_CLUSTER_ID.key()),
                    e);
        }
    }

    /**
     * Creates HA services through the factory whose class name is the value of
     * {@link HighAvailabilityOptions#HA_MODE}.
     */
    private static HighAvailabilityServices createCustomHAServices(
            Configuration configuration, Executor executor) throws FlinkException {
        return createCustomHAServices(
                configuration.get(HighAvailabilityOptions.HA_MODE), configuration, executor);
    }

    /**
     * Instantiates the named {@link HighAvailabilityServicesFactory} and asks it for HA services.
     *
     * @param str fully qualified class name of the factory
     * @throws FlinkException if the factory cannot be loaded or fails to create the services
     */
    private static HighAvailabilityServices createCustomHAServices(
            String str, Configuration configuration, Executor executor) throws FlinkException {
        final HighAvailabilityServicesFactory factory =
                loadCustomHighAvailabilityServicesFactory(str);
        try {
            return factory.createHAServices(configuration, executor);
        } catch (Exception e) {
            throw new FlinkException(
                    String.format(
                            "Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.",
                            factory.getClass().getName()),
                    e);
        }
    }

    /**
     * Instantiates the factory class with the given name using the current thread's context
     * class loader.
     */
    private static HighAvailabilityServicesFactory loadCustomHighAvailabilityServicesFactory(
            String str) throws FlinkException {
        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        return InstantiationUtil.instantiate(
                str, HighAvailabilityServicesFactory.class, classLoader);
    }

    /**
     * Creates client HA services through the factory whose class name is the value of
     * {@link HighAvailabilityOptions#HA_MODE}.
     */
    private static ClientHighAvailabilityServices createCustomClientHAServices(
            Configuration configuration) throws FlinkException {
        return createCustomClientHAServices(
                configuration.get(HighAvailabilityOptions.HA_MODE), configuration);
    }

    /**
     * Instantiates the named {@link HighAvailabilityServicesFactory} and asks it for client HA
     * services.
     *
     * @param str fully qualified class name of the factory
     * @throws FlinkException if the factory cannot be loaded or fails to create the services
     */
    private static ClientHighAvailabilityServices createCustomClientHAServices(
            String str, Configuration configuration) throws FlinkException {
        final HighAvailabilityServicesFactory factory =
                loadCustomHighAvailabilityServicesFactory(str);
        try {
            return factory.createClientHAServices(configuration);
        } catch (Exception e) {
            throw new FlinkException(
                    String.format(
                            "Could not create the client ha services from the instantiated HighAvailabilityServicesFactory %s.",
                            factory.getClass().getName()),
                    e);
        }
    }
}
