/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.mvcc;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.mvcc.DeadlockDetectionManager;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorChangeAware;
import org.apache.ignite.internal.processors.cache.mvcc.MvccFuture;
import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotFuture;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotResponseListener;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionImpl;
import org.apache.ignite.internal.processors.cache.mvcc.PreviousQueries;
import org.apache.ignite.internal.processors.cache.mvcc.VacuumMetrics;
import org.apache.ignite.internal.processors.cache.mvcc.VacuumMetricsReducer;
import org.apache.ignite.internal.processors.cache.mvcc.VacuumTask;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryCntr;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryId;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccActiveQueriesMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccFutureResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotRequest;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxKey;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMetrics;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class MvccProcessorImpl
extends GridProcessorAdapter
implements MvccProcessor,
DatabaseLifecycleListener {
    // Value of system property IGNITE_FORCE_MVCC_MODE_IN_TESTS (default false). When true,
    // preProcessCacheConfiguration() switches TRANSACTIONAL non-system caches to TRANSACTIONAL_SNAPSHOT.
    private static final boolean FORCE_MVCC = IgniteSystemProperties.getBoolean("IGNITE_FORCE_MVCC_MODE_IN_TESTS", false);
    // validateNode() rejects nodes whose version is lower than this when MVCC is enabled.
    private static final IgniteProductVersion MVCC_SUPPORTED_SINCE = IgniteProductVersion.fromString("2.7.0");
    // Placeholder value stored in waitMap by registerLocalTransaction().
    private static final Waiter LOCAL_TRANSACTION_MARKER = new LocalTransactionMarker();
    // Optional coordinator-selection override, installed via coordinatorAssignClosure() and consulted by pickMvccCoordinator().
    private static IgniteClosure<Collection<ClusterNode>, ClusterNode> crdC;
    // Current coordinator. All writes visible in this section happen under stateMux; reads are lock-free (volatile).
    private volatile MvccCoordinator curCrd = MvccCoordinator.UNASSIGNED_COORDINATOR;
    // Created in ensureStarted() / afterBinaryMemoryRestore(), closed and nulled in stopTxLog().
    @Nullable
    private TxLog txLog;
    // Vacuum state; presumably maintained by startVacuumWorkers()/stopVacuumWorkers(), which are outside this section.
    private List<GridWorker> vacuumWorkers;
    private BlockingQueue<VacuumTask> cleanupQueue;
    // Guards lazy creation of txLog in ensureStarted().
    private final Object mux = new Object();
    // Source of ids for snapshot requests (requestSnapshotAsync) and tx commit acks (ackTxCommit).
    private final GridAtomicLong futIdCntr = new GridAtomicLong(0L);
    // NOTE(review): the two counters below are not used in the visible section; their initial values are kept as-is.
    private final GridAtomicLong mvccCntr = new GridAtomicLong(3L);
    private final GridAtomicLong committedCntr = new GridAtomicLong(1L);
    // Plain HashMap: cleanupOrphanedServerTransactions() iterates it under synchronized (this).
    private final Map<Long, ActiveTx> activeTxs = new HashMap<Long, ActiveTx>();
    // Query trackers keyed by tracker id (see addQueryTracker()/removeQueryTracker()).
    private final Map<Long, MvccQueryTracker> activeTrackers = new ConcurrentHashMap<Long, MvccQueryTracker>();
    // Coordinator node id -> (request id -> listener awaiting the snapshot response).
    private final Map<UUID, Map<Long, MvccSnapshotResponseListener>> snapLsnrs = new ConcurrentHashMap<UUID, Map<Long, MvccSnapshotResponseListener>>();
    // Pending ack futures; each is notified via onNodeLeft() from onCoordinatorFailed().
    private final Map<Long, WaitAckFuture> ackFuts = new ConcurrentHashMap<Long, WaitAckFuture>();
    // Transaction key -> waiter (either LOCAL_TRANSACTION_MARKER or lock futures merged in waitForLock()).
    private final Map<TxKey, Waiter> waitMap = new ConcurrentHashMap<TxKey, Waiter>();
    private final ActiveQueries activeQueries = new ActiveQueries();
    private final PreviousQueries prevQueries = new PreviousQueries();
    // Completed from initialize() when the local node is the coordinator; local snapshot requests wait on it.
    private final GridFutureAdapter<Void> initFut = new GridFutureAdapter();
    private volatile boolean mvccEnabled;
    private volatile boolean mvccSupported = true;
    // Set to the coordinator's topology version in initialize(); reset to NONE in onDisconnected().
    private volatile AffinityTopologyVersion readyVer = AffinityTopologyVersion.NONE;
    // Recovery voting state keyed by node id (populated in onDiscovery()).
    private final ConcurrentHashMap<UUID, RecoveryBallotBox> recoveryBallotBoxes = new ConcurrentHashMap();
    // Blocked in onKernalStop(); requestSnapshotAsync() fails fast when it cannot be entered.
    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
    // Listeners are created in the constructor and registered in start().
    private final DiscoveryEventListener discoLsnr;
    private final GridMessageListener msgLsnr;
    private final CustomEventListener customLsnr;
    // Guards assignments to curCrd.
    private final Object stateMux = new Object();

    /**
     * Installs a closure that overrides coordinator selection; {@code pickMvccCoordinator} applies it to
     * the node collection when it is non-null.
     *
     * @param crdC Closure returning the coordinator node for a given node collection, or {@code null} to clear.
     */
    static void coordinatorAssignClosure(IgniteClosure<Collection<ClusterNode>, ClusterNode> crdC) {
        MvccProcessorImpl.crdC = crdC;
    }

    /**
     * Creates the processor, registers it as a database lifecycle listener and prepares the
     * discovery, message and custom-event listeners (they are attached later, in {@link #start()}).
     *
     * @param ctx Kernal context.
     */
    public MvccProcessorImpl(GridKernalContext ctx) {
        super(ctx);
        // NOTE(review): 'this' is published to the subscription processor before the listener fields are assigned.
        ctx.internalSubscriptionProcessor().registerDatabaseListener(this);
        this.discoLsnr = this::onDiscovery;
        this.msgLsnr = new MvccMessageListener();
        // Every dynamic cache change batch is forwarded to checkMvccCacheStarted().
        this.customLsnr = new CustomEventListener<DynamicCacheChangeBatch>(){

            @Override
            public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, DynamicCacheChangeBatch msg) {
                MvccProcessorImpl.this.checkMvccCacheStarted(msg);
            }
        };
    }

    /**
     * Attaches the listeners prepared in the constructor: discovery events, coordinator-topic
     * messages and {@link DynamicCacheChangeBatch} custom events.
     *
     * @throws IgniteCheckedException Declared by the processor contract.
     */
    @Override
    public void start() throws IgniteCheckedException {
        // Event types 12, 11, 10 are presumably EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED — confirm against EventType.
        ctx.event().addDiscoveryEventListener(discoLsnr, 12, 11, 10);

        ctx.io().addMessageListener(GridTopic.TOPIC_CACHE_COORDINATOR, msgLsnr);

        ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class, customLsnr);
    }

    /**
     * @return Current value of the {@code mvccEnabled} flag.
     */
    @Override
    public boolean mvccEnabled() {
        return mvccEnabled;
    }

    /**
     * Adjusts a cache configuration before start. With {@code FORCE_MVCC} set, a TRANSACTIONAL
     * non-system cache is switched to TRANSACTIONAL_SNAPSHOT and its near configuration is cleared.
     * Any TRANSACTIONAL_SNAPSHOT cache then turns the {@code mvccEnabled} flag on.
     *
     * @param ccfg Cache configuration to inspect and possibly modify.
     * @throws IgniteException If the cache is TRANSACTIONAL_SNAPSHOT but MVCC is not supported.
     */
    @Override
    public void preProcessCacheConfiguration(CacheConfiguration ccfg) {
        if (FORCE_MVCC && ccfg.getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL && !CU.isSystemCache(ccfg.getName())) {
            ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
            ccfg.setNearConfiguration(null);
        }

        if (ccfg.getAtomicityMode() != CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT) {
            return;
        }

        if (!mvccSupported) {
            throw new IgniteException("Cannot start MVCC transactional cache. MVCC is unsupported by the cluster.");
        }

        mvccEnabled = true;
    }

    /**
     * Validates a cache configuration: a TRANSACTIONAL_SNAPSHOT cache requires MVCC support and
     * turns the {@code mvccEnabled} flag on. Other atomicity modes are ignored.
     *
     * @param ccfg Cache configuration to validate.
     * @throws IgniteException If the cache is TRANSACTIONAL_SNAPSHOT but MVCC is not supported.
     */
    @Override
    public void validateCacheConfiguration(CacheConfiguration ccfg) {
        if (ccfg.getAtomicityMode() != CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT) {
            return;
        }

        if (!mvccSupported) {
            throw new IgniteException("Cannot start MVCC transactional cache. MVCC is unsupported by the cluster.");
        }

        mvccEnabled = true;
    }

    /**
     * Rejects a joining node whose version is lower than {@code MVCC_SUPPORTED_SINCE} while MVCC is enabled.
     *
     * @param node Joining node.
     * @return Validation error for the node, or {@code null} if the node is accepted.
     */
    @Override
    @Nullable
    public IgniteNodeValidationResult validateNode(ClusterNode node) {
        if (!mvccEnabled || node.version().compareToIgnoreTimestamp(MVCC_SUPPORTED_SINCE) >= 0) {
            return null;
        }

        String reason = "Failed to add node to topology. MVCC is enabled on the cluster, but the node doesn't support MVCC [nodeId=" + node.id() + ']';

        return new IgniteNodeValidationResult(node.id(), reason);
    }

    /**
     * On non-client nodes lazily creates the tx log (under {@code mux}) and starts vacuum workers.
     * Does nothing on client nodes.
     *
     * @throws IgniteCheckedException If tx log creation or worker start fails.
     */
    @Override
    public void ensureStarted() throws IgniteCheckedException {
        if (ctx.clientNode()) {
            return;
        }

        assert mvccEnabled && mvccSupported;

        synchronized (mux) {
            if (txLog == null) {
                txLog = new TxLog(ctx, ctx.cache().context().database());
            }
        }

        startVacuumWorkers();

        if (log.isInfoEnabled()) {
            log.info("Mvcc processor started.");
        }
    }

    /**
     * When an MVCC cache stops and the tx log exists, stops the tx log if no remaining cache
     * descriptor uses TRANSACTIONAL_SNAPSHOT atomicity.
     *
     * @param cctx Context of the cache being stopped.
     */
    @Override
    public void onCacheStop(GridCacheContext cctx) {
        if (!cctx.mvccEnabled() || txLog == null) {
            return;
        }

        assert mvccEnabled && mvccSupported;

        boolean noMvccCachesLeft = ctx.cache().cacheDescriptors().values().stream().noneMatch(c -> c.cacheConfiguration().getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);

        if (noMvccCachesLeft) {
            stopTxLog();
        }
    }

    /**
     * Database lifecycle callback: stops the tx log before the database manager stops.
     *
     * @param mgr Database manager (unused).
     */
    @Override
    public void beforeStop(IgniteCacheDatabaseSharedManager mgr) {
        stopTxLog();
    }

    /**
     * Stops vacuum workers, closes the tx log if present, then clears the tx log reference
     * and the {@code mvccEnabled} flag.
     */
    @Override
    public void stopTxLog() {
        stopVacuumWorkers();

        if (txLog != null) {
            txLog.close();
        }

        txLog = null;
        mvccEnabled = false;
    }

    /**
     * Registers the "TxLog" data region with the database manager.
     *
     * @param mgr Database manager receiving the region.
     * @throws IgniteCheckedException If the region cannot be added.
     */
    @Override
    public void onInitDataRegions(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
        DataStorageConfiguration storageCfg = dataStorageConfiguration();
        DataRegionConfiguration txLogRegion = createTxLogRegion(storageCfg);
        boolean persistenceEnabled = CU.isPersistenceEnabled(ctx.config());

        mgr.addDataRegion(storageCfg, txLogRegion, persistenceEnabled);
    }

    /**
     * Database lifecycle callback: initializes the tx log page store.
     *
     * @param mgr Database manager.
     * @throws IgniteCheckedException If page store initialization fails.
     */
    @Override
    public void beforeResumeWalLogging(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
        txLogPageStoreInit(mgr);
    }

    /**
     * Database lifecycle callback: initializes the tx log page store.
     *
     * @param mgr Database manager.
     * @throws IgniteCheckedException If page store initialization fails.
     */
    @Override
    public void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
        txLogPageStoreInit(mgr);
    }

    /**
     * After binary memory restore, creates the tx log and enables MVCC if any persistent cache
     * uses TRANSACTIONAL_SNAPSHOT atomicity.
     *
     * @param mgr Database manager used to create the tx log.
     * @param restoreState Restore state (unused).
     * @throws IgniteCheckedException If tx log creation fails.
     */
    @Override
    public void afterBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr, GridCacheDatabaseSharedManager.RestoreBinaryState restoreState) throws IgniteCheckedException {
        boolean noMvccCaches = ctx.cache().persistentCaches().stream().noneMatch(c -> c.cacheConfiguration().getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);

        if (noMvccCaches) {
            return;
        }

        txLog = new TxLog(ctx, mgr);
        mvccEnabled = true;
    }

    /**
     * Initializes the page store for the tx log cache id using page metrics taken from the "TxLog" data region.
     * Asserts that persistence is enabled.
     *
     * @param mgr Database manager providing the "TxLog" data region.
     * @throws IgniteCheckedException If the region lookup or page store initialization fails.
     */
    private void txLogPageStoreInit(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
        assert CU.isPersistenceEnabled(ctx.config());

        PageMetrics txLogMetrics = mgr.dataRegion("TxLog").metrics().cacheGrpPageMetrics(TxLog.TX_LOG_CACHE_ID);

        ctx.cache().context().pageStore().initialize(TxLog.TX_LOG_CACHE_ID, 0, "TxLog", txLogMetrics);
    }

    /**
     * Exchange-completion callback. For an initialized local coordinator it cleans up transactions
     * started by servers that are no longer alive. For a not-yet-initialized coordinator whose
     * topology version lies in {@code (readyVer, discoCache.version()]}, it runs initialization.
     * Nothing is done while the coordinator is in the disconnected state.
     *
     * @param discoCache Discovery cache of the finished exchange.
     */
    @Override
    public void onExchangeDone(DiscoCache discoCache) {
        assert discoCache != null && readyVer.compareTo(discoCache.version()) < 0;

        MvccCoordinator crd = curCrd;

        if (crd.disconnected()) {
            return;
        }

        assert crd.topologyVersion().initialized();

        if (crd.initialized() && crd.local()) {
            cleanupOrphanedServerTransactions(discoCache.serverNodes());
        }

        // initialized() is deliberately re-read here rather than cached from the check above.
        if (!crd.initialized() && coordinatorChanged(crd, readyVer, discoCache.version())) {
            initialize(crd);
        }
    }

    /**
     * Local join callback: re-checks MVCC support across all nodes and assigns the coordinator
     * for the join topology version without sending active queries.
     *
     * @param evt Discovery event; asserted to be of type 10 with a local event node.
     * @param discoCache Discovery cache at the moment of join.
     */
    @Override
    public void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache) {
        assert evt.type() == 10 && evt.eventNode().isLocal();

        List<ClusterNode> allNodes = discoCache.allNodes();

        checkMvccSupported(allNodes);
        onCoordinatorChanged(discoCache.version(), allNodes, false);
    }

    /**
     * Client disconnect callback. Unless already disconnected, fails everything pending on the
     * current coordinator, switches to {@code DISCONNECTED_COORDINATOR} (under {@code stateMux})
     * and resets {@code readyVer}.
     *
     * @param reconnectFut Reconnect future (unused).
     */
    @Override
    public void onDisconnected(IgniteFuture<?> reconnectFut) {
        MvccCoordinator crd = curCrd;

        if (crd.disconnected()) {
            return;
        }

        onCoordinatorFailed(crd.nodeId());

        synchronized (stateMux) {
            curCrd = MvccCoordinator.DISCONNECTED_COORDINATOR;
        }

        readyVer = AffinityTopologyVersion.NONE;
    }

    /**
     * Kernal stop callback: blocks the busy lock, detaches message and discovery listeners and,
     * even if detaching throws, fails everything pending on the current coordinator (when it has a node id).
     *
     * @param cancel Cancel flag (unused).
     */
    @Override
    public void onKernalStop(boolean cancel) {
        busyLock.block();

        try {
            ctx.io().removeMessageListener(GridTopic.TOPIC_CACHE_COORDINATOR, msgLsnr);
            ctx.event().removeDiscoveryEventListener(discoLsnr, 12, 11, 10);
        }
        finally {
            MvccCoordinator crd = curCrd;

            if (crd.nodeId() != null) {
                onCoordinatorFailed(crd.nodeId());
            }
        }
    }

    /**
     * Discovery listener body (registered for event types 12, 11 and 10).
     * <ul>
     * <li>Type 10: re-assigns the coordinator only if the current one is in the disconnected state.</li>
     * <li>Other types, event node is the current coordinator: fails pending requests and picks a new coordinator.</li>
     * <li>Other types, local node is the coordinator: notifies query tracking structures about the node
     * and, when MVCC is enabled, updates recovery voting.</li>
     * </ul>
     *
     * @param evt Discovery event.
     * @param discoCache Discovery cache for the event.
     */
    private void onDiscovery(DiscoveryEvent evt, DiscoCache discoCache) {
        // Presumably EVT_NODE_FAILED (12), EVT_NODE_LEFT (11), EVT_NODE_JOINED (10) — confirm against EventType.
        assert (evt.type() == 12 || evt.type() == 11 || evt.type() == 10);
        UUID nodeId = evt.eventNode().id();
        AffinityTopologyVersion topVer = discoCache.version();
        List<ClusterNode> nodes = discoCache.allNodes();
        this.checkMvccSupported(nodes);
        // Snapshot the volatile field once so all branches below see the same coordinator.
        MvccCoordinator curCrd0 = this.curCrd;
        if (evt.type() == 10) {
            if (curCrd0.disconnected()) {
                this.onCoordinatorChanged(topVer, nodes, true);
            }
        } else if (Objects.equals(nodeId, curCrd0.nodeId())) {
            // The coordinator itself is gone: fail what was pending on it, then choose a new one.
            this.onCoordinatorFailed(nodeId);
            this.onCoordinatorChanged(topVer, nodes, true);
        } else if (curCrd0.local()) {
            this.activeQueries.onNodeFailed(nodeId);
            this.prevQueries.onNodeFailed(nodeId);
            if (this.mvccEnabled) {
                // Record a vote from the gone node in every open ballot box and try to finish each voting.
                this.recoveryBallotBoxes.forEach((nearNodeId, ballotBox) -> {
                    ((RecoveryBallotBox)ballotBox).vote(nodeId);
                    this.tryFinishRecoveryVoting((UUID)nearNodeId, (RecoveryBallotBox)ballotBox);
                });
                if (evt.eventNode().isClient()) {
                    // For a client node, open (or reuse) its ballot box; voters are the event topology nodes that support MVCC.
                    RecoveryBallotBox ballotBox2 = this.recoveryBallotBoxes.computeIfAbsent(nodeId, uuid -> new RecoveryBallotBox());
                    ballotBox2.voters(evt.topologyNodes().stream().filter(this::supportsMvcc).map(ClusterNode::id).collect(Collectors.toList()));
                    this.tryFinishRecoveryVoting(nodeId, ballotBox2);
                }
            }
        }
    }

    /**
     * Fails all snapshot listeners registered for the given coordinator node with a
     * {@link ClusterTopologyCheckedException} and notifies every pending ack future that the node left.
     *
     * @param nodeId Id of the coordinator node that is gone.
     */
    private void onCoordinatorFailed(UUID nodeId) {
        Map<Long, MvccSnapshotResponseListener> pending = snapLsnrs.remove(nodeId);

        if (pending != null) {
            ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request mvcc version, coordinator left: " + nodeId);

            for (Long reqId : pending.keySet()) {
                // remove() decides ownership: only the caller that actually removed the listener notifies it.
                MvccSnapshotResponseListener lsnr = pending.remove(reqId);

                if (lsnr != null) {
                    lsnr.onError(err);
                }
            }
        }

        ackFuts.values().forEach(ackFut -> ackFut.onNodeLeft(nodeId));
    }

    /**
     * Picks a coordinator for the given topology and publishes it under {@code stateMux}.
     * If the client is disconnected nothing changes; if no coordinator could be picked, the
     * disconnected marker is stored and active queries are not processed.
     *
     * @param topVer Topology version of the change.
     * @param nodes Nodes to pick the coordinator from.
     * @param sndQrys Whether active queries should be collected and sent to the new coordinator.
     */
    private void onCoordinatorChanged(AffinityTopologyVersion topVer, Collection<ClusterNode> nodes, boolean sndQrys) {
        MvccCoordinator picked = pickMvccCoordinator(nodes, topVer);

        synchronized (stateMux) {
            if (ctx.clientDisconnected()) {
                return;
            }

            if (picked.disconnected()) {
                curCrd = picked;

                return;
            }

            assert picked.topologyVersion().compareTo(curCrd.topologyVersion()) > 0;

            curCrd = picked;
        }

        // Runs outside the monitor.
        processActiveQueries(nodes, picked, sndQrys);
    }

    /**
     * Hands local active query ids over to a newly assigned coordinator.
     * <p>
     * When {@code sndQrys} is set, every active query tracker and every near local transaction is
     * notified via {@code onMvccCoordinatorChange(newCrd)}; the returned ids that pass
     * {@code MvccCoordinatorChangeAware.ID_FILTER} are collected. If the new coordinator is local,
     * the ids are registered in {@code prevQueries} directly; otherwise they are sent asynchronously
     * (system executor) as an {@link MvccActiveQueriesMessage}.
     *
     * @param nodes Current topology nodes, used to initialize {@code prevQueries} on a local coordinator.
     * @param newCrd Newly assigned coordinator.
     * @param sndQrys Whether active queries must be collected and reported.
     */
    private void processActiveQueries(Collection<ClusterNode> nodes, MvccCoordinator newCrd, boolean sndQrys) {
        GridLongList qryIds = sndQrys
            ? new GridLongList(Stream.concat(this.activeTrackers.values().stream(), this.ctx.cache().context().tm().activeTransactions().stream().filter(tx -> tx.near() && tx.local())).mapToLong(q -> ((MvccCoordinatorChangeAware)q).onMvccCoordinatorChange(newCrd)).filter(MvccCoordinatorChangeAware.ID_FILTER).toArray())
            : new GridLongList();
        if (newCrd.local()) {
            this.prevQueries.addActiveQueries(this.ctx.localNodeId(), qryIds);
            this.prevQueries.init(nodes, this.ctx.discovery()::alive);
        } else if (sndQrys) {
            this.ctx.pools().getSystemExecutorService().submit(() -> {
                try {
                    this.sendMessage(newCrd.nodeId(), new MvccActiveQueriesMessage(qryIds));
                }
                catch (IgniteCheckedException e) {
                    // Pass the exception itself so the stack trace is logged, as ackTxRollback() does.
                    U.error(this.log, "Failed to send active queries to mvcc coordinator: " + e, e);
                }
            });
        }
    }

    /**
     * Tells whether the coordinator's topology version falls into the half-open range {@code (from, to]}.
     *
     * @param currCrd Coordinator to check.
     * @param from Exclusive lower bound.
     * @param to Inclusive upper bound.
     * @return {@code true} if {@code from < currCrd.topologyVersion() <= to}.
     */
    private boolean coordinatorChanged(MvccCoordinator currCrd, AffinityTopologyVersion from, AffinityTopologyVersion to) {
        AffinityTopologyVersion crdTopVer = currCrd.topologyVersion();

        if (from.compareTo(crdTopVer) >= 0) {
            return false;
        }

        return to.compareTo(crdTopVer) >= 0;
    }

    /**
     * Finishes active server transactions whose near node is not among the given live servers.
     * Candidates are collected while holding this processor's monitor; {@code onTxDone} is invoked
     * after the monitor is released.
     *
     * @param liveSrvs Server nodes that are currently alive.
     */
    private void cleanupOrphanedServerTransactions(Collection<ClusterNode> liveSrvs) {
        Set<UUID> liveIds = liveSrvs.stream().map(ClusterNode::id).collect(Collectors.toSet());
        List<Long> orphaned = new ArrayList<>();

        synchronized (this) {
            for (Map.Entry<Long, ActiveTx> e : activeTxs.entrySet()) {
                ActiveTx tx = e.getValue();

                // Exact class match: only ActiveServerTx entries are considered.
                boolean srvTx = tx.getClass() == ActiveServerTx.class;

                if (srvTx && !liveIds.contains(tx.nearNodeId)) {
                    orphaned.add(e.getKey());
                }
            }
        }

        for (Long txCntr : orphaned) {
            onTxDone(txCntr, true);
        }
    }

    /**
     * Marks the coordinator as initialized: records its topology version as {@code readyVer}
     * and, if the coordinator is local, completes {@code initFut} via the closure processor.
     *
     * @param curCrd0 Coordinator being initialized.
     */
    private void initialize(MvccCoordinator curCrd0) {
        readyVer = curCrd0.topologyVersion();

        curCrd0.initialized(true);

        if (!curCrd0.local()) {
            return;
        }

        ctx.closure().runLocalSafe(initFut::onDone);
    }

    /**
     * @return Coordinator currently stored in the volatile {@code curCrd} field.
     */
    @Override
    @NotNull
    public MvccCoordinator currentCoordinator() {
        return curCrd;
    }

    /**
     * Convenience overload of {@link #state(long, long)}.
     *
     * @param ver Version whose coordinator version and counter identify the transaction.
     * @return Transaction state byte read from the tx log.
     */
    @Override
    public byte state(MvccVersion ver) {
        long crdVer = ver.coordinatorVersion();
        long cntr = ver.counter();

        return state(crdVer, cntr);
    }

    /**
     * Reads a transaction state from the tx log.
     *
     * @param crdVer Coordinator version.
     * @param cntr Counter.
     * @return State byte from the tx log; {@code 0} if the read failed (the failure is passed to the
     *      failure processor as {@code CRITICAL_ERROR}).
     */
    @Override
    public byte state(long crdVer, long cntr) {
        assert txLog != null && mvccEnabled : mvccEnabled;

        byte res;

        try {
            res = txLog.get(crdVer, cntr);
        }
        catch (IgniteCheckedException e) {
            ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));

            res = 0;
        }

        return res;
    }

    /**
     * Updates a transaction state with the {@code primary} flag set to {@code true}.
     *
     * @param ver Transaction version.
     * @param state New state byte.
     */
    @Override
    public void updateState(MvccVersion ver, byte state) {
        updateState(ver, state, true);
    }

    /**
     * Writes a transaction state to the tx log. Does nothing if the tx log is absent; a failed write
     * is passed to the failure processor as {@code CRITICAL_ERROR}.
     *
     * @param ver Transaction version (coordinator version + counter form the tx key).
     * @param state New state byte.
     * @param primary Flag passed through to {@code TxLog.put}.
     */
    @Override
    public void updateState(MvccVersion ver, byte state, boolean primary) {
        assert mvccEnabled;
        assert txLog != null || waitMap.isEmpty();

        if (txLog != null) {
            try {
                txLog.put(new TxKey(ver.coordinatorVersion(), ver.counter()), state, primary);
            }
            catch (IgniteCheckedException e) {
                ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
            }
        }
    }

    /**
     * Puts {@code LOCAL_TRANSACTION_MARKER} into the wait map for the given transaction key
     * unless an entry already exists.
     *
     * @param crd Coordinator version.
     * @param cntr Counter.
     */
    @Override
    public void registerLocalTransaction(long crd, long cntr) {
        TxKey txKey = new TxKey(crd, cntr);

        Waiter prev = waitMap.putIfAbsent(txKey, LOCAL_TRANSACTION_MARKER);

        assert prev == null || prev.hasLocalTransaction();
    }

    /**
     * @param crd Coordinator version.
     * @param cntr Counter.
     * @return {@code true} if the wait map holds an entry for the key and that entry reports a local transaction.
     */
    @Override
    public boolean hasLocalTransaction(long crd, long cntr) {
        Waiter registered = waitMap.get(new TxKey(crd, cntr));

        if (registered == null) {
            return false;
        }

        return registered.hasLocalTransaction();
    }

    /**
     * Registers a lock future waiting on the blocker transaction and returns it.
     *
     * @param cctx Cache context; its IO policy is passed to the created future.
     * @param waiterVer Version of the waiting transaction.
     * @param blockerVer Version of the transaction being waited for.
     * @return Future registered in the wait map for {@code blockerVer}.
     */
    @Override
    public IgniteInternalFuture<Void> waitForLock(GridCacheContext cctx, MvccVersion waiterVer, MvccVersion blockerVer) {
        LockFuture fut;
        TxKey key = new TxKey(blockerVer.coordinatorVersion(), blockerVer.counter());
        // Atomically add the new future to whatever is already registered for the blocker (combined via Waiter::concat).
        Waiter waiter = this.waitMap.merge(key, fut = new LockFuture(cctx.ioPolicy(), waiterVer), Waiter::concat);
        // No local transaction behind this key: take the entry out and run the waiters right away
        // (the null check covers a concurrent removal by another thread).
        if (!waiter.hasLocalTransaction() && (waiter = this.waitMap.remove(key)) != null) {
            waiter.run(this.ctx);
        } else {
            // Otherwise set up delayed deadlock detection; it is cancelled once the future completes.
            DeadlockDetectionManager.DelayedDeadlockComputation delayedComputation = this.ctx.cache().context().deadlockDetectionMgr().initDelayedComputation(waiterVer, blockerVer);
            if (delayedComputation != null) {
                fut.listen(fut0 -> delayedComputation.cancel());
            }
        }
        return fut;
    }

    /**
     * Removes the wait map entry for the given version and, if one existed, runs it.
     *
     * @param locked Version of the transaction whose waiters are released.
     */
    @Override
    public void releaseWaiters(MvccVersion locked) {
        TxKey txKey = new TxKey(locked.coordinatorVersion(), locked.counter());

        Waiter released = waitMap.remove(txKey);

        if (released == null) {
            return;
        }

        released.run(ctx);
    }

    /**
     * Registers a query tracker under its id; an already registered tracker with the same id is kept.
     *
     * @param tracker Tracker to register; its id must not be {@code -1}.
     */
    @Override
    public void addQueryTracker(MvccQueryTracker tracker) {
        assert tracker.id() != -1L;

        activeTrackers.putIfAbsent(tracker.id(), tracker);
    }

    /**
     * Unregisters the query tracker with the given id, if present.
     *
     * @param id Tracker id.
     */
    @Override
    public void removeQueryTracker(Long id) {
        activeTrackers.remove(id);
    }

    /**
     * Assigns a tx snapshot locally, without messaging.
     *
     * @return Snapshot, or {@code null} if the local node is not the coordinator or {@code initFut} is not done yet.
     */
    @Override
    public MvccSnapshot requestWriteSnapshotLocal() {
        boolean locCrdReady = currentCoordinator().local() && initFut.isDone();

        if (locCrdReady) {
            return assignTxSnapshot(0L, ctx.localNodeId(), false);
        }

        return null;
    }

    /**
     * Assigns a query snapshot locally, without messaging.
     *
     * @return Snapshot, or {@code null} if the local node is not the coordinator or {@code initFut} is not done yet.
     */
    @Override
    public MvccSnapshot requestReadSnapshotLocal() {
        boolean locCrdReady = currentCoordinator().local() && initFut.isDone();

        if (locCrdReady) {
            return activeQueries.assignQueryCounter(ctx.localNodeId(), 0L);
        }

        return null;
    }

    /**
     * Requests a read snapshot from the current coordinator.
     *
     * @return Future that acts as the response listener for the request.
     */
    @Override
    public IgniteInternalFuture<MvccSnapshot> requestReadSnapshotAsync() {
        MvccSnapshotFuture snapshotFut = new MvccSnapshotFuture();

        requestReadSnapshotAsync(currentCoordinator(), snapshotFut);

        return snapshotFut;
    }

    /**
     * Requests a write snapshot from the current coordinator.
     *
     * @return Future that acts as the response listener for the request.
     */
    @Override
    public IgniteInternalFuture<MvccSnapshot> requestWriteSnapshotAsync() {
        MvccSnapshotFuture snapshotFut = new MvccSnapshotFuture();

        requestWriteSnapshotAsync(currentCoordinator(), snapshotFut);

        return snapshotFut;
    }

    /**
     * Requests a read snapshot from the given coordinator.
     *
     * @param crd Coordinator to ask.
     * @param lsnr Listener notified with the response or the error.
     */
    @Override
    public void requestReadSnapshotAsync(MvccCoordinator crd, MvccSnapshotResponseListener lsnr) {
        final boolean forRead = true;

        requestSnapshotAsync(crd, lsnr, forRead);
    }

    /**
     * Requests a write snapshot from the given coordinator.
     *
     * @param crd Coordinator to ask.
     * @param lsnr Listener notified with the response or the error.
     */
    @Override
    public void requestWriteSnapshotAsync(MvccCoordinator crd, MvccSnapshotResponseListener lsnr) {
        final boolean forRead = false;

        requestSnapshotAsync(crd, lsnr, forRead);
    }

    /**
     * Requests a read or write snapshot from the given coordinator and reports the outcome to {@code lsnr}.
     * <ul>
     * <li>Disconnected coordinator or stopping node: the listener gets an error immediately.</li>
     * <li>Local coordinator: the snapshot is assigned in-process, after {@code initFut} completes if it is not done yet.</li>
     * <li>Remote coordinator: the listener is stored under a fresh request id and a request message is sent.</li>
     * </ul>
     *
     * @param crd Coordinator to ask.
     * @param lsnr Listener notified with the snapshot or the error.
     * @param forRead {@code true} for a query (read) snapshot, {@code false} for a tx (write) snapshot.
     */
    private void requestSnapshotAsync(MvccCoordinator crd, final MvccSnapshotResponseListener lsnr, final boolean forRead) {
        if (crd.disconnected()) {
            lsnr.onError(MvccUtils.noCoordinatorError());
            return;
        }
        // The busy lock is blocked in onKernalStop(); failing to enter means the node is stopping.
        if (!this.busyLock.enterBusy()) {
            lsnr.onError(new NodeStoppingException("Failed to request snapshot (Node is stopping)."));
            return;
        }
        try {
            Map map0;
            if (this.ctx.localNodeId().equals(crd.nodeId())) {
                if (!this.initFut.isDone()) {
                    // Coordinator is local but not initialized yet: defer the assignment until initFut completes.
                    this.initFut.listen((IgniteInClosure<IgniteInternalFuture<Void>>)new IgniteInClosure<IgniteInternalFuture>(){

                        @Override
                        public void apply(IgniteInternalFuture fut) {
                            if (forRead) {
                                lsnr.onResponse(MvccProcessorImpl.this.activeQueries.assignQueryCounter(MvccProcessorImpl.this.ctx.localNodeId(), 0L));
                            } else {
                                lsnr.onResponse(MvccProcessorImpl.this.assignTxSnapshot(0L, MvccProcessorImpl.this.ctx.localNodeId(), false));
                            }
                        }
                    });
                } else if (forRead) {
                    lsnr.onResponse(this.activeQueries.assignQueryCounter(this.ctx.localNodeId(), 0L));
                } else {
                    lsnr.onResponse(this.assignTxSnapshot(0L, this.ctx.localNodeId(), false));
                }
                return;
            }
            UUID nodeId = crd.nodeId();
            long id = this.futIdCntr.incrementAndGet();
            // Get or create the per-coordinator listener map; putIfAbsent resolves a race with a concurrent creator.
            Map map = this.snapLsnrs.get(nodeId);
            if (map == null && (map0 = (Map)this.snapLsnrs.putIfAbsent(nodeId, map = new ConcurrentHashMap<Long, MvccSnapshotResponseListener>())) != null) {
                map = map0;
            }
            // Register the listener before sending so it is already in the map when the response arrives.
            map.put(id, (MvccSnapshotResponseListener)lsnr);
            try {
                this.sendMessage(nodeId, forRead ? new MvccQuerySnapshotRequest(id) : new MvccTxSnapshotRequest(id));
            }
            catch (IgniteCheckedException e) {
                // Report the send failure only if nobody else (e.g. onCoordinatorFailed) already took the listener.
                if (map.remove(id) != null) {
                    lsnr.onError(e);
                }
            }
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Sends a tx commit acknowledgement to the current coordinator.
     *
     * @param updateVer Snapshot of the committed transaction; must not be {@code null}.
     * @return Future of the send operation, or an already finished future if the coordinator is
     *      disconnected or its version differs from the snapshot's coordinator version.
     */
    @Override
    public IgniteInternalFuture<Void> ackTxCommit(MvccSnapshot updateVer) {
        assert updateVer != null;

        MvccCoordinator crd = curCrd;

        boolean sameCrd = !crd.disconnected() && crd.version() == updateVer.coordinatorVersion();

        if (!sameCrd) {
            return new GridFinishedFuture<>();
        }

        MvccAckRequestTx ack = new MvccAckRequestTx(futIdCntr.incrementAndGet(), updateVer.counter());

        return sendTxCommit(crd, ack);
    }

    /**
     * Sends a tx rollback acknowledgement (no response expected) to the current coordinator.
     * Nothing is sent if the coordinator is disconnected or its version differs from the given one.
     * Send failures are logged and not propagated.
     *
     * @param updateVer Version of the rolled back transaction; must not be {@code null}.
     */
    @Override
    public void ackTxRollback(MvccVersion updateVer) {
        assert updateVer != null;

        MvccCoordinator crd = curCrd;

        if (crd.disconnected()) {
            return;
        }

        if (crd.version() != updateVer.coordinatorVersion()) {
            return;
        }

        MvccAckRequestTx ack = new MvccAckRequestTx(-1L, updateVer.counter());

        ack.skipResponse(true);

        try {
            sendMessage(crd.nodeId(), ack);
        }
        catch (ClusterTopologyCheckedException e) {
            // Coordinator left: expected during topology changes, so debug level only.
            if (log.isDebugEnabled()) {
                log.debug("Failed to send tx rollback ack, node left [msg=" + ack + ", node=" + crd.nodeId() + ']');
            }
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send tx rollback ack [msg=" + ack + ", node=" + crd.nodeId() + ']', e);
        }
    }

    /**
     * Notifies the coordinator that a query has finished. If the coordinator still matches the
     * snapshot's coordinator version, a counter-based ack is tried first; otherwise, or if that
     * send reports failure, an id-based ack is re-sent to the then-current coordinator until
     * {@code sendQueryDone} reports success.
     *
     * @param snapshot Query snapshot; nothing happens if {@code null}.
     * @param qryId Query id used for the id-based ack.
     */
    @Override
    public void ackQueryDone(MvccSnapshot snapshot, long qryId) {
        MvccCoordinator crd = currentCoordinator();

        if (crd.disconnected() || snapshot == null) {
            return;
        }

        boolean sameCrd = crd.version() == snapshot.coordinatorVersion();

        if (sameCrd && sendQueryDone(crd, new MvccAckRequestQueryCntr(queryTrackCounter(snapshot)))) {
            return;
        }

        MvccAckRequestQueryId idAck = new MvccAckRequestQueryId(qryId);

        // Re-read the coordinator on every attempt: it may change between retries.
        do {
            crd = currentCoordinator();
        }
        while (!sendQueryDone(crd, idAck));
    }

    /**
     * Logs (at warn level) all pending snapshot listeners and pending wait-ack futures.
     * Each section header is printed only when the corresponding collection yields at least one element.
     *
     * @param log Logger to write to.
     * @param diagCtx Diagnostic context (unused).
     */
    @Override
    public void dumpDebugInfo(IgniteLogger log, @Nullable IgniteDiagnosticPrepareContext diagCtx) {
        boolean hdrPrinted = false;

        for (Map<Long, MvccSnapshotResponseListener> lsnrsByReqId : snapLsnrs.values()) {
            if (!hdrPrinted) {
                U.warn(log, "Pending mvcc listener: ");

                hdrPrinted = true;
            }

            lsnrsByReqId.values().forEach(lsnr -> U.warn(log, ">>> " + lsnr.toString()));
        }

        hdrPrinted = false;

        for (WaitAckFuture ackFut : ackFuts.values()) {
            if (!hdrPrinted) {
                U.warn(log, "Pending mvcc wait ack futures: ");

                hdrPrinted = true;
            }

            U.warn(log, ">>> " + ackFut.toString());
        }
    }

    /**
     * Delegates to the tx log's {@code removeUntil} using the coordinator version and counter of
     * the given MVCC version.
     *
     * @param ver Version whose coordinator version and counter bound the removal.
     * @throws IgniteCheckedException If the tx log operation fails.
     */
    void removeUntil(MvccVersion ver) throws IgniteCheckedException {
        long crdVer = ver.coordinatorVersion();
        long cntr = ver.counter();
        this.txLog.removeUntil(crdVer, cntr);
    }

    /**
     * Builds the configuration of the data region named {@code "TxLog"}.
     * <p>
     * Initial and maximum sizes are copied from the system data region configuration, persistence
     * follows {@code CU.isPersistenceEnabled(dscfg)}, and lazy memory allocation is switched off.
     *
     * @param dscfg Data storage configuration to derive the region settings from.
     * @return New region configuration.
     */
    private DataRegionConfiguration createTxLogRegion(DataStorageConfiguration dscfg) {
        long initSize = dscfg.getSystemDataRegionConfiguration().getInitialSize();
        long maxSize = dscfg.getSystemDataRegionConfiguration().getMaxSize();
        boolean persistent = CU.isPersistenceEnabled(dscfg);

        DataRegionConfiguration txLogCfg = new DataRegionConfiguration();
        txLogCfg.setName("TxLog");
        txLogCfg.setInitialSize(initSize);
        txLogCfg.setMaxSize(maxSize);
        txLogCfg.setPersistenceEnabled(persistent);
        txLogCfg.setLazyMemoryAllocation(false);
        return txLogCfg;
    }

    /**
     * @return Data storage configuration taken from the kernal context's node configuration.
     */
    private DataStorageConfiguration dataStorageConfiguration() {
        return this.ctx.config().getDataStorageConfiguration();
    }

    /**
     * Chooses the MVCC coordinator for the given topology.
     * <p>
     * When the test closure {@code crdC} is set, its result is used as is (possibly {@code null}).
     * Otherwise the first node in iteration order that is not a client and supports MVCC is taken.
     * If no node is chosen, {@link MvccCoordinator#DISCONNECTED_COORDINATOR} is returned.
     *
     * @param nodes Candidate nodes.
     * @param topVer Topology version recorded in the new coordinator.
     * @return Assigned coordinator, or the disconnected coordinator when none could be assigned.
     */
    @NotNull
    private MvccCoordinator pickMvccCoordinator(Collection<ClusterNode> nodes, AffinityTopologyVersion topVer) {
        ClusterNode picked = null;
        if (crdC != null) {
            picked = crdC.apply(nodes);
            if (picked != null && this.log.isInfoEnabled()) {
                this.log.info("Assigned coordinator using test closure: " + picked.id());
            }
        } else {
            for (ClusterNode candidate : nodes) {
                if (!candidate.isClient() && this.supportsMvcc(candidate)) {
                    picked = candidate;
                    break;
                }
            }
        }

        MvccCoordinator assigned;
        if (picked == null) {
            assigned = MvccCoordinator.DISCONNECTED_COORDINATOR;
        } else {
            assigned = new MvccCoordinator(topVer, picked.id(), this.coordinatorVersion(picked), picked.isLocal());
        }

        if (assigned.disconnected()) {
            U.warn(this.log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']');
        } else if (this.log.isInfoEnabled()) {
            this.log.info("Assigned mvcc coordinator [crd=" + assigned + ']');
        }
        return assigned;
    }

    /**
     * Computes the coordinator version for a node as the sum of the node order and the grid start
     * time reported by discovery.
     *
     * @param crdNode Coordinator node.
     * @return Coordinator version.
     */
    private long coordinatorVersion(ClusterNode crdNode) {
        long nodeOrder = crdNode.order();
        long gridStartTime = this.ctx.discovery().gridStartTime();
        return nodeOrder + gridStartTime;
    }

    /**
     * Recomputes the {@code mvccSupported} flag from the given nodes.
     * <p>
     * Once MVCC is enabled the flag is left untouched (and asserted to be set). Otherwise the flag
     * becomes {@code true} only if every node passes {@link #supportsMvcc(ClusterNode)}; it is
     * written only when the value actually changes.
     *
     * @param nodes Nodes to check.
     */
    private void checkMvccSupported(Collection<ClusterNode> nodes) {
        if (this.mvccEnabled) {
            assert (this.mvccSupported);
            return;
        }
        boolean prev = this.mvccSupported;
        // Stops at the first node that does not support MVCC; an empty collection yields true.
        boolean allSupport = nodes.stream().allMatch(this::supportsMvcc);
        if (prev != allSupport) {
            this.mvccSupported = allSupport;
        }
    }

    /**
     * @param node Node to check.
     * @return {@code true} if the node's product version, compared ignoring the timestamp, is not
     *      lower than {@code MVCC_SUPPORTED_SINCE}.
     */
    private boolean supportsMvcc(ClusterNode node) {
        int cmp = node.version().compareToIgnoreTimestamp(MVCC_SUPPORTED_SINCE);
        return cmp >= 0;
    }

    /**
     * Switches {@code mvccEnabled} on when the batch starts at least one cache whose atomicity mode
     * is {@code TRANSACTIONAL_SNAPSHOT}. Does nothing if MVCC is already enabled.
     *
     * @param cacheMsg Dynamic cache change batch to inspect.
     */
    private void checkMvccCacheStarted(DynamicCacheChangeBatch cacheMsg) {
        if (this.mvccEnabled) {
            return;
        }
        for (DynamicCacheChangeRequest req : cacheMsg.requests()) {
            CacheConfiguration ccfg = req.startCacheConfiguration();
            boolean startsMvccCache = ccfg != null && ccfg.getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
            if (startsMvccCache) {
                assert (this.mvccSupported);
                this.mvccEnabled = true;
            }
        }
    }

    /**
     * Assigns a new transaction snapshot on the local coordinator.
     * <p>
     * Under the processor monitor the MVCC counter is incremented, every currently active
     * transaction is added to the response, and the new transaction is registered in
     * {@code activeTxs}. The tracking counter is the minimum of the new version and all active
     * transaction keys; the cleanup counter starts at {@code committedCntr + 1} and is lowered by
     * the tracking values of active transactions and by the minimal active query counter.
     * The final cleanup value is decremented by one when {@code prevQueries.done()} and forced to
     * zero otherwise.
     *
     * @param futId Future id echoed in the response.
     * @param nearId Id of the requesting node.
     * @param client Whether the requesting node is a client; selects the kind of active tx record.
     * @return Initialized snapshot response.
     */
    private MvccSnapshotResponse assignTxSnapshot(long futId, UUID nearId, boolean client) {
        assert (this.initFut.isDone() && this.curCrd.local());
        MvccSnapshotResponse res = new MvccSnapshotResponse();
        long ver;
        long tracking;
        long cleanup;
        synchronized (this) {
            ver = this.mvccCntr.incrementAndGet();
            tracking = ver;
            cleanup = this.committedCntr.get() + 1L;
            for (Map.Entry<Long, ActiveTx> active : this.activeTxs.entrySet()) {
                cleanup = Math.min(active.getValue().tracking, cleanup);
                tracking = Math.min(active.getKey(), tracking);
                res.addTx(active.getKey());
            }
            ActiveTx newTx = client ? new ActiveTx(tracking, nearId) : new ActiveServerTx(tracking, nearId);
            boolean added = this.activeTxs.put(ver, newTx) == null;
            assert (added) : ver;
        }
        long minQry = this.activeQueries.minimalQueryCounter();
        // -1 means there is no query counter to take into account.
        if (minQry != -1L) {
            cleanup = Math.min(cleanup, minQry);
        }
        if (this.prevQueries.done()) {
            cleanup -= 1L;
        } else {
            cleanup = 0L;
        }
        // NOTE(review): the literal 1 is presumably the starting operation counter - confirm against MvccSnapshotResponse.init.
        res.init(futId, this.curCrd.version(), ver, 1, cleanup, tracking);
        return res;
    }

    /**
     * Removes a finished transaction from the active set and, if requested, advances the committed
     * counter to the transaction counter when that is greater. Both steps run under the processor
     * monitor.
     *
     * @param txCntr Counter of the finished transaction.
     * @param increaseCommittedCntr Whether the committed counter may be advanced.
     */
    private void onTxDone(Long txCntr, boolean increaseCommittedCntr) {
        assert (this.initFut.isDone());
        synchronized (this) {
            this.activeTxs.remove(txCntr);
            if (!increaseCommittedCntr) {
                return;
            }
            this.committedCntr.setIfGreater(txCntr);
        }
    }

    /**
     * Forwards a query completion to the active queries tracker.
     *
     * @param nodeId Id of the node the query belongs to.
     * @param mvccCntr Counter identifying the finished query.
     */
    private void onQueryDone(UUID nodeId, Long mvccCntr) {
        this.activeQueries.onQueryDone(nodeId, mvccCntr);
    }

    /**
     * Computes the tracking counter of a query snapshot: the smallest value among the snapshot
     * counter and the counters of its active transactions.
     *
     * @param mvccVer Query snapshot.
     * @return Minimal counter.
     */
    private long queryTrackCounter(MvccSnapshot mvccVer) {
        long minCntr = mvccVer.counter();
        MvccLongList txList = mvccVer.activeTransactions();
        int cnt = txList.size();
        for (int idx = 0; idx < cnt; ++idx) {
            minCntr = Math.min(minCntr, txList.get(idx));
        }
        return minCntr;
    }

    /**
     * Starts the vacuum scheduler and the configured number of vacuum workers, each in its own
     * {@code IgniteThread}, sharing one cleanup queue. Must not be called on a client node.
     * <p>
     * If workers already exist, nothing is started and a warning is logged outside the lock.
     */
    void startVacuumWorkers() {
        assert (!this.ctx.clientNode());
        boolean started = false;
        synchronized (this.mux) {
            if (this.vacuumWorkers == null) {
                assert (this.cleanupQueue == null);
                int threadCnt = this.ctx.config().getMvccVacuumThreadCount();
                this.cleanupQueue = new LinkedBlockingQueue<VacuumTask>();
                // One extra slot for the scheduler.
                this.vacuumWorkers = new ArrayList<GridWorker>(threadCnt + 1);
                this.vacuumWorkers.add(new VacuumScheduler(this.ctx, this.log, this));
                for (int idx = 0; idx < threadCnt; ++idx) {
                    this.vacuumWorkers.add(new VacuumWorker(this.ctx, this.log, this.cleanupQueue));
                }
                for (GridWorker worker : this.vacuumWorkers) {
                    new IgniteThread(worker).start();
                }
                started = true;
            }
        }
        if (!started) {
            U.warn(this.log, "Attempting to start active vacuum.");
        }
    }

    /**
     * Stops vacuum workers on a non-client node.
     * <p>
     * The worker list and cleanup queue are detached under the lock; the workers are then
     * cancelled and joined, and every task still in the queue is completed with the
     * vacuum-cancelled exception. If no workers were running, only a debug message is logged
     * (when MVCC is enabled).
     */
    void stopVacuumWorkers() {
        if (this.ctx.clientNode()) {
            return;
        }
        List<GridWorker> workers;
        BlockingQueue<VacuumTask> queue;
        synchronized (this.mux) {
            workers = this.vacuumWorkers;
            queue = this.cleanupQueue;
            this.vacuumWorkers = null;
            this.cleanupQueue = null;
        }
        if (workers == null) {
            if (this.log.isDebugEnabled() && this.mvccEnabled()) {
                this.log.debug("Attempting to stop inactive vacuum.");
            }
            return;
        }
        assert (queue != null);
        U.cancel(workers);
        U.join(workers, this.log);
        if (queue.isEmpty()) {
            return;
        }
        // Fail everything that was queued but never picked up by a worker.
        IgniteCheckedException cancelled = this.vacuumCancelledException();
        for (VacuumTask pending : queue) {
            pending.onDone(cancelled);
        }
    }

    /**
     * Starts a vacuum run. Must not be called on a client node.
     * <p>
     * Returns an already finished future with empty metrics when the current coordinator is not
     * initialized or the calling thread is interrupted. Otherwise a write snapshot is obtained,
     * locally if possible and asynchronously from the coordinator if not, and the run proceeds in
     * {@code continueRunVacuum}. A topology error while requesting the snapshot completes the
     * future with empty metrics; any other error fails it.
     *
     * @return Future completed with the metrics of the vacuum run.
     */
    IgniteInternalFuture<VacuumMetrics> runVacuum() {
        assert (!this.ctx.clientNode());
        MvccCoordinator crd0 = this.currentCoordinator();
        boolean skip = !crd0.initialized() || Thread.currentThread().isInterrupted();
        if (skip) {
            return new GridFinishedFuture<VacuumMetrics>(new VacuumMetrics());
        }
        final GridFutureAdapter<VacuumMetrics> res = new GridFutureAdapter<VacuumMetrics>();
        MvccSnapshot locSnapshot = this.requestWriteSnapshotLocal();
        if (locSnapshot != null) {
            this.continueRunVacuum(res, locSnapshot);
            return res;
        }
        this.requestWriteSnapshotAsync(crd0, new MvccSnapshotResponseListener(){

            @Override
            public void onResponse(MvccSnapshot rmtSnapshot) {
                MvccProcessorImpl.this.continueRunVacuum(res, rmtSnapshot);
            }

            @Override
            public void onError(IgniteCheckedException e) {
                if (e instanceof ClusterTopologyCheckedException) {
                    if (MvccProcessorImpl.this.log.isDebugEnabled()) {
                        MvccProcessorImpl.this.log.debug("Vacuum failed to receive an Mvcc snapshot. Need to retry on the stable topology. " + e.getMessage());
                    }
                    res.onDone(new VacuumMetrics());
                } else {
                    MvccProcessorImpl.this.completeWithException(res, e);
                }
            }
        });
        return res;
    }

    /**
     * Continues a vacuum run once a snapshot is available.
     * <p>
     * The snapshot is first acknowledged via {@code ackTxCommit}. When that completes without
     * error and the snapshot has a positive cleanup version, one {@code VacuumTask} per local
     * partition of every MVCC-enabled cache group is put on the cleanup queue. When all tasks
     * finish, the tx log is trimmed up to the cleanup version and {@code res} is completed with
     * the reduced metrics.
     * <p>
     * If the node is stopping while the tasks complete, {@code res} is completed normally with
     * empty metrics rather than with an error.
     *
     * @param res Future to complete with the vacuum result.
     * @param snapshot Snapshot that supplies the coordinator version and cleanup version.
     */
    private void continueRunVacuum(final GridFutureAdapter<VacuumMetrics> res, final MvccSnapshot snapshot) {
        this.ackTxCommit(snapshot).listen((IgniteInClosure<IgniteInternalFuture<Void>>)new IgniteInClosure<IgniteInternalFuture>(){

            @Override
            public void apply(IgniteInternalFuture fut) {
                Throwable err = fut.error();
                if (err != null) {
                    U.error(MvccProcessorImpl.this.log, "Vacuum error.", err);
                    res.onDone(err);
                } else if (snapshot.cleanupVersion() <= 0L) {
                    // Nothing can be cleaned for a non-positive cleanup version.
                    res.onDone(new VacuumMetrics());
                } else {
                    try {
                        if (MvccProcessorImpl.this.log.isDebugEnabled()) {
                            MvccProcessorImpl.this.log.debug("Started vacuum with cleanup version=" + snapshot.cleanupVersion() + '.');
                        }
                        Object object = MvccProcessorImpl.this.mux;
                        synchronized (object) {
                            // A null queue means the vacuum workers have been stopped.
                            if (MvccProcessorImpl.this.cleanupQueue == null) {
                                res.onDone(MvccProcessorImpl.this.vacuumCancelledException());
                                return;
                            }
                            GridCompoundIdentityFuture<VacuumMetrics> res0 = new GridCompoundIdentityFuture<VacuumMetrics>((IgniteReducer)new VacuumMetricsReducer()){

                                @Override
                                protected void logError(IgniteLogger log, String msg, Throwable e) {
                                    // Intentionally silent: errors are reported through the result future.
                                }

                                @Override
                                protected void logDebug(IgniteLogger log, String msg) {
                                    // Intentionally silent.
                                }
                            };
                            for (CacheGroupContext grp : MvccProcessorImpl.this.ctx.cache().cacheGroups()) {
                                if (!grp.mvccEnabled()) continue;
                                grp.topology().readLock();
                                try {
                                    for (GridDhtLocalPartition part : grp.topology().localPartitions()) {
                                        VacuumTask task = new VacuumTask(snapshot, part);
                                        MvccProcessorImpl.this.cleanupQueue.offer(task);
                                        res0.add(task);
                                    }
                                }
                                finally {
                                    grp.topology().readUnlock();
                                }
                            }
                            res0.markInitialized();
                            res0.listen(future -> {
                                VacuumMetrics metrics = null;
                                GridClosureException ex = null;
                                try {
                                    metrics = (VacuumMetrics)future.get();
                                    MvccProcessorImpl.this.txLog.removeUntil(snapshot.coordinatorVersion(), snapshot.cleanupVersion());
                                    if (MvccProcessorImpl.this.log.isDebugEnabled()) {
                                        MvccProcessorImpl.this.log.debug("Vacuum completed. " + metrics);
                                    }
                                }
                                catch (Throwable e) {
                                    if (X.hasCause(e, NodeStoppingException.class)) {
                                        if (MvccProcessorImpl.this.log.isDebugEnabled()) {
                                            MvccProcessorImpl.this.log.debug("Cannot complete vacuum (node is stopping).");
                                        }
                                        // Node stop is not a vacuum failure: finish with empty metrics.
                                        metrics = new VacuumMetrics();
                                    }
                                    else {
                                        // Only genuine failures are propagated; previously the error was set
                                        // unconditionally, which discarded the empty metrics assigned above.
                                        ex = new GridClosureException(e);
                                    }
                                }
                                res.onDone(metrics, ex);
                            });
                        }
                    }
                    catch (Throwable e) {
                        MvccProcessorImpl.this.completeWithException(res, e);
                    }
                }
            }
        });
    }

    /**
     * Completes the future with the given throwable and, if it is an {@link Error}, rethrows it.
     *
     * @param fut Future to complete.
     * @param e Failure cause.
     */
    private void completeWithException(GridFutureAdapter fut, Throwable e) {
        fut.onDone(e);
        if (!(e instanceof Error)) {
            return;
        }
        throw (Error)e;
    }

    /**
     * @return New {@code NodeStoppingException} used to complete vacuum futures and tasks that
     *      were cancelled.
     */
    @NotNull
    private IgniteCheckedException vacuumCancelledException() {
        return new NodeStoppingException("Operation has been cancelled (node is stopping).");
    }

    /**
     * Sends a tx ack request to the coordinator and returns a future that waits for the ack.
     * <p>
     * If the busy lock cannot be entered, a finished future is returned and nothing is sent.
     * When the send fails and the wait future is still registered, it is removed and completed:
     * normally if the coordinator node left, with the error otherwise.
     *
     * @param crd Coordinator to send to.
     * @param msg Ack request.
     * @return Future for the coordinator's acknowledgement.
     */
    @NotNull
    private IgniteInternalFuture<Void> sendTxCommit(MvccCoordinator crd, MvccAckRequestTx msg) {
        if (!this.busyLock.enterBusy()) {
            return new GridFinishedFuture<Void>();
        }
        WaitAckFuture ackFut = null;
        try {
            ackFut = new WaitAckFuture(msg.futureId(), crd.nodeId(), true);
            // Register before sending so that a fast response finds the future.
            this.ackFuts.put(ackFut.id, ackFut);
            this.sendMessage(crd.nodeId(), msg);
        }
        catch (IgniteCheckedException e) {
            boolean removed = ackFut != null && this.ackFuts.remove(ackFut.id) != null;
            if (removed) {
                if (!(e instanceof ClusterTopologyCheckedException)) {
                    ackFut.onDone(e);
                } else {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Failed to send tx ack, node left [crd=" + crd + ", msg=" + msg + ']');
                    }
                    ackFut.onDone();
                }
            }
        }
        finally {
            this.busyLock.leaveBusy();
        }
        if (ackFut == null) {
            return new GridFinishedFuture<Void>();
        }
        return ackFut;
    }

    /**
     * Sends a query ack message to the coordinator.
     *
     * @param crd Coordinator to send to.
     * @param msg Ack message.
     * @return {@code true} when no further attempt is needed: the coordinator is disconnected, the
     *      message was sent, or a non-topology error was logged. After a topology error the result
     *      is {@code true} only if the current coordinator is disconnected or still has the same
     *      version as {@code crd}.
     */
    private boolean sendQueryDone(MvccCoordinator crd, Message msg) {
        if (crd.disconnected()) {
            return true;
        }
        try {
            this.sendMessage(crd.nodeId(), msg);
        }
        catch (ClusterTopologyCheckedException e) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Failed to send query ack, node left [crd=" + crd + ", msg=" + msg + ']');
            }
            MvccCoordinator curCrd0 = this.currentCoordinator();
            if (curCrd0.disconnected()) {
                return true;
            }
            return crd.version() == curCrd0.version();
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to send query ack [crd=" + crd + ", msg=" + msg + ']', e);
        }
        return true;
    }

    /**
     * Sends a message to the given node on the {@code TOPIC_CACHE_COORDINATOR} grid topic.
     *
     * @param nodeId Destination node id.
     * @param msg Message to send.
     * @throws IgniteCheckedException If sending fails.
     */
    private void sendMessage(UUID nodeId, Message msg) throws IgniteCheckedException {
        // NOTE(review): policy 2 is presumably the system pool - confirm against GridIoPolicy.
        final byte plc = (byte)2;
        GridTopic topic = GridTopic.TOPIC_CACHE_COORDINATOR;
        this.ctx.io().sendToGridTopic(nodeId, topic, msg, plc);
    }

    /**
     * Handles a tx snapshot request: assigns a snapshot and sends it back to the requester.
     * The request is ignored if the requesting node is no longer known to discovery. If the
     * response cannot be sent, the just registered transaction is finished again without advancing
     * the committed counter.
     *
     * @param nodeId Id of the requesting node.
     * @param msg Snapshot request.
     */
    private void processCoordinatorTxSnapshotRequest(UUID nodeId, MvccTxSnapshotRequest msg) {
        ClusterNode node = this.ctx.discovery().node(nodeId);
        if (node == null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Ignore tx snapshot request processing, node left [msg=" + msg + ", node=" + nodeId + ']');
            }
            return;
        }
        MvccSnapshotResponse res = this.assignTxSnapshot(msg.futureId(), nodeId, node.isClient());
        boolean sent = false;
        try {
            this.sendMessage(node.id(), res);
            sent = true;
        }
        catch (ClusterTopologyCheckedException e) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Failed to send tx snapshot response, node left [msg=" + msg + ", node=" + nodeId + ']');
            }
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to send tx snapshot response [msg=" + msg + ", node=" + nodeId + ']', e);
        }
        if (!sent) {
            // Roll back the registration made by assignTxSnapshot.
            this.onTxDone(res.counter(), false);
        }
    }

    /**
     * Handles a query snapshot request: assigns a query counter and sends the response back.
     * The request is ignored if the requesting node is no longer known to discovery. On a
     * non-topology send failure the assigned query is marked done again; when the node left, only
     * a debug message is logged here.
     *
     * @param nodeId Id of the requesting node.
     * @param msg Snapshot request.
     */
    private void processCoordinatorQuerySnapshotRequest(UUID nodeId, MvccQuerySnapshotRequest msg) {
        ClusterNode node = this.ctx.discovery().node(nodeId);
        if (node != null) {
            MvccSnapshotResponse res = this.activeQueries.assignQueryCounter(nodeId, msg.futureId());
            try {
                this.sendMessage(node.id(), res);
            }
            catch (ClusterTopologyCheckedException e) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']');
                }
            }
            catch (IgniteCheckedException e) {
                this.onQueryDone(nodeId, res.tracking());
                U.error(this.log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e);
            }
            return;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Ignore query counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']');
        }
    }

    /**
     * Dispatches a snapshot response to the listener registered for its node and future id,
     * removing the listener. If none is found, the fact is logged: as a warning while the sender
     * is alive, at debug level otherwise.
     *
     * @param nodeId Id of the responding node.
     * @param msg Snapshot response.
     */
    private void processCoordinatorSnapshotResponse(UUID nodeId, MvccSnapshotResponse msg) {
        Map<Long, MvccSnapshotResponseListener> nodeLsnrs = this.snapLsnrs.get(nodeId);
        MvccSnapshotResponseListener lsnr = null;
        if (nodeLsnrs != null) {
            lsnr = nodeLsnrs.remove(msg.futureId());
        }
        if (lsnr != null) {
            lsnr.onResponse(msg);
            return;
        }
        if (this.ctx.discovery().alive(nodeId)) {
            U.warn(this.log, "Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']');
        } else if (this.log.isDebugEnabled()) {
            this.log.debug("Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']');
        }
    }

    /**
     * Handles a query ack identified by counter by marking that query done.
     *
     * @param nodeId Id of the node that sent the ack.
     * @param msg Ack message carrying the query counter.
     */
    private void processCoordinatorQueryAckRequest(UUID nodeId, MvccAckRequestQueryCntr msg) {
        this.onQueryDone(nodeId, msg.counter());
    }

    /**
     * Handles a query ack identified by query tracker id by reporting it to the previous queries
     * tracker.
     *
     * @param nodeId Id of the node that sent the ack.
     * @param msg Ack message carrying the query tracker id.
     */
    private void processNewCoordinatorQueryAckRequest(UUID nodeId, MvccAckRequestQueryId msg) {
        this.prevQueries.onQueryDone(nodeId, msg.queryTrackerId());
    }

    /**
     * Handles a tx ack request.
     * <p>
     * The transaction is finished; the committed counter may advance only when the message's
     * future id is non-negative. A query attached to the message is finished as well, either by
     * counter (non-zero query counter) or by tracker id (tracker id other than -1). Unless the
     * message asks to skip it, a response with the same future id is sent back.
     *
     * @param nodeId Id of the node that sent the request.
     * @param msg Ack request.
     */
    private void processCoordinatorTxAckRequest(UUID nodeId, MvccAckRequestTx msg) {
        long txCntr = msg.txCounter();
        boolean advanceCommitted = msg.futureId() >= 0L;
        this.onTxDone(txCntr, advanceCommitted);

        if (msg.queryCounter() != 0L) {
            this.onQueryDone(nodeId, msg.queryCounter());
        } else if (msg.queryTrackerId() != -1L) {
            this.prevQueries.onQueryDone(nodeId, msg.queryTrackerId());
        }

        if (msg.skipResponse()) {
            return;
        }
        try {
            this.sendMessage(nodeId, new MvccFutureResponse(msg.futureId()));
        }
        catch (ClusterTopologyCheckedException e) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']');
            }
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e);
        }
    }

    /**
     * Completes the wait-ack future matching the response's future id, removing it from the
     * registry. If none is found, the fact is logged: as a warning while the sender is alive, at
     * debug level otherwise.
     *
     * @param nodeId Id of the responding node.
     * @param msg Ack response.
     */
    private void processCoordinatorAckResponse(UUID nodeId, MvccFutureResponse msg) {
        WaitAckFuture ackFut = this.ackFuts.remove(msg.futureId());
        if (ackFut != null) {
            ackFut.onResponse();
            return;
        }
        boolean senderAlive = this.ctx.discovery().alive(nodeId);
        if (senderAlive) {
            U.warn(this.log, "Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
        } else if (this.log.isDebugEnabled()) {
            this.log.debug("Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
        }
    }

    /**
     * Registers the active queries reported by a node with the previous queries tracker.
     *
     * @param nodeId Id of the reporting node.
     * @param msg Message carrying the list of active query ids (asserted to be non-null).
     */
    private void processActiveQueriesMessage(UUID nodeId, MvccActiveQueriesMessage msg) {
        GridLongList queryIds = msg.activeQueries();
        assert (queryIds != null);
        this.prevQueries.addActiveQueries(nodeId, queryIds);
    }

    /**
     * Looks through the wait map for any entry whose waiter holds a lock future for the given
     * version and returns the version built from that entry's key.
     *
     * @param mvccVer Version of the waiting transaction.
     * @return Version made of the key's major and minor parts with operation counter 0, or an
     *      empty optional when no waiter has a lock future for {@code mvccVer}.
     */
    @Override
    public Optional<? extends MvccVersion> checkWaiting(MvccVersion mvccVer) {
        return this.waitMap.entrySet().stream()
            .filter(waitEntry -> ((Waiter)waitEntry.getValue()).lockFuture(mvccVer) != null)
            .map(waitEntry -> waitEntry.getKey())
            .map(txKey -> new MvccVersionImpl(txKey.major(), txKey.minor(), 0))
            .findAny();
    }

    /**
     * Fails one lock future registered for the given version, if any waiter has one.
     *
     * @param mvccVer Version of the waiting transaction.
     * @param e Exception to complete the lock future with.
     */
    @Override
    public void failWaiter(MvccVersion mvccVer, Exception e) {
        this.waitMap.values().stream()
            .map(waiter -> waiter.lockFuture(mvccVer))
            .filter(lockFut -> lockFut != null)
            .findAny()
            .ifPresent(lockFut -> lockFut.onDone(e));
    }

    /**
     * Records the sender's vote in the recovery ballot box of the near node named in the message
     * (creating the box on first use) and then tries to finish the voting. Ignored while MVCC is
     * not enabled.
     *
     * @param nodeId Id of the voting node.
     * @param msg Recovery finished message.
     */
    private void processRecoveryFinishedMessage(UUID nodeId, MvccRecoveryFinishedMessage msg) {
        if (this.mvccEnabled) {
            UUID nearNodeId = msg.nearNodeId();
            RecoveryBallotBox box = this.recoveryBallotBoxes.computeIfAbsent(nearNodeId, id -> new RecoveryBallotBox());
            box.vote(nodeId);
            this.tryFinishRecoveryVoting(nearNodeId, box);
        }
    }

    /**
     * Finishes recovery for a near node once its ballot box reports that voting is done.
     * <p>
     * The counters of all active transactions started by that near node are collected under the
     * processor monitor; each is then finished with the committed counter allowed to advance, and
     * the ballot box is dropped.
     *
     * @param nearNodeId Id of the near node being recovered.
     * @param ballotBox Ballot box collecting votes for that node.
     */
    private void tryFinishRecoveryVoting(UUID nearNodeId, RecoveryBallotBox ballotBox) {
        if (!ballotBox.isVotingDone()) {
            return;
        }
        List<Long> recoveredTxs;
        synchronized (this) {
            recoveredTxs = this.activeTxs.entrySet().stream()
                .filter(txEntry -> ((ActiveTx)txEntry.getValue()).nearNodeId.equals(nearNodeId))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        }
        for (Long txCntr : recoveredTxs) {
            this.onTxDone(txCntr, true);
        }
        this.recoveryBallotBoxes.remove(nearNodeId);
    }

    /**
     * Active transaction record created by {@code assignTxSnapshot} when the requesting node is
     * not a client. Adds no state of its own.
     */
    private static class ActiveServerTx
    extends ActiveTx {
        /**
         * @param tracking Tracking counter.
         * @param nearNodeId Id of the node that requested the snapshot.
         */
        private ActiveServerTx(long tracking, UUID nearNodeId) {
            super(tracking, nearNodeId);
        }
    }

    /**
     * Immutable record of an active transaction kept in {@code activeTxs}.
     */
    private static class ActiveTx {
        /** Tracking counter computed when the transaction's snapshot was assigned. */
        private final long tracking;
        /** Id of the node that requested the snapshot. */
        private final UUID nearNodeId;

        /**
         * @param tracking Tracking counter.
         * @param nearNodeId Id of the node that requested the snapshot.
         */
        private ActiveTx(long tracking, UUID nearNodeId) {
            this.tracking = tracking;
            this.nearNodeId = nearNodeId;
        }
    }

    /**
     * Worker that takes {@code VacuumTask}s from a shared blocking queue and processes them one
     * partition at a time until it is cancelled.
     */
    private static class VacuumWorker
    extends GridWorker {
        /** Queue the worker takes vacuum tasks from. */
        private final BlockingQueue<VacuumTask> cleanupQueue;

        /**
         * @param ctx Kernal context (supplies the Ignite instance name).
         * @param log Logger.
         * @param cleanupQueue Queue of vacuum tasks to process.
         */
        VacuumWorker(GridKernalContext ctx, IgniteLogger log, BlockingQueue<VacuumTask> cleanupQueue) {
            super(ctx.igniteInstanceName(), "vacuum-cleaner", log);
            this.cleanupQueue = cleanupQueue;
        }

        /**
         * Main loop: blocks on the queue and handles each task according to the state of its
         * partition.
         * <ul>
         * <li>EVICTED / RENTING: the task is completed with empty metrics.</li>
         * <li>MOVING: the task is put back on the queue once the group's rebalance future completes.</li>
         * <li>OWNING: the partition is processed and the task completed with the resulting metrics.</li>
         * <li>LOST: the task is failed.</li>
         * </ul>
         * Any throwable fails the task. An interruption is rethrown, a failure caused by node stop
         * ends the loop, and an {@link Error} is rethrown after the task is failed.
         */
        @Override
        protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            while (!this.isCancelled()) {
                VacuumTask task = this.cleanupQueue.take();
                try {
                    switch (task.part().state()) {
                        case EVICTED: 
                        case RENTING: {
                            task.onDone(new VacuumMetrics());
                            break;
                        }
                        case MOVING: {
                            // Retry this task after rebalancing has finished.
                            task.part().group().preloader().rebalanceFuture().listen(f -> this.cleanupQueue.add(task));
                            break;
                        }
                        case OWNING: {
                            task.onDone(this.processPartition(task));
                            break;
                        }
                        case LOST: {
                            task.onDone(new IgniteCheckedException("Partition is lost."));
                        }
                    }
                }
                catch (IgniteInterruptedCheckedException e) {
                    task.onDone(e);
                    throw e;
                }
                catch (Throwable e) {
                    task.onDone(e);
                    // Stop the worker when the node is stopping.
                    if (X.hasCause(e, NodeStoppingException.class)) {
                        return;
                    }
                    if (!(e instanceof Error)) continue;
                    throw (Error)e;
                }
            }
        }

        /**
         * Scans all rows of the task's partition and cleans them up key by key.
         * <p>
         * The partition is reserved for the duration of the scan; if it cannot be reserved, empty
         * metrics are returned. Rows are read with a key-only cursor. Consecutive rows with the
         * same key (and, in a shared group, the same cache id) are gathered into two collections:
         * rows for which {@code canClean} holds and rows for which {@code actualize} holds. When
         * the key or cache changes, and once more after the last row, the gathered rows are passed
         * to {@code cleanup}. Rows whose cache context cannot be resolved are skipped.
         *
         * @param task Vacuum task supplying the partition and the snapshot.
         * @return Metrics accumulated during the scan.
         * @throws IgniteCheckedException If the scan or cleanup fails, or the worker is cancelled
         *      in the middle of the scan.
         */
        private VacuumMetrics processPartition(VacuumTask task) throws IgniteCheckedException {
            long startNanoTime = System.nanoTime();
            GridDhtLocalPartition part = task.part();
            VacuumMetrics metrics = new VacuumMetrics();
            if (!part.reserve()) {
                return metrics;
            }
            int curCacheId = 0;
            try {
                KeyCacheObject prevKey = null;
                // Either a single MvccDataRow or an ArrayList of them; see addRest.
                Object rest = null;
                List<MvccLinkAwareSearchRow> cleanupRows = null;
                MvccSnapshot snapshot = task.snapshot();
                GridCacheContext<?, ?> cctx = null;
                boolean shared = part.group().sharedGroup();
                // A non-shared group uses its first cache context for every row.
                if (!shared && (cctx = F.first(part.group().caches())) == null) {
                    VacuumMetrics vacuumMetrics = metrics;
                    return vacuumMetrics;
                }
                GridCursor<? extends CacheDataRow> cursor = part.dataStore().cursor((Object)CacheDataRowAdapter.RowData.KEY_ONLY);
                while (cursor.next()) {
                    if (this.isCancelled()) {
                        throw new IgniteInterruptedCheckedException("Operation has been cancelled.");
                    }
                    MvccDataRow row = (MvccDataRow)cursor.get();
                    if (prevKey == null) {
                        prevKey = row.key();
                    }
                    if (cctx == null) {
                        // Resolve the cache context from the row's cache id.
                        curCacheId = row.cacheId();
                        cctx = part.group().shared().cacheContext(curCacheId);
                        if (cctx == null) continue;
                    }
                    // Key or cache changed: flush what was gathered for the previous key.
                    if (shared && curCacheId != row.cacheId() || !prevKey.equals(row.key())) {
                        if (rest != null || !F.isEmpty(cleanupRows)) {
                            this.cleanup(part, prevKey, cleanupRows, rest, cctx, metrics);
                        }
                        cleanupRows = null;
                        rest = null;
                        if (shared && curCacheId != row.cacheId()) {
                            curCacheId = row.cacheId();
                            cctx = part.group().shared().cacheContext(curCacheId);
                            if (cctx == null) continue;
                        }
                        prevKey = row.key();
                    }
                    if (this.canClean(row, snapshot, cctx)) {
                        cleanupRows = this.addRow(cleanupRows, row);
                    } else if (this.actualize(cctx, row, snapshot)) {
                        rest = this.addRest(rest, row);
                    }
                    metrics.addScannedRowsCount(1L);
                }
                // Flush the rows gathered for the last key.
                if (rest != null || !F.isEmpty(cleanupRows)) {
                    this.cleanup(part, prevKey, cleanupRows, rest, cctx, metrics);
                }
                // Search time is the total elapsed time minus the time spent in cleanup.
                metrics.addSearchNanoTime(System.nanoTime() - startNanoTime - metrics.cleanupNanoTime());
                VacuumMetrics vacuumMetrics = metrics;
                return vacuumMetrics;
            }
            finally {
                part.release();
            }
        }

        /**
         * Adds a row to the "rest" accumulator, which holds a single row until a second one
         * arrives and is then promoted to an {@code ArrayList}.
         *
         * @param rest Current accumulator: {@code null}, a single row, or an {@code ArrayList}.
         * @param row Row to add.
         * @return Updated accumulator.
         */
        @NotNull
        private Object addRest(@Nullable Object rest, MvccDataRow row) {
            if (rest == null) {
                rest = row;
            } else if (rest.getClass() == ArrayList.class) {
                ((List)rest).add(row);
            } else {
                ArrayList<Object> list = new ArrayList<Object>();
                list.add(rest);
                list.add(row);
                rest = list;
            }
            return rest;
        }

        /**
         * Appends a search row built from the given data row to the cleanup list, creating the
         * list on first use.
         *
         * @param rows Current list or {@code null}.
         * @param row Data row to convert and add.
         * @return The list containing the new search row.
         */
        @NotNull
        private List<MvccLinkAwareSearchRow> addRow(@Nullable List<MvccLinkAwareSearchRow> rows, MvccDataRow row) {
            if (rows == null) {
                rows = new ArrayList<MvccLinkAwareSearchRow>();
            }
            rows.add(new MvccLinkAwareSearchRow(row.cacheId(), row.key(), row.mvccCoordinatorVersion(), row.mvccCounter(), row.mvccOperationCounter(), row.link()));
            return rows;
        }

        /**
         * Decides whether a row version may be removed.
         * <p>
         * True when either (a) the row's version and its new version both compare not greater than
         * the snapshot's cleanup version and the new version's tx state equals 3, or (b) the tx
         * state of the row's own version equals 2.
         * NOTE(review): 3 and 2 are presumably the COMMITTED and ABORTED tx states, and the shift
         * by 30 presumably places the cached tx state hint into the operation counter - confirm
         * against TxState and MvccUtils.
         *
         * @param row Row to check.
         * @param snapshot Snapshot supplying coordinator and cleanup versions.
         * @param cctx Cache context used to resolve tx states.
         * @return Whether the row can be cleaned.
         */
        private boolean canClean(MvccDataRow row, MvccSnapshot snapshot, GridCacheContext cctx) {
            return MvccUtils.compare(row, snapshot.coordinatorVersion(), snapshot.cleanupVersion()) <= 0 && MvccUtils.hasNewVersion(row) && MvccUtils.compareNewVersion(row, snapshot.coordinatorVersion(), snapshot.cleanupVersion()) <= 0 && MvccUtils.state(cctx, row.newMvccCoordinatorVersion(), row.newMvccCounter(), row.newMvccOperationCounter() | row.newMvccTxState() << 30) == 3 || MvccUtils.state(cctx, row.mvccCoordinatorVersion(), row.mvccCounter(), row.mvccOperationCounter() | row.mvccTxState() << 30) == 2;
        }

        /**
         * Decides whether the tx state stored with a row should be refreshed: the row version must
         * be visible for the snapshot and either its tx state is 0, or it has a new version (new
         * coordinator version is non-zero) whose tx state is 0.
         * NOTE(review): tx state 0 presumably means "not available" - confirm against TxState.
         *
         * @param cctx Cache context.
         * @param row Row to check.
         * @param snapshot Snapshot used for the visibility check.
         * @return Whether the row's tx state should be updated.
         * @throws IgniteCheckedException If the visibility check fails.
         */
        private boolean actualize(GridCacheContext cctx, MvccDataRow row, MvccSnapshot snapshot) throws IgniteCheckedException {
            return MvccUtils.isVisible(cctx, snapshot, row.mvccCoordinatorVersion(), row.mvccCounter(), row.mvccOperationCounter(), false) && (row.mvccTxState() == 0 || row.newMvccCoordinatorVersion() != 0L && row.newMvccTxState() == 0);
        }

        /**
         * Applies the gathered changes for one key.
         * <p>
         * Runs inside the cache gate, with the key's cache entry locked (obsolete entries are
         * re-fetched until a non-obsolete one is locked) and under the checkpoint read lock.
         * Rows in {@code cleanupRows} are passed to the data store's {@code cleanup}; rows in
         * {@code rest} get their tx state updated. Afterwards the entry is unlocked and touched
         * for eviction, and cleanup time and the cleaned row count are added to the metrics.
         *
         * @param part Partition being vacuumed.
         * @param key Key the rows belong to.
         * @param cleanupRows Rows to remove, may be {@code null}.
         * @param rest Single row or {@code ArrayList} of rows whose tx state is updated, may be {@code null}.
         * @param cctx Cache context of the key.
         * @param metrics Metrics to update.
         * @throws IgniteCheckedException If a data store operation fails.
         */
        private void cleanup(GridDhtLocalPartition part, KeyCacheObject key, List<MvccLinkAwareSearchRow> cleanupRows, Object rest, GridCacheContext cctx, VacuumMetrics metrics) throws IgniteCheckedException {
            assert (!(key == null || cctx == null || F.isEmpty(cleanupRows) && rest == null));
            cctx.gate().enter();
            try {
                long cleanupStartNanoTime = System.nanoTime();
                GridCacheEntryEx entry = cctx.cache().entryEx(key);
                // Spin until a non-obsolete entry is locked.
                while (true) {
                    entry.lockEntry();
                    if (!entry.obsolete()) break;
                    entry.unlockEntry();
                    entry = cctx.cache().entryEx(key);
                }
                int cleaned = 0;
                try {
                    cctx.shared().database().checkpointReadLock();
                    try {
                        if (cleanupRows != null) {
                            cleaned = part.dataStore().cleanup(cctx, cleanupRows);
                        }
                        if (rest != null) {
                            if (rest.getClass() == ArrayList.class) {
                                for (MvccDataRow row : (List)rest) {
                                    part.dataStore().updateTxState(cctx, row);
                                }
                            } else {
                                part.dataStore().updateTxState(cctx, (MvccDataRow)rest);
                            }
                        }
                    }
                    finally {
                        cctx.shared().database().checkpointReadUnlock();
                    }
                }
                finally {
                    entry.unlockEntry();
                    cctx.evicts().touch(entry);
                    metrics.addCleanupNanoTime(System.nanoTime() - cleanupStartNanoTime);
                    metrics.addCleanupRowsCnt(cleaned);
                }
            }
            finally {
                cctx.gate().leave();
            }
        }
    }

    /**
     * Worker that periodically asks the owning {@code MvccProcessorImpl} to run vacuum and waits for
     * the returned future to complete before scheduling the next run.
     */
    private static class VacuumScheduler
    extends GridWorker {
        /** Length of a single wait on the vacuum future, in milliseconds, before a warning is logged. */
        private static final long VACUUM_TIMEOUT = 60000L;

        /** Vacuum frequency taken from the node configuration ({@code getMvccVacuumFrequency()}). */
        private final long interval;

        /** Processor whose {@code runVacuum()} is invoked. */
        private final MvccProcessorImpl prc;

        /**
         * @param ctx Kernal context; supplies the instance name and the vacuum frequency.
         * @param log Logger handed to the worker.
         * @param prc Processor to run vacuum on.
         */
        VacuumScheduler(GridKernalContext ctx, IgniteLogger log, MvccProcessorImpl prc) {
            super(ctx.igniteInstanceName(), "vacuum-scheduler", log);
            this.interval = ctx.config().getMvccVacuumFrequency();
            this.prc = prc;
        }

        /**
         * Sleeps for one interval, then loops until cancelled: starts vacuum, waits for it in
         * {@link #VACUUM_TIMEOUT} slices (warning after each timed-out slice), and sleeps for whatever
         * is left of the interval.
         *
         * @throws InterruptedException Declared by the worker contract.
         * @throws IgniteInterruptedCheckedException If sleeping or waiting on the future is interrupted.
         */
        @Override
        protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            U.sleep(this.interval);
            while (!this.isCancelled()) {
                long nextScheduledTime = U.currentTimeMillis() + this.interval;
                try {
                    IgniteInternalFuture<VacuumMetrics> fut = this.prc.runVacuum();
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Vacuum started by scheduler.");
                    }
                    // Wait indefinitely, but surface a warning every time a slice elapses.
                    while (true) {
                        try {
                            fut.get(VACUUM_TIMEOUT);
                            break;
                        }
                        catch (IgniteFutureTimeoutCheckedException e) {
                            U.warn(this.log, "Failed to wait for vacuum complete. Consider increasing vacuum workers count.");
                        }
                    }
                }
                catch (IgniteInterruptedCheckedException e) {
                    // Interruption must terminate the worker rather than be treated as a vacuum failure.
                    throw e;
                }
                catch (Throwable e) {
                    if (e instanceof Error) {
                        throw (Error)e;
                    }
                    // NOTE(review): the warning is only emitted when debug logging is enabled, so vacuum
                    // failures are otherwise silent - confirm this is intentional.
                    if (this.log.isDebugEnabled()) {
                        U.warn(this.log, "Failed to perform vacuum.", e);
                    }
                }
                long delay = nextScheduledTime - U.currentTimeMillis();
                if (delay > 0L) {
                    U.sleep(delay);
                }
            }
        }
    }

    /**
     * {@code CompoundWaiter} flavour that always reports {@code false} from
     * {@link #hasLocalTransaction()} and keeps producing its own type on concatenation.
     */
    private static class CompoundWaiterNoLocal extends CompoundWaiter {
        /**
         * @param first First waiter to aggregate.
         * @param second Second waiter to aggregate.
         */
        private CompoundWaiterNoLocal(Waiter first, Waiter second) {
            super(first, second);
        }

        /** {@inheritDoc} */
        @Override
        public Waiter concat(Waiter other) {
            Waiter self = this;
            return new CompoundWaiterNoLocal(self, other);
        }

        /** {@inheritDoc} */
        @Override
        public boolean hasLocalTransaction() {
            return false;
        }
    }

    /**
     * Waiter that aggregates other waiters.
     * <p>
     * {@code inner} holds either a single {@code Waiter} or an {@link ArrayList} of waiters; the two
     * cases are told apart by an exact class check against {@code ArrayList.class}. Compound arguments
     * are flattened on construction, so the stored list contains the inner content of any compound
     * waiter that was passed in rather than the compound waiter itself.
     */
    private static class CompoundWaiter
    implements Waiter {
        /** Either a single {@code Waiter} or an {@code ArrayList<Waiter>}. */
        private final Object inner;

        /**
         * Wraps a single waiter; if it is already compound, its inner content is reused as is.
         *
         * @param waiter Waiter to wrap.
         */
        private CompoundWaiter(Waiter waiter) {
            this.inner = waiter.compound() ? ((CompoundWaiter)waiter).inner : waiter;
        }

        /**
         * Combines two waiters into a flat list.
         *
         * @param first First waiter.
         * @param second Second waiter.
         */
        private CompoundWaiter(Waiter first, Waiter second) {
            // Must be exactly ArrayList: the class check in holdsList() relies on it.
            ArrayList<Waiter> list = new ArrayList<>();
            add(list, first);
            add(list, second);
            this.inner = list;
        }

        /**
         * Appends {@code waiter} to {@code to}, unwrapping it first if it is compound.
         *
         * @param to Destination list.
         * @param waiter Waiter to append.
         */
        private static void add(List<Waiter> to, Waiter waiter) {
            if (!waiter.compound()) {
                to.add(waiter);
                return;
            }
            CompoundWaiter compound = (CompoundWaiter)waiter;
            if (compound.holdsList()) {
                to.addAll(compound.waiters());
            } else {
                to.add((Waiter)compound.inner);
            }
        }

        /** @return {@code true} if {@code inner} is a list of waiters rather than a single waiter. */
        private boolean holdsList() {
            return this.inner.getClass() == ArrayList.class;
        }

        /** @return {@code inner} viewed as a list; only valid when {@link #holdsList()} is {@code true}. */
        @SuppressWarnings("unchecked")
        private List<Waiter> waiters() {
            // Safe: the only ArrayList ever stored in inner is the List<Waiter> built by the constructor.
            return (List<Waiter>)this.inner;
        }

        /** Runs every aggregated waiter, in list order. */
        @Override
        public void run(GridKernalContext ctx) {
            if (!this.holdsList()) {
                ((Waiter)this.inner).run(ctx);
                return;
            }
            for (Waiter waiter : this.waiters()) {
                waiter.run(ctx);
            }
        }

        /** {@inheritDoc} */
        @Override
        public Waiter concat(Waiter other) {
            return new CompoundWaiter((Waiter)this, other);
        }

        /** {@inheritDoc} */
        @Override
        public boolean hasLocalTransaction() {
            return true;
        }

        /** {@inheritDoc} */
        @Override
        public boolean compound() {
            return true;
        }

        /**
         * Returns the first non-{@code null} lock future reported by an aggregated waiter.
         *
         * @param checkedVer Version passed to each waiter's {@code lockFuture}.
         * @return First non-{@code null} result, or {@code null} if every waiter returned {@code null}.
         */
        @Override
        public GridFutureAdapter<?> lockFuture(MvccVersion checkedVer) {
            if (!this.holdsList()) {
                return ((Waiter)this.inner).lockFuture(checkedVer);
            }
            for (Waiter waiter : this.waiters()) {
                GridFutureAdapter<?> waitFut = waiter.lockFuture(checkedVer);
                if (waitFut != null) {
                    return waitFut;
                }
            }
            return null;
        }
    }

    /**
     * Waiter that performs no action and only reports {@code true} from
     * {@link #hasLocalTransaction()}. Concatenating another waiter to it wraps that waiter in a
     * {@code CompoundWaiter}.
     */
    private static class LocalTransactionMarker implements Waiter {
        /** No state to initialise. */
        private LocalTransactionMarker() {
            // No-op.
        }

        /** Does nothing. */
        @Override
        public void run(GridKernalContext ctx) {
            // No-op.
        }

        /** {@inheritDoc} */
        @Override
        public Waiter concat(Waiter other) {
            Waiter wrapped = new CompoundWaiter(other);
            return wrapped;
        }

        /** @return Always {@code true}. */
        @Override
        public boolean hasLocalTransaction() {
            return true;
        }

        /** @return Always {@code false}. */
        @Override
        public boolean compound() {
            return false;
        }

        /** @return Always {@code null}. */
        @Override
        public GridFutureAdapter<?> lockFuture(MvccVersion checkedVer) {
            return null;
        }
    }

    /**
     * Future that doubles as a waiter: when run as a waiter it submits itself to the pool selected by
     * its policy, and when executed as a {@link Runnable} it completes itself.
     */
    private static class LockFuture extends GridFutureAdapter<Void> implements Waiter, Runnable {
        /** Policy byte used to pick the pool via {@code ctx.pools().poolForPolicy(...)}. */
        private final byte plc;

        /** Version compared against in {@link #lockFuture(MvccVersion)}. */
        private final MvccVersion waitingTxVer;

        /**
         * @param plc Pool policy.
         * @param waitingTxVer Version of the waiting transaction.
         */
        LockFuture(byte plc, MvccVersion waitingTxVer) {
            this.plc = plc;
            this.waitingTxVer = waitingTxVer;
        }

        /** Completes this future. */
        @Override
        public void run() {
            this.onDone();
        }

        /**
         * Schedules completion of this future on the pool for {@code plc} unless it is already done.
         * A failure to obtain the pool or submit is logged, not rethrown.
         */
        @Override
        public void run(GridKernalContext ctx) {
            if (this.isDone()) {
                return;
            }
            try {
                ctx.pools().poolForPolicy(this.plc).execute(this);
            }
            catch (IgniteCheckedException e) {
                U.error(ctx.log(LockFuture.class), e);
            }
        }

        /** {@inheritDoc} */
        @Override
        public Waiter concat(Waiter other) {
            return new CompoundWaiterNoLocal(this, other);
        }

        /** @return Always {@code false}. */
        @Override
        public boolean hasLocalTransaction() {
            return false;
        }

        /** @return Always {@code false}. */
        @Override
        public boolean compound() {
            return false;
        }

        /**
         * @param checkedVer Version to compare with the waiting transaction version.
         * @return This future if {@code MvccUtils.belongToSameTx} accepts both versions, otherwise {@code null}.
         */
        @Override
        public GridFutureAdapter<?> lockFuture(MvccVersion checkedVer) {
            if (MvccUtils.belongToSameTx(this.waitingTxVer, checkedVer)) {
                return this;
            }
            return null;
        }
    }

    private static interface Waiter {
        public void run(GridKernalContext var1);

        public Waiter concat(Waiter var1);

        public boolean hasLocalTransaction();

        public boolean compound();

        @Nullable
        public GridFutureAdapter<?> lockFuture(MvccVersion var1);
    }

    /**
     * Collects votes from nodes and reports whether every expected voter has voted.
     * All accessors synchronize on the instance.
     */
    private static class RecoveryBallotBox {
        /** Expected voters; {@code null} until {@link #voters(List)} is called. */
        private List<UUID> voters;

        /** Ids of the nodes that have voted so far. */
        private final Set<UUID> ballots = new HashSet<>();

        /** Creates an empty box with no expected voters. */
        private RecoveryBallotBox() {
            // No-op.
        }

        /**
         * @param voters Nodes whose votes are required; the list is stored by reference.
         */
        private synchronized void voters(List<UUID> voters) {
            this.voters = voters;
        }

        /**
         * @param nodeId Node that voted.
         */
        private synchronized void vote(UUID nodeId) {
            this.ballots.add(nodeId);
        }

        /**
         * @return {@code true} once voters are set and each of them has voted; {@code false} while the
         *      voter list is still {@code null}.
         */
        private synchronized boolean isVotingDone() {
            List<UUID> expected = this.voters;
            return expected != null && this.ballots.containsAll(expected);
        }
    }

    /**
     * Message listener that routes incoming MVCC messages to the matching {@code process*} method of
     * the enclosing processor, optionally postponing them until {@code initFut} completes.
     */
    private class MvccMessageListener implements GridMessageListener {
        /** No state to initialise. */
        private MvccMessageListener() {
            // No-op.
        }

        /**
         * Processes the message immediately, unless it asks to wait for coordinator initialisation and
         * {@code initFut} is not done yet; in that case processing is attached to {@code initFut}.
         *
         * @param nodeId Sender node id.
         * @param msg Message; cast to {@code MvccMessage}.
         * @param plc Policy byte (not used here).
         */
        @Override
        public void onMessage(final UUID nodeId, final Object msg, byte plc) {
            MvccMessage mvccMsg = (MvccMessage)msg;
            boolean postpone = mvccMsg.waitForCoordinatorInit() && !MvccProcessorImpl.this.initFut.isDone();
            if (!postpone) {
                this.processMessage(nodeId, msg);
                return;
            }
            MvccProcessorImpl.this.initFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>(){

                @Override
                public void apply(IgniteInternalFuture<Void> fut) {
                    assert (MvccProcessorImpl.this.curCrd.local());
                    MvccMessageListener.this.processMessage(nodeId, msg);
                }
            });
        }

        /**
         * Dispatches by message type; the checks run in a fixed order and the first match wins.
         * Unknown message types are logged as a warning.
         *
         * @param nodeId Sender node id.
         * @param msg Message to dispatch.
         */
        private void processMessage(UUID nodeId, Object msg) {
            MvccProcessorImpl prc = MvccProcessorImpl.this;
            if (msg instanceof MvccTxSnapshotRequest) {
                prc.processCoordinatorTxSnapshotRequest(nodeId, (MvccTxSnapshotRequest)msg);
                return;
            }
            if (msg instanceof MvccAckRequestTx) {
                prc.processCoordinatorTxAckRequest(nodeId, (MvccAckRequestTx)msg);
                return;
            }
            if (msg instanceof MvccFutureResponse) {
                prc.processCoordinatorAckResponse(nodeId, (MvccFutureResponse)msg);
                return;
            }
            if (msg instanceof MvccAckRequestQueryCntr) {
                prc.processCoordinatorQueryAckRequest(nodeId, (MvccAckRequestQueryCntr)msg);
                return;
            }
            if (msg instanceof MvccQuerySnapshotRequest) {
                prc.processCoordinatorQuerySnapshotRequest(nodeId, (MvccQuerySnapshotRequest)msg);
                return;
            }
            if (msg instanceof MvccSnapshotResponse) {
                prc.processCoordinatorSnapshotResponse(nodeId, (MvccSnapshotResponse)msg);
                return;
            }
            if (msg instanceof MvccAckRequestQueryId) {
                prc.processNewCoordinatorQueryAckRequest(nodeId, (MvccAckRequestQueryId)msg);
                return;
            }
            if (msg instanceof MvccActiveQueriesMessage) {
                prc.processActiveQueriesMessage(nodeId, (MvccActiveQueriesMessage)msg);
                return;
            }
            if (msg instanceof MvccRecoveryFinishedMessage) {
                prc.processRecoveryFinishedMessage(nodeId, (MvccRecoveryFinishedMessage)msg);
                return;
            }
            U.warn(prc.log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            return "MvccMessageListener[]";
        }
    }

    /**
     * Future completed when an acknowledgement response arrives, or when the node it is bound to
     * leaves and this future is still registered in the processor's {@code ackFuts}.
     */
    private class WaitAckFuture extends MvccFuture<Void> {
        /** Key under which this future is looked up in {@code ackFuts}. */
        private final long id;

        /** Flag supplied at construction; not read inside this class. */
        final boolean ackTx;

        /**
         * @param id Future id.
         * @param nodeId Node id passed to the superclass constructor.
         * @param ackTx Value stored in {@link #ackTx}.
         */
        WaitAckFuture(long id, UUID nodeId, boolean ackTx) {
            super(nodeId);
            this.id = id;
            this.ackTx = ackTx;
        }

        /** Completes the future. */
        void onResponse() {
            this.onDone();
        }

        /**
         * Completes the future if {@code nodeId} equals {@code crdId} and this call is the one that
         * removed the future's id from {@code ackFuts}.
         *
         * @param nodeId Id of the node that left.
         */
        void onNodeLeft(UUID nodeId) {
            if (!this.crdId.equals(nodeId)) {
                return;
            }
            boolean removedHere = MvccProcessorImpl.this.ackFuts.remove(this.id) != null;
            if (removedHere) {
                this.onDone();
            }
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            String parent = super.toString();
            return S.toString(WaitAckFuture.class, this, parent);
        }
    }

    /**
     * Per-node registry of active query tracking counters together with the cached minimum over all
     * of them. Mutating and reading methods synchronize on this instance.
     */
    private class ActiveQueries {
        /** Node id -> (tracking counter -> number of queries registered with that counter). */
        private final Map<UUID, TreeMap<Long, AtomicInteger>> activeQueries = new HashMap<>();

        /** Cached minimal tracking counter, or {@code null} when nothing is tracked. */
        private Long minQry;

        /** No state to initialise beyond field initialisers. */
        private ActiveQueries() {
            // No-op.
        }

        /**
         * @return Cached minimal tracking counter, or {@code -1} when there is none.
         */
        private synchronized long minimalQueryCounter() {
            Long cached = this.minQry;
            return cached == null ? -1L : cached;
        }

        /**
         * Builds a snapshot response for a query and registers the query under its tracking counter.
         * <p>
         * While holding the enclosing processor's monitor, reads the committed counter and adds every
         * active tx version below it to the response; the tracking counter is the minimum of the
         * committed counter and those versions.
         * NOTE(review): the decompiler flagged this method with "Removed try catching itself - possible
         * behaviour change". The literals {@code 0x1FFFFFFF} and {@code 0L} passed to {@code init} look
         * like inlined constants - confirm their names in the original source.
         *
         * @param nodeId Node the query originates from.
         * @param futId Future id echoed into the response.
         * @return Initialised response.
         */
        private synchronized MvccSnapshotResponse assignQueryCounter(UUID nodeId, long futId) {
            MvccSnapshotResponse res = new MvccSnapshotResponse();
            long ver;
            long tracking;
            synchronized (MvccProcessorImpl.this) {
                ver = MvccProcessorImpl.this.committedCntr.get();
                tracking = ver;
                for (Long txVer : MvccProcessorImpl.this.activeTxs.keySet()) {
                    if (txVer < ver) {
                        tracking = Math.min(txVer, tracking);
                        res.addTx(txVer);
                    }
                }
            }
            // Create the per-node map and the per-counter slot on demand, then count this query.
            this.activeQueries
                .computeIfAbsent(nodeId, id -> new TreeMap<>())
                .computeIfAbsent(tracking, cntrKey -> new AtomicInteger())
                .incrementAndGet();
            if (this.minQry == null) {
                this.minQry = tracking;
            }
            res.init(futId, MvccProcessorImpl.this.curCrd.version(), ver, 0x1FFFFFFF, 0L, tracking);
            return res;
        }

        /**
         * Unregisters one query of {@code nodeId} that was tracked under {@code ver}; does nothing if
         * the node has no registered queries. When the last query for that counter finishes, the entry
         * is dropped and the cached minimum is recomputed if it was equal to {@code ver}.
         *
         * @param nodeId Node the query originated from.
         * @param ver Tracking counter the query was registered with.
         */
        private synchronized void onQueryDone(UUID nodeId, Long ver) {
            TreeMap<Long, AtomicInteger> perNode = this.activeQueries.get(nodeId);
            if (perNode == null) {
                return;
            }
            assert (this.minQry != null);
            AtomicInteger cntr = perNode.get(ver);
            assert (cntr != null && cntr.get() > 0) : "onQueryDone ver=" + ver;
            if (cntr.decrementAndGet() != 0) {
                return;
            }
            perNode.remove(ver);
            if (perNode.isEmpty()) {
                this.activeQueries.remove(nodeId);
            }
            if (ver.equals(this.minQry)) {
                this.minQry = this.activeMinimal();
            }
        }

        /**
         * Drops everything tracked for a failed node and recomputes the cached minimum.
         *
         * @param nodeId Failed node id.
         */
        private synchronized void onNodeFailed(UUID nodeId) {
            this.activeQueries.remove(nodeId);
            this.minQry = this.activeMinimal();
        }

        /**
         * @return Smallest first key across all per-node maps, or {@code null} if there are no maps.
         */
        private Long activeMinimal() {
            Long smallest = null;
            for (TreeMap<Long, AtomicInteger> perNode : this.activeQueries.values()) {
                Long first = perNode.firstKey();
                if (smallest == null || first < smallest) {
                    smallest = first;
                }
            }
            return smallest;
        }
    }
}

