/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.zookeeper;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.apache.pulsar.zookeeper.ZkUtils;
import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ZooKeeperCache
implements Watcher {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCache.class);
    public static final String ZK_CACHE_INSTANCE = "zk_cache_instance";
    protected final AsyncLoadingCache<String, Pair<Map.Entry<Object, Stat>, Long>> dataCache;
    protected final AsyncLoadingCache<String, Set<String>> childrenCache;
    protected final AsyncLoadingCache<String, Boolean> existsCache;
    private final OrderedExecutor executor;
    private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
    private boolean shouldShutdownExecutor;
    private final int zkOperationTimeoutSeconds;
    private static final int DEFAULT_CACHE_EXPIRY_SECONDS = 300;
    private final int cacheExpirySeconds;
    protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<Object>(null);
    private static Logger log = LoggerFactory.getLogger(ZooKeeperCache.class);

    public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) {
        this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, 300);
    }

    public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor, int cacheExpirySeconds) {
        Preconditions.checkNotNull((Object)executor);
        this.zkOperationTimeoutSeconds = zkOperationTimeoutSeconds;
        this.executor = executor;
        this.zkSession.set(zkSession);
        this.shouldShutdownExecutor = false;
        this.cacheExpirySeconds = cacheExpirySeconds;
        this.dataCache = Caffeine.newBuilder().recordStats().buildAsync((key, executor1) -> null);
        this.childrenCache = Caffeine.newBuilder().recordStats().expireAfterWrite((long)cacheExpirySeconds, TimeUnit.SECONDS).buildAsync((key, executor1) -> null);
        this.existsCache = Caffeine.newBuilder().recordStats().expireAfterWrite((long)cacheExpirySeconds, TimeUnit.SECONDS).buildAsync((key, executor1) -> null);
        CacheMetricsCollector.CAFFEINE.addCache(cacheName + "-data", this.dataCache);
        CacheMetricsCollector.CAFFEINE.addCache(cacheName + "-children", this.childrenCache);
        CacheMetricsCollector.CAFFEINE.addCache(cacheName + "-exists", this.existsCache);
    }

    public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds) {
        this(cacheName, zkSession, zkOperationTimeoutSeconds, OrderedExecutor.newBuilder().name("zk-cache-callback-executor").build());
        this.shouldShutdownExecutor = true;
    }

    public ZooKeeper getZooKeeper() {
        return this.zkSession.get();
    }

    public <T> void process(WatchedEvent event, final CacheUpdater<T> updater) {
        final String path = event.getPath();
        if (path != null) {
            this.dataCache.synchronous().invalidate((Object)path);
            this.childrenCache.synchronous().invalidate((Object)path);
            if (event.getType().equals((Object)Watcher.Event.EventType.NodeCreated) || event.getType().equals((Object)Watcher.Event.EventType.NodeDeleted)) {
                this.childrenCache.synchronous().invalidate((Object)ZkUtils.getParentForPath(path));
            }
            this.existsCache.synchronous().invalidate((Object)path);
            if (this.executor != null && updater != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Submitting reload cache task to the executor for path: {}, updater: {}", (Object)path, updater);
                }
                try {
                    this.executor.executeOrdered((Object)path, (SafeRunnable)new org.apache.bookkeeper.util.SafeRunnable(){

                        public void safeRun() {
                            updater.reloadCache(path);
                        }
                    });
                }
                catch (RejectedExecutionException e) {
                    LOG.error("Failed to updated zk-cache {} on zk-watch {}", (Object)path, (Object)e.getMessage());
                }
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Cannot reload cache for path: {}, updater: {}", (Object)path, updater);
            }
        }
    }

    public void invalidateAll() {
        this.invalidateAllData();
        this.invalidateAllChildren();
        this.invalidateAllExists();
    }

    private void invalidateAllExists() {
        this.existsCache.synchronous().invalidateAll();
    }

    public void invalidateAllData() {
        this.dataCache.synchronous().invalidateAll();
    }

    public void invalidateAllChildren() {
        this.childrenCache.synchronous().invalidateAll();
    }

    public void invalidateData(String path) {
        this.dataCache.synchronous().invalidate((Object)path);
    }

    public void invalidateChildren(String path) {
        this.childrenCache.synchronous().invalidate((Object)path);
    }

    private void invalidateExists(String path) {
        this.existsCache.synchronous().invalidate((Object)path);
    }

    public void asyncInvalidate(String path) {
        this.backgroundExecutor.execute(() -> this.invalidate(path));
    }

    public int getZkOperationTimeoutSeconds() {
        return this.zkOperationTimeoutSeconds;
    }

    public void invalidate(String path) {
        this.invalidateData(path);
        this.invalidateChildren(path);
        this.invalidateExists(path);
    }

    public boolean exists(String path) throws KeeperException, InterruptedException {
        return this.exists(path, this);
    }

    private boolean exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
        return this.existsAsync(path, watcher).join();
    }

    public CompletableFuture<Boolean> existsAsync(String path, Watcher watcher) {
        return this.existsCache.get((Object)path, (p, executor) -> {
            ZooKeeper zk = this.zkSession.get();
            if (zk == null) {
                return FutureUtil.failedFuture((Throwable)new IOException("ZK session not ready"));
            }
            CompletableFuture future = new CompletableFuture();
            zk.exists(path, watcher, (rc, path1, ctx, stat) -> {
                if (rc == KeeperException.Code.OK.intValue()) {
                    future.complete(true);
                } else if (rc == KeeperException.Code.NONODE.intValue()) {
                    future.complete(false);
                } else {
                    future.completeExceptionally(KeeperException.create((int)rc));
                }
            }, null);
            return future;
        });
    }

    public <T> Optional<T> getData(String path, Deserializer<T> deserializer) throws Exception {
        return this.getData(path, this, deserializer).map(e -> e.getKey());
    }

    public <T> Optional<Map.Entry<T, Stat>> getEntry(String path, Deserializer<T> deserializer) throws Exception {
        return this.getData(path, this, deserializer);
    }

    public <T> CompletableFuture<Optional<Map.Entry<T, Stat>>> getEntryAsync(String path, Deserializer<T> deserializer) {
        CompletableFuture future = new CompletableFuture();
        ((CompletableFuture)this.getDataAsync(path, this, deserializer).thenAccept(future::complete)).exceptionally(ex -> {
            this.asyncInvalidate(path);
            if (ex.getCause() instanceof KeeperException.NoNodeException) {
                future.complete(Optional.empty());
            } else {
                future.completeExceptionally(ex.getCause());
            }
            return null;
        });
        return future;
    }

    public <T> CompletableFuture<Optional<T>> getDataAsync(String path, Deserializer<T> deserializer) {
        CompletableFuture future = new CompletableFuture();
        ((CompletableFuture)this.getDataAsync(path, this, deserializer).thenAccept(data -> future.complete(data.map(e -> e.getKey())))).exceptionally(ex -> {
            this.asyncInvalidate(path);
            if (ex.getCause() instanceof KeeperException.NoNodeException) {
                future.complete(Optional.empty());
            } else {
                future.completeExceptionally(ex.getCause());
            }
            return null;
        });
        return future;
    }

    public <T> Optional<Map.Entry<T, Stat>> getData(String path, Watcher watcher, Deserializer<T> deserializer) throws Exception {
        try {
            return this.getDataAsync(path, watcher, deserializer).get(this.zkOperationTimeoutSeconds, TimeUnit.SECONDS);
        }
        catch (ExecutionException e) {
            this.asyncInvalidate(path);
            Throwable cause = e.getCause();
            if (cause instanceof KeeperException) {
                throw (KeeperException)cause;
            }
            if (cause instanceof InterruptedException) {
                LOG.warn("Time-out while fetching {} zk-data in {} sec", (Object)path, (Object)this.zkOperationTimeoutSeconds);
                throw (InterruptedException)cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException)cause;
            }
            throw new RuntimeException(cause);
        }
        catch (TimeoutException e) {
            LOG.warn("Time-out while fetching {} zk-data in {} sec", (Object)path, (Object)this.zkOperationTimeoutSeconds);
            this.asyncInvalidate(path);
            throw e;
        }
    }

    public <T> CompletableFuture<Optional<Map.Entry<T, Stat>>> getDataAsync(String path, Watcher watcher, Deserializer<T> deserializer) {
        Preconditions.checkNotNull((Object)path);
        Preconditions.checkNotNull(deserializer);
        this.checkAndRefreshExpiredEntry(path, deserializer);
        CompletableFuture future = new CompletableFuture();
        ((CompletableFuture)this.dataCache.get((Object)path, (p, executor) -> {
            CompletableFuture zkFuture = new CompletableFuture();
            try {
                this.zkSession.get().getData(path, watcher, (rc, path1, ctx, content, stat) -> {
                    if (rc == KeeperException.Code.OK.intValue()) {
                        try {
                            Object obj = deserializer.deserialize(path, content);
                            executor.execute(() -> zkFuture.complete(ImmutablePair.of(new AbstractMap.SimpleImmutableEntry<Object, Stat>(obj, stat), (Object)System.nanoTime())));
                        }
                        catch (Exception e) {
                            executor.execute(() -> zkFuture.completeExceptionally(e));
                        }
                    } else if (rc == KeeperException.Code.NONODE.intValue()) {
                        executor.execute(() -> zkFuture.complete(null));
                    } else {
                        executor.execute(() -> zkFuture.completeExceptionally(KeeperException.create((int)rc)));
                    }
                }, null);
            }
            catch (Exception e) {
                LOG.warn("Failed to access zkSession for {} {}", new Object[]{path, e.getMessage(), e});
                zkFuture.completeExceptionally(e);
            }
            return zkFuture;
        }).thenAccept(result -> {
            if (result != null) {
                future.complete(Optional.of((Map.Entry)result.getLeft()));
            } else {
                future.complete(Optional.empty());
            }
        })).exceptionally(ex -> {
            future.completeExceptionally((Throwable)ex);
            return null;
        });
        return future;
    }

    private <T> void checkAndRefreshExpiredEntry(String path, Deserializer<T> deserializer) {
        Pair entryPair;
        CompletableFuture result = this.dataCache.getIfPresent((Object)path);
        if (result != null && result.isDone() && (entryPair = (Pair)result.getNow(null)) != null && entryPair.getRight() != null && System.nanoTime() - (Long)entryPair.getRight() > TimeUnit.SECONDS.toNanos(this.cacheExpirySeconds)) {
            this.zkSession.get().getData(path, (Watcher)this, (rc, path1, ctx, content, stat) -> {
                if (rc != KeeperException.Code.OK.intValue()) {
                    log.warn("Failed to refresh zookeeper-cache for {} due to {}", (Object)path, (Object)rc);
                    return;
                }
                try {
                    Object obj = deserializer.deserialize(path, content);
                    this.dataCache.put((Object)path, CompletableFuture.completedFuture(ImmutablePair.of(new AbstractMap.SimpleImmutableEntry(obj, stat), (Object)System.nanoTime())));
                }
                catch (Exception e) {
                    log.warn("Failed to refresh zookeeper-cache for {}", (Object)path, (Object)e);
                }
            }, null);
        }
    }

    public Set<String> getChildren(String path) throws KeeperException, InterruptedException {
        try {
            return this.getChildrenAsync(path, this).join();
        }
        catch (CompletionException e) {
            if (e.getCause() instanceof KeeperException) {
                throw (KeeperException)e.getCause();
            }
            throw e;
        }
    }

    public CompletableFuture<Set<String>> getChildrenAsync(String path, Watcher watcher) {
        return this.childrenCache.get((Object)path, (p, executor) -> {
            CompletableFuture future = new CompletableFuture();
            executor.execute((Runnable)org.apache.bookkeeper.util.SafeRunnable.safeRun(() -> {
                ZooKeeper zk = this.zkSession.get();
                if (zk == null) {
                    future.completeExceptionally(new IOException("ZK session not ready"));
                    return;
                }
                zk.getChildren(path, watcher, (rc, path1, ctx, children) -> {
                    if (rc == KeeperException.Code.OK.intValue()) {
                        future.complete(Sets.newTreeSet((Iterable)children));
                    } else if (rc == KeeperException.Code.NONODE.intValue()) {
                        ((CompletableFuture)this.existsAsync(path, watcher).thenAccept(exists -> {
                            if (exists.booleanValue()) {
                                ((CompletableFuture)this.getChildrenAsync(path, watcher).thenAccept(c -> future.complete(c))).exceptionally(ex -> {
                                    future.completeExceptionally((Throwable)ex);
                                    return null;
                                });
                            } else {
                                future.complete(Collections.emptySet());
                            }
                        })).exceptionally(ex -> {
                            future.completeExceptionally((Throwable)ex);
                            return null;
                        });
                    } else {
                        future.completeExceptionally(KeeperException.create((int)rc));
                    }
                }, null);
            }));
            return future;
        });
    }

    public <T> T getDataIfPresent(String path) {
        CompletableFuture f = this.dataCache.getIfPresent((Object)path);
        if (f != null && f.isDone() && !f.isCompletedExceptionally()) {
            return (T)((Map.Entry)((Pair)f.join()).getLeft()).getKey();
        }
        return null;
    }

    public Set<String> getChildrenIfPresent(String path) {
        CompletableFuture future = this.childrenCache.getIfPresent((Object)path);
        if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
            return future.getNow(null);
        }
        return null;
    }

    public void process(WatchedEvent event) {
        LOG.info("[{}] Received ZooKeeper watch event: {}", (Object)this.zkSession.get(), (Object)event);
        this.process(event, null);
    }

    public void invalidateRoot(String root) {
        for (String key : this.childrenCache.synchronous().asMap().keySet()) {
            if (!key.startsWith(root)) continue;
            this.childrenCache.synchronous().invalidate((Object)key);
        }
    }

    public void stop() {
        if (this.shouldShutdownExecutor) {
            this.executor.shutdown();
        }
        this.backgroundExecutor.shutdown();
    }

    public boolean checkRegNodeAndWaitExpired(String regPath) throws IOException {
        final CountDownLatch prevNodeLatch = new CountDownLatch(1);
        Watcher zkPrevRegNodewatcher = new Watcher(){

            public void process(WatchedEvent event) {
                if (Watcher.Event.EventType.NodeDeleted == event.getType()) {
                    prevNodeLatch.countDown();
                }
            }
        };
        try {
            Stat stat = this.getZooKeeper().exists(regPath, zkPrevRegNodewatcher);
            if (null != stat) {
                if (stat.getEphemeralOwner() != this.getZooKeeper().getSessionId()) {
                    log.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout: {} ms for znode deletion", (Object)regPath, (Object)this.getZooKeeper().getSessionTimeout());
                    if (!prevNodeLatch.await(this.getZooKeeper().getSessionTimeout(), TimeUnit.MILLISECONDS)) {
                        throw new KeeperException.NodeExistsException(regPath);
                    }
                    return false;
                }
                return true;
            }
            return false;
        }
        catch (KeeperException ke) {
            log.error("ZK exception checking and wait ephemeral znode {} expired : ", (Object)regPath, (Object)ke);
            throw new IOException("ZK exception checking and wait ephemeral znode " + regPath + " expired", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            log.error("Interrupted checking and wait ephemeral znode {} expired : ", (Object)regPath, (Object)ie);
            throw new IOException("Interrupted checking and wait ephemeral znode " + regPath + " expired", ie);
        }
    }

    public static interface CacheUpdater<T> {
        public void registerListener(ZooKeeperCacheListener<T> var1);

        public void unregisterListener(ZooKeeperCacheListener<T> var1);

        public void reloadCache(String var1);
    }

    public static interface Deserializer<T> {
        public T deserialize(String var1, byte[] var2) throws Exception;
    }
}

