/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.flink.shaded.org.apache.curator.utils.ZKPaths;
import org.apache.flink.util.Preconditions;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SubmittedJobGraphStore} that keeps submitted job graphs in ZooKeeper.
 *
 * <p>Each job graph is stored through a {@link ZooKeeperStateHandleStore} under the child path
 * {@code /<jobId>} of the configured jobs path. A {@link PathChildrenCache} watches that path and
 * forwards additions/removals that were not performed through this instance to the registered
 * {@link SubmittedJobGraphStore.SubmittedJobGraphListener}.
 *
 * <p>The mutable state of this class ({@code addedJobGraphs}, {@code jobGraphListener} and
 * {@code isRunning}) is only accessed while holding {@code cacheLock}.
 */
public class ZooKeeperSubmittedJobGraphStore
implements SubmittedJobGraphStore {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphStore.class);

    /** Lock guarding {@link #addedJobGraphs}, {@link #jobGraphListener} and {@link #isRunning}. */
    private final Object cacheLock = new Object();

    /** Client handed in by the creator; it is closed in {@link #stop()}. */
    private final CuratorFramework client;

    /** IDs of the job graphs that were added or recovered through this instance. */
    private final Set<JobID> addedJobGraphs = new HashSet<JobID>();

    /** Store for the job graph state handles, rooted at the jobs path. */
    private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;

    /** Cache that watches the children of the jobs path. */
    private final PathChildrenCache pathCache;

    /** Namespace plus jobs path; only used to build log messages. */
    private final String zooKeeperFullBasePath;

    /** Listener notified about externally added/removed job graphs; {@code null} while stopped. */
    private SubmittedJobGraphStore.SubmittedJobGraphListener jobGraphListener;

    /** Whether {@link #start} has been called without a subsequent {@link #stop()}. */
    private boolean isRunning;

    /**
     * Creates the store and registers (but does not start) the path cache listener.
     *
     * @param client          Curator client; must not be {@code null}
     * @param currentJobsPath path under the client's namespace that holds the job graphs
     * @param stateStorage    helper passed on to the {@link ZooKeeperStateHandleStore}
     * @param executor        executor passed on to the {@link ZooKeeperStateHandleStore}
     * @throws Exception if the jobs path cannot be ensured in ZooKeeper
     */
    public ZooKeeperSubmittedJobGraphStore(CuratorFramework client, String currentJobsPath, StateStorageHelper<SubmittedJobGraph> stateStorage, Executor executor) throws Exception {
        Preconditions.checkNotNull(currentJobsPath, "Current jobs path");
        Preconditions.checkNotNull(stateStorage, "State storage");
        this.client = Preconditions.checkNotNull(client, "Curator client");

        // Make sure the jobs path exists before anything is read from or written to it.
        client.newNamespaceAwareEnsurePath(currentJobsPath).ensure(client.getZookeeperClient());

        // All further operations go through a facade whose namespace is the jobs path itself,
        // so job nodes are addressed as direct children ("/<jobId>").
        String fullBasePath = client.getNamespace() + currentJobsPath;
        CuratorFramework facade = client.usingNamespace(fullBasePath);
        this.zooKeeperFullBasePath = fullBasePath;

        this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<SubmittedJobGraph>(facade, stateStorage, executor);
        this.pathCache = new PathChildrenCache(facade, "/", false);
        this.pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
    }

    /**
     * Registers the listener and starts the path cache. Does nothing if already running.
     *
     * @param jobGraphListener listener for externally added/removed job graphs (may be {@code null})
     * @throws Exception if the path cache fails to start
     */
    @Override
    public void start(SubmittedJobGraphStore.SubmittedJobGraphListener jobGraphListener) throws Exception {
        synchronized (cacheLock) {
            if (!isRunning) {
                this.jobGraphListener = jobGraphListener;
                pathCache.start();
                isRunning = true;
            }
        }
    }

    /**
     * Unregisters the listener and closes the path cache and the Curator client. Does nothing if
     * not running.
     *
     * <p>The client is closed and the store is marked as stopped even if closing the path cache
     * fails, so a failure there does not leak the client connection.
     *
     * @throws Exception if closing the path cache or the client fails
     */
    @Override
    public void stop() throws Exception {
        synchronized (cacheLock) {
            if (isRunning) {
                jobGraphListener = null;
                try {
                    pathCache.close();
                } finally {
                    try {
                        client.close();
                    } finally {
                        isRunning = false;
                    }
                }
            }
        }
    }

    /**
     * Reads the job graph stored for the given job ID and remembers it as added by this instance.
     *
     * @param jobId ID of the job graph to recover; must not be {@code null}
     * @return the recovered job graph, or {@code null} if no node exists for the job
     * @throws IllegalStateException if the store is not running
     * @throws Exception if the state handle or the job graph behind it cannot be retrieved
     */
    @Override
    public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
        Preconditions.checkNotNull(jobId, "Job ID");
        String path = getPathForJob(jobId);

        synchronized (cacheLock) {
            verifyIsRunning();

            StateHandle<SubmittedJobGraph> submittedJobStateHandle;
            try {
                submittedJobStateHandle = jobGraphsInZooKeeper.get(path);
            }
            catch (KeeperException.NoNodeException ignored) {
                // No node for this job: report "nothing to recover" instead of failing.
                return null;
            }
            catch (Exception e) {
                throw new Exception("Could not retrieve the submitted job graph state handle for " + path + " from the submitted job graph store.", e);
            }

            LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);

            SubmittedJobGraph jobGraph;
            try {
                jobGraph = submittedJobStateHandle.getState(getClass().getClassLoader());
            }
            catch (Exception e) {
                throw new Exception("Failed to retrieve the submitted job graph from state handle.", e);
            }

            addedJobGraphs.add(jobGraph.getJobId());
            LOG.info("Recovered {}.", jobGraph);
            return jobGraph;
        }
    }

    /**
     * Adds the job graph to ZooKeeper, or replaces the stored one if this instance added or
     * recovered it before.
     *
     * <p>The operation is retried until it succeeds: if the node appears between the existence
     * check and the add, or disappears between the check and the replace, the loop starts over.
     *
     * @param jobGraph job graph to store; must not be {@code null}
     * @throws IllegalStateException if the store is not running, or if a node already exists for
     *         a job that was neither added nor recovered through this instance
     * @throws Exception if the ZooKeeper operations fail
     */
    @Override
    public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
        Preconditions.checkNotNull(jobGraph, "Job graph");
        String path = getPathForJob(jobGraph.getJobId());
        LOG.debug("Adding job graph {} to {}{}.", jobGraph.getJobId(), zooKeeperFullBasePath, path);

        boolean success = false;
        while (!success) {
            synchronized (cacheLock) {
                verifyIsRunning();

                // A version of -1 is treated as "no node exists for this path".
                int currentVersion = jobGraphsInZooKeeper.exists(path);
                if (currentVersion == -1) {
                    try {
                        jobGraphsInZooKeeper.add(path, jobGraph);
                        addedJobGraphs.add(jobGraph.getJobId());
                        LOG.info("Added {} to ZooKeeper.", jobGraph);
                        success = true;
                    }
                    catch (KeeperException.NodeExistsException ignored) {
                        // Node was created concurrently; retry and take the update path.
                    }
                } else if (addedJobGraphs.contains(jobGraph.getJobId())) {
                    try {
                        jobGraphsInZooKeeper.replace(path, currentVersion, jobGraph);
                        LOG.info("Updated {} in ZooKeeper.", jobGraph);
                        success = true;
                    }
                    catch (KeeperException.NoNodeException ignored) {
                        // Node was removed concurrently; retry and take the add path.
                    }
                } else {
                    throw new IllegalStateException("Oh, no. Trying to update a graph you didn't #getAllSubmittedJobGraphs() or #putJobGraph() yourself before.");
                }
            }
        }
    }

    /**
     * Removes the job graph from ZooKeeper and discards its state, but only if it was added or
     * recovered through this instance; otherwise this is a no-op.
     *
     * @param jobId ID of the job graph to remove; must not be {@code null}
     * @throws Exception if removing the node or discarding the state fails
     */
    @Override
    public void removeJobGraph(JobID jobId) throws Exception {
        Preconditions.checkNotNull(jobId, "Job ID");
        String path = getPathForJob(jobId);
        LOG.debug("Removing job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);

        boolean removed = false;
        synchronized (cacheLock) {
            if (addedJobGraphs.contains(jobId)) {
                jobGraphsInZooKeeper.removeAndDiscardState(path);
                addedJobGraphs.remove(jobId);
                removed = true;
            }
        }

        // Only report a removal that actually happened.
        if (removed) {
            LOG.info("Removed job graph {} from ZooKeeper.", jobId);
        }
    }

    /**
     * Lists the IDs of all job graphs currently stored under the jobs path.
     *
     * <p>Entries whose path cannot be parsed as a job ID are logged and skipped.
     *
     * @return the parsed job IDs
     * @throws Exception if the entry paths cannot be retrieved
     */
    @Override
    public Collection<JobID> getJobIds() throws Exception {
        Collection<String> paths;
        try {
            paths = jobGraphsInZooKeeper.getAllPaths();
        }
        catch (Exception e) {
            throw new Exception("Failed to retrieve entry paths from ZooKeeperStateHandleStore.", e);
        }

        Collection<JobID> jobIds = new ArrayList<JobID>(paths.size());
        for (String path : paths) {
            try {
                jobIds.add(jobIdfromPath(path));
            }
            catch (Exception exception) {
                LOG.warn("Could not parse job id from {}.", path, exception);
            }
        }
        return jobIds;
    }

    /** Fails with an {@link IllegalStateException} unless the store has been started. */
    private void verifyIsRunning() {
        Preconditions.checkState(isRunning, "Not running. Forgot to call start()?");
    }

    /**
     * Returns the path of the node for the given job, relative to the jobs path.
     *
     * @param jobId job ID; must not be {@code null}
     * @return {@code "/"} followed by the string form of the job ID
     */
    public static String getPathForJob(JobID jobId) {
        Preconditions.checkNotNull(jobId, "Job ID");
        return String.format("/%s", jobId);
    }

    /**
     * Parses a job ID from a node name.
     *
     * @param path hex string form of the job ID, as accepted by {@link JobID#fromHexString}
     * @return the parsed job ID
     */
    public static JobID jobIdfromPath(String path) {
        return JobID.fromHexString(path);
    }

    /**
     * Path cache listener that forwards child additions/removals to the registered job graph
     * listener and logs connection state changes.
     */
    private final class SubmittedJobGraphsPathCacheListener
    implements PathChildrenCacheListener {
        private SubmittedJobGraphsPathCacheListener() {
        }

        /**
         * Handles a path cache event.
         *
         * <ul>
         *   <li>{@code CHILD_ADDED}: notifies the listener if the job was not added through the
         *       enclosing store.</li>
         *   <li>{@code CHILD_REMOVED}: notifies the listener if the job was added through the
         *       enclosing store.</li>
         *   <li>{@code CHILD_UPDATED}: ignored.</li>
         *   <li>Connection and initialization events: logged only.</li>
         * </ul>
         *
         * <p>Errors thrown by the listener callbacks are logged and not propagated.
         */
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            if (LOG.isDebugEnabled()) {
                if (event.getData() != null) {
                    LOG.debug("Received {} event (path: {})", event.getType(), event.getData().getPath());
                } else {
                    LOG.debug("Received {} event", event.getType());
                }
            }

            switch (event.getType()) {
                case CHILD_ADDED: {
                    JobID jobId = fromEvent(event);
                    LOG.debug("Received CHILD_ADDED event notification for job {}", jobId);
                    synchronized (cacheLock) {
                        try {
                            // Only report graphs that this store did not add itself.
                            if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
                                try {
                                    jobGraphListener.onAddedJobGraph(jobId);
                                }
                                catch (Throwable t) {
                                    LOG.error("Error in callback", t);
                                }
                            }
                        }
                        catch (Exception e) {
                            LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
                        }
                    }
                    break;
                }
                case CHILD_UPDATED: {
                    // Updates of existing job graph nodes are not reported.
                    break;
                }
                case CHILD_REMOVED: {
                    JobID jobId = fromEvent(event);
                    LOG.debug("Received CHILD_REMOVED event notification for job {}", jobId);
                    synchronized (cacheLock) {
                        try {
                            // Only report removals of graphs this store still tracks as added.
                            if (jobGraphListener != null && addedJobGraphs.contains(jobId)) {
                                try {
                                    jobGraphListener.onRemovedJobGraph(jobId);
                                }
                                catch (Throwable t) {
                                    LOG.error("Error in callback", t);
                                }
                            }
                        }
                        catch (Exception e) {
                            LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
                        }
                    }
                    break;
                }
                case CONNECTION_SUSPENDED: {
                    LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job graphs are not monitored (temporarily).");
                    break;
                }
                case CONNECTION_LOST: {
                    LOG.warn("ZooKeeper connection LOST. Changes to the submitted job graphs are not monitored (permanently).");
                    break;
                }
                case CONNECTION_RECONNECTED: {
                    LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are monitored again.");
                    break;
                }
                case INITIALIZED: {
                    LOG.info("SubmittedJobGraphsPathCacheListener initialized");
                    break;
                }
            }
        }

        /** Parses the job ID from the node name of the event's path. */
        private JobID fromEvent(PathChildrenCacheEvent event) {
            return JobID.fromHexString(ZKPaths.getNodeFromPath(event.getData().getPath()));
        }
    }
}

