/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.highavailability;

import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.highavailability.KubernetesCheckpointRecoveryFactory;
import org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriverFactory;
import org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriverFactory;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.highavailability.AbstractHaServices;
import org.apache.flink.runtime.highavailability.FileSystemJobResultStore;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriverFactory;
import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriverFactory;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KubernetesLeaderElectionHaServices
extends AbstractHaServices {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderElectionHaServices.class);
    private final String clusterId;
    private final FlinkKubeClient kubeClient;
    private final KubernetesConfigMapSharedWatcher configMapSharedWatcher;
    private final ExecutorService watchExecutorService;
    private final String lockIdentity;

    KubernetesLeaderElectionHaServices(FlinkKubeClient kubeClient, Executor ioExecutor, Configuration configuration, BlobStoreService blobStoreService) throws Exception {
        this(kubeClient, kubeClient.createConfigMapSharedWatcher(KubernetesLeaderElectionHaServices.getClusterConfigMap((String)configuration.get(KubernetesConfigOptions.CLUSTER_ID))), Executors.newCachedThreadPool((ThreadFactory)new ExecutorThreadFactory("config-map-watch-handler")), ioExecutor, (String)configuration.get(KubernetesConfigOptions.CLUSTER_ID), UUID.randomUUID().toString(), configuration, blobStoreService);
    }

    private KubernetesLeaderElectionHaServices(FlinkKubeClient kubeClient, KubernetesConfigMapSharedWatcher configMapSharedWatcher, ExecutorService watchExecutorService, Executor ioExecutor, String clusterId, String lockIdentity, Configuration configuration, BlobStoreService blobStoreService) throws Exception {
        super(configuration, KubernetesLeaderElectionHaServices.createDriverFactory(kubeClient, configMapSharedWatcher, watchExecutorService, clusterId, lockIdentity, configuration), ioExecutor, blobStoreService, (JobResultStore)FileSystemJobResultStore.fromConfiguration((Configuration)configuration, (Executor)ioExecutor));
        this.kubeClient = (FlinkKubeClient)Preconditions.checkNotNull((Object)kubeClient);
        this.clusterId = (String)Preconditions.checkNotNull((Object)clusterId);
        this.configMapSharedWatcher = (KubernetesConfigMapSharedWatcher)Preconditions.checkNotNull((Object)configMapSharedWatcher);
        this.watchExecutorService = (ExecutorService)Preconditions.checkNotNull((Object)watchExecutorService);
        this.lockIdentity = (String)Preconditions.checkNotNull((Object)lockIdentity);
    }

    private static LeaderElectionDriverFactory createDriverFactory(FlinkKubeClient kubeClient, KubernetesConfigMapSharedWatcher configMapSharedWatcher, Executor watchExecutorService, String clusterId, String lockIdentity, Configuration configuration) {
        KubernetesLeaderElectionConfiguration leaderElectionConfiguration = new KubernetesLeaderElectionConfiguration(KubernetesLeaderElectionHaServices.getClusterConfigMap(clusterId), lockIdentity, configuration);
        return new KubernetesLeaderElectionDriverFactory(kubeClient, leaderElectionConfiguration, configMapSharedWatcher, watchExecutorService);
    }

    protected LeaderRetrievalService createLeaderRetrievalService(String componentId) {
        return new DefaultLeaderRetrievalService((LeaderRetrievalDriverFactory)new KubernetesLeaderRetrievalDriverFactory(this.configMapSharedWatcher, this.watchExecutorService, this.getClusterConfigMap(), componentId));
    }

    protected CheckpointRecoveryFactory createCheckpointRecoveryFactory() {
        return KubernetesCheckpointRecoveryFactory.withoutLeadershipValidation(this.kubeClient, this.configuration, this.ioExecutor, this.clusterId, this::getJobSpecificConfigMap);
    }

    private String getJobSpecificConfigMap(JobID jobID) {
        return this.clusterId + "-" + jobID.toString() + "-config-map";
    }

    protected ExecutionPlanStore createExecutionPlanStore() throws Exception {
        return KubernetesUtils.createExecutionPlanStore(this.configuration, this.kubeClient, this.getClusterConfigMap(), this.lockIdentity);
    }

    private String getClusterConfigMap() {
        return KubernetesLeaderElectionHaServices.getClusterConfigMap(this.clusterId);
    }

    private static String getClusterConfigMap(String clusterId) {
        return clusterId + "-cluster-config-map";
    }

    public void internalClose() throws Exception {
        Exception exception = null;
        try {
            this.closeK8sServices();
        }
        catch (Exception e) {
            exception = e;
        }
        this.kubeClient.close();
        ExecutorUtils.gracefulShutdown((long)5L, (TimeUnit)TimeUnit.SECONDS, (ExecutorService[])new ExecutorService[]{this.watchExecutorService});
        ExceptionUtils.tryRethrowException((Exception)exception);
    }

    private void closeK8sServices() {
        this.configMapSharedWatcher.close();
        int outstandingTaskCount = this.watchExecutorService.shutdownNow().size();
        if (outstandingTaskCount != 0) {
            LOG.debug("The k8s HA services were closed with {} event(s) still not being processed. No further action necessary.", (Object)outstandingTaskCount);
        }
    }

    public void internalCleanup() throws Exception {
        Exception exception = null;
        try {
            this.closeK8sServices();
        }
        catch (Exception e) {
            exception = e;
        }
        this.kubeClient.deleteConfigMap(this.getClusterConfigMap()).get();
        ExceptionUtils.tryRethrowException((Exception)exception);
    }

    public void internalCleanupJobData(JobID jobID) throws Exception {
        this.kubeClient.deleteConfigMap(this.getJobSpecificConfigMap(jobID)).get();
    }

    protected String getLeaderPathForResourceManager() {
        return "resourcemanager";
    }

    protected String getLeaderPathForDispatcher() {
        return "dispatcher";
    }

    protected String getLeaderPathForJobManager(JobID jobID) {
        return "job-" + jobID.toString();
    }

    protected String getLeaderPathForRestServer() {
        return "restserver";
    }
}

