/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.statemachine.zookeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.StateMachineContext;
import org.springframework.statemachine.StateMachineException;
import org.springframework.statemachine.StateMachinePersist;
import org.springframework.statemachine.ensemble.StateMachineEnsembleException;
import org.springframework.statemachine.ensemble.StateMachineEnsembleObjectSupport;
import org.springframework.statemachine.zookeeper.ZookeeperStateMachinePersist;
import reactor.core.publisher.Mono;

public class ZookeeperStateMachineEnsemble<S, E>
extends StateMachineEnsembleObjectSupport<S, E> {
    private static final Log log = LogFactory.getLog(ZookeeperStateMachineEnsemble.class);
    private final String uuid = UUID.randomUUID().toString();
    private static final int DEFAULT_LOGSIZE = 32;
    private static final String PATH_CURRENT = "current";
    private static final String PATH_LOG = "log";
    private static final String PATH_MEMBERS = "members";
    private static final String PATH_MUTEX = "mutex";
    private final CuratorFramework curatorClient;
    private final String baseDataPath;
    private final String statePath;
    private final String logPath;
    private final int logSize;
    private final String memberPath;
    private final String mutexPath;
    private final boolean cleanState;
    private final StateMachinePersist<S, E, Stat> persist;
    private final AtomicReference<StateWrapper> stateRef = new AtomicReference();
    private final AtomicReference<StateWrapper> notifyRef = new AtomicReference();
    private final CuratorWatcher watcher = new StateWatcher();
    private PersistentNode node;
    private final Queue<StateMachine<S, E>> joinQueue = new ConcurrentLinkedQueue<StateMachine<S, E>>();
    private final List<StateMachine<S, E>> joined = new ArrayList<StateMachine<S, E>>();
    private final Object joinLock = new Object();
    private final ConnectionStateListener connectionListener = new LocalConnectionStateListener();

    public ZookeeperStateMachineEnsemble(CuratorFramework curatorClient, String basePath) {
        this(curatorClient, basePath, true, 32);
    }

    public ZookeeperStateMachineEnsemble(CuratorFramework curatorClient, String basePath, boolean cleanState, int logSize) {
        this.curatorClient = curatorClient;
        this.cleanState = cleanState;
        this.logSize = logSize;
        this.baseDataPath = basePath + "/data";
        this.statePath = this.baseDataPath + "/current";
        this.logPath = this.baseDataPath + "/log";
        this.memberPath = basePath + "/members";
        this.mutexPath = basePath + "/mutex";
        this.persist = new ZookeeperStateMachinePersist(curatorClient, this.statePath, this.logPath, logSize);
        this.setAutoStartup(true);
    }

    protected void onInit() throws Exception {
        this.initPaths();
    }

    protected Mono<Void> doPreStartReactively() {
        return Mono.fromRunnable(() -> this.doStart());
    }

    protected void doStart() {
        this.registerWatcherForStatePath();
        StateWrapper stateWrapper = this.stateRef.get();
        if (stateWrapper == null) {
            try {
                StateWrapper currentStateWrapper = this.readCurrentContext();
                this.stateRef.set(currentStateWrapper);
                this.notifyRef.set(currentStateWrapper);
            }
            catch (Exception e) {
                log.error((Object)"Error reading current state during start", (Throwable)e);
            }
        }
        this.curatorClient.getConnectionStateListenable().addListener((Object)this.connectionListener);
        if (this.curatorClient.getState() == CuratorFrameworkState.STARTED) {
            this.handleZkConnect();
        } else {
            this.curatorClient.start();
        }
    }

    protected Mono<Void> doPreStopReactively() {
        return Mono.fromRunnable(() -> this.doStop());
    }

    protected void doStop() {
        if (this.node != null && this.curatorClient.getState() != CuratorFrameworkState.STOPPED) {
            try {
                this.node.close();
            }
            catch (IOException iOException) {
            }
            finally {
                this.node = null;
            }
        }
        this.curatorClient.getConnectionStateListenable().removeListener((Object)this.connectionListener);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void join(StateMachine<S, E> stateMachine) {
        if (!this.isRunning()) {
            this.joinQueue.add(stateMachine);
        } else {
            StateWrapper stateWrapper = this.stateRef.get();
            Object object = this.joinLock;
            synchronized (object) {
                this.joined.add(stateMachine);
            }
            this.notifyJoined(stateMachine, stateWrapper != null ? stateWrapper.context : null);
        }
    }

    public StateMachine<S, E> getLeader() {
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void joinQueued() {
        StateMachine<S, E> stateMachine = null;
        Object object = this.joinLock;
        synchronized (object) {
            while ((stateMachine = this.joinQueue.poll()) != null) {
                this.joined.add(stateMachine);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyJoined() {
        StateWrapper stateWrapper = this.stateRef.get();
        Object object = this.joinLock;
        synchronized (object) {
            for (StateMachine<S, E> stateMachine : this.joined) {
                this.notifyJoined(stateMachine, stateWrapper != null ? stateWrapper.context : null);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyLeft() {
        StateWrapper stateWrapper = this.stateRef.get();
        Object object = this.joinLock;
        synchronized (object) {
            for (StateMachine<S, E> stateMachine : this.joined) {
                this.notifyLeft(stateMachine, stateWrapper != null ? stateWrapper.context : null);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void leave(StateMachine<S, E> stateMachine) {
        if (this.node != null) {
            try {
                this.node.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
        boolean removed = false;
        Object object = this.joinLock;
        synchronized (object) {
            removed = this.joined.remove(stateMachine);
        }
        if (removed) {
            StateWrapper stateWrapper = this.stateRef.get();
            this.notifyLeft(stateMachine, stateWrapper != null ? stateWrapper.context : null);
        }
    }

    public synchronized void setState(StateMachineContext<S, E> context) {
        if (log.isDebugEnabled()) {
            log.debug((Object)("Setting state context=" + context));
        }
        try {
            Stat stat = new Stat();
            StateWrapper stateWrapper = this.stateRef.get();
            if (stateWrapper != null) {
                stat.setVersion(stateWrapper.version);
            }
            if (log.isDebugEnabled()) {
                log.debug((Object)("Requesting persist write " + context + " with version " + stat.getVersion() + " for ensemble " + this.uuid));
            }
            this.persist.write(context, (Object)stat);
            if (log.isDebugEnabled()) {
                log.debug((Object)("Request persist write ok " + context + " new version " + stat.getVersion() + " for ensemble " + this.uuid));
            }
            this.stateRef.set(new StateWrapper(context, stat.getVersion()));
        }
        catch (Exception e) {
            throw new StateMachineException("Error persisting data", e);
        }
    }

    public StateMachineContext<S, E> getState() {
        return this.readCurrentContext().context;
    }

    private void handleZkConnect() {
        log.info((Object)"Handling Zookeeper connect");
        this.joinQueued();
        this.notifyJoined();
        this.registerWatcherForStatePath();
    }

    private void handleZkDisconnect() {
        log.info((Object)"Handling Zookeeper disconnect");
        this.notifyError(new StateMachineEnsembleException("Lost connection to zookeeper"));
        this.notifyLeft();
    }

    private StateWrapper readCurrentContext() {
        try {
            Stat stat = new Stat();
            this.registerWatcherForStatePath();
            StateMachineContext context = this.persist.read((Object)stat);
            return new StateWrapper(context, stat.getVersion());
        }
        catch (Exception e) {
            throw new StateMachineException("Error reading data", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initPaths() {
        InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(this.curatorClient, this.mutexPath);
        try {
            if (log.isTraceEnabled()) {
                log.trace((Object)"About to acquire mutex");
            }
            mutex.acquire();
            if (log.isTraceEnabled()) {
                log.trace((Object)"Mutex acquired");
            }
            if (this.cleanState && this.curatorClient.checkExists().forPath(this.memberPath) != null && ((List)this.curatorClient.getChildren().forPath(this.memberPath)).size() == 0) {
                log.info((Object)("Deleting from " + this.baseDataPath));
                this.curatorClient.delete().deletingChildrenIfNeeded().forPath(this.baseDataPath);
            }
            this.node = new PersistentNode(this.curatorClient, CreateMode.EPHEMERAL, true, this.memberPath + "/" + this.uuid, new byte[0]);
            this.node.start();
            this.node.waitForInitialCreate(60L, TimeUnit.SECONDS);
            if (this.curatorClient.checkExists().forPath(this.baseDataPath) == null) {
                CuratorTransaction tx = this.curatorClient.inTransaction();
                CuratorTransactionFinal tt = ((CuratorTransactionBridge)tx.create().forPath(this.baseDataPath)).and();
                tt = ((CuratorTransactionBridge)tt.create().forPath(this.statePath)).and();
                tt = ((CuratorTransactionBridge)tt.create().forPath(this.logPath)).and();
                for (int i = 0; i < this.logSize; ++i) {
                    tt = ((CuratorTransactionBridge)tt.create().forPath(this.logPath + "/" + i)).and();
                }
                tt.commit();
            }
        }
        catch (Exception e) {
            log.warn((Object)"Error in initPaths", (Throwable)e);
        }
        finally {
            try {
                mutex.release();
                if (log.isTraceEnabled()) {
                    log.trace((Object)"Mutex released");
                }
            }
            catch (Exception exception) {}
        }
    }

    protected void registerWatcherForStatePath() {
        try {
            if (this.curatorClient.getState() != CuratorFrameworkState.STOPPED) {
                ((BackgroundPathable)this.curatorClient.checkExists().usingWatcher(this.watcher)).forPath(this.statePath);
            }
        }
        catch (Exception e) {
            log.warn((Object)("Registering watcher for path " + this.statePath + " threw error"), (Throwable)e);
        }
    }

    private void mayNotifyStateChanged(StateWrapper wrapper) {
        StateWrapper notifyWrapper = this.notifyRef.get();
        if (notifyWrapper == null) {
            if (log.isDebugEnabled()) {
                log.debug((Object)("notifyWrapper null, wrapper=[" + wrapper + "] for " + this));
            }
            this.notifyRef.set(wrapper);
            this.notifyStateChanged(wrapper.context);
        } else if (wrapper.version > notifyWrapper.version) {
            if (log.isDebugEnabled()) {
                log.debug((Object)("Wrapper version higher that notifyWrapper version, notifyWrapper=[" + notifyWrapper + "], wrapper=[" + wrapper + "] for " + this));
            }
            this.notifyRef.set(wrapper);
            this.notifyStateChanged(wrapper.context);
        }
    }

    private void traceLogWrappers(StateWrapper currentWrapper, StateWrapper notifyWrapper, StateWrapper newWrapper) {
        if (log.isTraceEnabled()) {
            log.trace((Object)("Wrappers id=" + this.uuid + "\ncurrentWrapper=[" + currentWrapper + "] \nnotifyWrapper=[" + notifyWrapper + "] \nnewWrapper=[" + newWrapper + "]"));
        }
    }

    public String toString() {
        return "ZookeeperStateMachineEnsemble [uuid=" + this.uuid + "]";
    }

    private boolean handleDataChange() throws Exception {
        StateWrapper currentWrapper = this.stateRef.get();
        StateWrapper notifyWrapper = this.notifyRef.get();
        StateWrapper newWrapper = this.readCurrentContext();
        this.traceLogWrappers(currentWrapper, notifyWrapper, newWrapper);
        if (currentWrapper.version + 1 != newWrapper.version || notifyWrapper.version < currentWrapper.version || !this.stateRef.compareAndSet(currentWrapper, newWrapper)) {
            int start = (notifyWrapper != null ? notifyWrapper.version : 0) % this.logSize;
            int count = newWrapper.version - (notifyWrapper != null ? notifyWrapper.version : 0);
            if (log.isDebugEnabled()) {
                log.debug((Object)("Events missed, trying to replay start " + start + " count " + count));
            }
            for (int i = start; i < start + count; ++i) {
                Stat stat = new Stat();
                StateMachineContext context = ((ZookeeperStateMachinePersist)this.persist).readLog(i, stat);
                int ver = (stat.getVersion() - 1) * this.logSize + (i + 1);
                if (i + this.logSize < ver) {
                    this.notifyError(new StateMachineEnsembleException("Current version behind more than log size"));
                    break;
                }
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Replay position " + i + " with version " + ver));
                    log.debug((Object)("Context in position " + i + " " + context));
                }
                StateWrapper wrapper = new StateWrapper(context, ver);
                StateWrapper currentWrapperx = this.stateRef.get();
                if (currentWrapperx.context == null) {
                    this.stateRef.set(wrapper);
                } else if (wrapper.version == currentWrapperx.version + 1) {
                    this.stateRef.set(wrapper);
                }
                this.mayNotifyStateChanged(wrapper);
            }
            return count > 0;
        }
        this.mayNotifyStateChanged(newWrapper);
        return false;
    }

    private class StateWatcher
    implements CuratorWatcher {
        private StateWatcher() {
        }

        public void process(WatchedEvent event) throws Exception {
            if (log.isTraceEnabled()) {
                log.trace((Object)("Process WatchedEvent: id=" + ZookeeperStateMachineEnsemble.this.uuid + " " + event));
            }
            switch (event.getType()) {
                case NodeDataChanged: {
                    try {
                        if (ZookeeperStateMachineEnsemble.this.handleDataChange()) {
                            ZookeeperStateMachineEnsemble.this.handleDataChange();
                        }
                    }
                    catch (Exception e) {
                        log.error((Object)"Error handling event", (Throwable)e);
                    }
                    ZookeeperStateMachineEnsemble.this.registerWatcherForStatePath();
                    break;
                }
                default: {
                    ZookeeperStateMachineEnsemble.this.registerWatcherForStatePath();
                }
            }
        }
    }

    private class LocalConnectionStateListener
    implements ConnectionStateListener {
        private LocalConnectionStateListener() {
        }

        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (ZookeeperStateMachineEnsemble.this.curatorClient == client) {
                switch (newState) {
                    case CONNECTED: 
                    case RECONNECTED: {
                        ZookeeperStateMachineEnsemble.this.handleZkConnect();
                        break;
                    }
                    case READ_ONLY: {
                        break;
                    }
                    case LOST: 
                    case SUSPENDED: {
                        ZookeeperStateMachineEnsemble.this.handleZkDisconnect();
                        break;
                    }
                }
            }
        }
    }

    private class StateWrapper {
        private final StateMachineContext<S, E> context;
        private final int version;

        public StateWrapper(StateMachineContext<S, E> context, int version) {
            this.context = context;
            this.version = version;
        }

        public String toString() {
            return "StateWrapper [context=" + this.context + ", version=" + this.version + "]";
        }
    }
}

