/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import java.util.AbstractList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.hadoop2.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.hadoop2.shaded.com.google.common.base.Preconditions;
import org.apache.flink.hadoop2.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.HeartbeatManager;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.util.CyclicIteration;
import org.apache.hadoop.util.ChunkedArrayList;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks datanodes that are being decommissioned and periodically checks
 * whether the blocks they hold are replicated well enough elsewhere for the
 * node to be marked as decommissioned.
 *
 * <p>Nodes passed to {@link #startDecommission(DatanodeDescriptor)} are first
 * placed on a pending queue. A single-threaded scheduled {@link Monitor}
 * moves them into the tracked map (subject to a configurable limit) and
 * re-examines the tracked nodes on every tick while holding the namesystem
 * write lock.
 */
@InterfaceAudience.Private
public class DecommissionManager {
    private static final Logger LOG = LoggerFactory.getLogger(DecommissionManager.class);
    private final Namesystem namesystem;
    private final BlockManager blockManager;
    private final HeartbeatManager hbManager;
    private final ScheduledExecutorService executor;
    /**
     * Nodes currently tracked by the monitor, mapped to the list of blocks
     * that were found insufficiently replicated. A {@code null} value marks a
     * node that has been added but not yet scanned.
     */
    private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfo>> decomNodeBlocks;
    /** Nodes waiting to be moved into {@link #decomNodeBlocks}. */
    private final Queue<DatanodeDescriptor> pendingNodes;
    /** The periodic checker; {@code null} until {@link #activate(Configuration)} runs. */
    private Monitor monitor = null;

    /**
     * Creates the manager and its single-threaded daemon executor. No work is
     * scheduled until {@link #activate(Configuration)} is called.
     *
     * @param namesystem   namesystem whose write lock guards the checks
     * @param blockManager block manager queried for replication state
     * @param hbManager    heartbeat manager used to flip node admin state
     */
    DecommissionManager(Namesystem namesystem, BlockManager blockManager, HeartbeatManager hbManager) {
        this.namesystem = namesystem;
        this.blockManager = blockManager;
        this.hbManager = hbManager;
        this.executor = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d").setDaemon(true).build());
        this.decomNodeBlocks = new TreeMap<>();
        this.pendingNodes = new LinkedList<>();
    }

    /**
     * Reads the decommission settings from {@code conf}, creates the
     * {@link Monitor} and schedules it at a fixed rate.
     *
     * <p>If the deprecated nodes-per-interval key is present it takes
     * precedence: the node limit is taken from it and the block limit is
     * effectively disabled ({@link Integer#MAX_VALUE}).
     *
     * @param conf configuration to read from
     * @throws IllegalArgumentException if a configured value is out of range
     * @throws NumberFormatException    if the deprecated key is set to a
     *                                  non-integer value
     */
    void activate(Configuration conf) {
        int intervalSecs = conf.getInt("dfs.namenode.decommission.interval", 30);
        // NOTE(review): 0 passes this check, but scheduleAtFixedRate below rejects a
        // non-positive period with IllegalArgumentException.
        Preconditions.checkArgument(intervalSecs >= 0,
                "Cannot set a negative value for dfs.namenode.decommission.interval");

        int blocksPerInterval = conf.getInt("dfs.namenode.decommission.blocks.per.interval", 500000);
        int nodesPerInterval = Integer.MAX_VALUE;

        String deprecatedKey = "dfs.namenode.decommission.nodes.per.interval";
        String strNodes = conf.get(deprecatedKey);
        if (strNodes != null) {
            nodesPerInterval = Integer.parseInt(strNodes);
            blocksPerInterval = Integer.MAX_VALUE;
            LOG.warn("Using deprecated configuration key {} value of {}.", deprecatedKey, nodesPerInterval);
            LOG.warn("Please update your configuration to use {} instead.",
                    "dfs.namenode.decommission.blocks.per.interval");
        }
        Preconditions.checkArgument(blocksPerInterval > 0,
                "Must set a positive value for dfs.namenode.decommission.blocks.per.interval");

        int maxConcurrentTrackedNodes =
                conf.getInt("dfs.namenode.decommission.max.concurrent.tracked.nodes", 100);
        Preconditions.checkArgument(maxConcurrentTrackedNodes >= 0,
                "Cannot set a negative value for dfs.namenode.decommission.max.concurrent.tracked.nodes");

        this.monitor = new Monitor(blocksPerInterval, nodesPerInterval, maxConcurrentTrackedNodes);
        this.executor.scheduleAtFixedRate(this.monitor, intervalSecs, intervalSecs, TimeUnit.SECONDS);
        LOG.debug("Activating DecommissionManager with interval {} seconds, {} max blocks per interval, {} max nodes per interval, {} max concurrently tracked nodes.",
                intervalSecs, blocksPerInterval, nodesPerInterval, maxConcurrentTrackedNodes);
    }

    /**
     * Stops the monitor thread and waits up to three seconds for it to exit.
     * If the calling thread is interrupted while waiting, the wait is
     * abandoned and the thread's interrupt status is restored.
     */
    void close() {
        this.executor.shutdownNow();
        try {
            this.executor.awaitTermination(3000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: let the caller observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Begins decommissioning {@code node} unless it is already
     * decommissioning or decommissioned. On success the node is queued for
     * the monitor to pick up.
     *
     * @param node the datanode to decommission
     */
    @VisibleForTesting
    public void startDecommission(DatanodeDescriptor node) {
        if (node.isDecommissionInProgress() || node.isDecommissioned()) {
            LOG.trace("startDecommission: Node {} in {}, nothing to do.", node, node.getAdminState());
            return;
        }
        this.hbManager.startDecommission(node);
        // The heartbeat manager decides the resulting state; only queue the node
        // if it actually entered decommission-in-progress.
        if (node.isDecommissionInProgress()) {
            for (DatanodeStorageInfo storage : node.getStorageInfos()) {
                LOG.info("Starting decommission of {} {} with {} blocks", node, storage, storage.numBlocks());
            }
            node.decommissioningStatus.setStartTime(Time.monotonicNow());
            this.pendingNodes.add(node);
        }
    }

    /**
     * Cancels decommissioning of {@code node} if it is decommissioning or
     * already decommissioned, and stops tracking it. For a live node the block
     * manager is asked to process blocks that are now over-replicated.
     *
     * @param node the datanode to return to service
     */
    @VisibleForTesting
    public void stopDecommission(DatanodeDescriptor node) {
        if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
            LOG.trace("stopDecommission: Node {} in {}, nothing to do.", node, node.getAdminState());
            return;
        }
        this.hbManager.stopDecommission(node);
        if (node.isAlive()) {
            this.blockManager.processOverReplicatedBlocksOnReCommission(node);
        }
        this.pendingNodes.remove(node);
        this.decomNodeBlocks.remove(node);
    }

    /** Marks {@code dn} as decommissioned and logs the completion. */
    private void setDecommissioned(DatanodeDescriptor dn) {
        dn.setDecommissioned();
        LOG.info("Decommissioning complete for node {}", dn);
    }

    /**
     * Decides whether {@code block} has enough live replicas that it need not
     * hold up decommissioning.
     *
     * @param block          the block being examined
     * @param bc             the block collection the block belongs to
     * @param numberReplicas replica counts for the block
     * @return {@code true} if the block is considered sufficiently replicated
     */
    private boolean isSufficientlyReplicated(BlockInfo block, BlockCollection bc, NumberReplicas numberReplicas) {
        final int numExpected = block.getReplication();
        final int numLive = numberReplicas.liveReplicas();
        if (numLive >= numExpected && this.blockManager.isPlacementPolicySatisfied(block)) {
            LOG.trace("Block {} does not need replication.", block);
            return true;
        }

        LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected, numLive);
        if (numExpected > numLive) {
            if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
                // Last block of a file under construction: the minimum replication is enough.
                if (numLive >= this.blockManager.minReplication) {
                    LOG.trace("UC block {} sufficiently-replicated since numLive ({}) >= minR ({})",
                            block, numLive, this.blockManager.minReplication);
                    return true;
                }
                LOG.trace("UC block {} insufficiently-replicated since numLive ({}) < minR ({})",
                        block, numLive, this.blockManager.minReplication);
            } else if (numLive >= this.blockManager.defaultReplication) {
                // Any other block: meeting the default replication is enough.
                return true;
            }
        }
        return false;
    }

    /**
     * Writes a one-line summary of a block's replication state to the block
     * state change log, if that log has INFO enabled.
     */
    private static void logBlockReplicationInfo(BlockInfo block, BlockCollection bc, DatanodeDescriptor srcNode,
            NumberReplicas num, Iterable<DatanodeStorageInfo> storages) {
        if (!NameNode.blockStateChangeLog.isInfoEnabled()) {
            return;
        }
        int curReplicas = num.liveReplicas();
        short curExpectedReplicas = block.getReplication();
        StringBuilder nodeList = new StringBuilder();
        for (DatanodeStorageInfo storage : storages) {
            nodeList.append(storage.getDatanodeDescriptor());
            nodeList.append(" ");
        }
        NameNode.blockStateChangeLog.info("Block: " + block
                + ", Expected Replicas: " + curExpectedReplicas
                + ", live replicas: " + curReplicas
                + ", corrupt replicas: " + num.corruptReplicas()
                + ", decommissioned replicas: " + num.decommissioned()
                + ", decommissioning replicas: " + num.decommissioning()
                + ", excess replicas: " + num.excessReplicas()
                + ", Is Open File: " + bc.isUnderConstruction()
                + ", Datanodes having this block: " + nodeList
                + ", Current Datanode: " + srcNode
                + ", Is current datanode decommissioning: " + srcNode.isDecommissionInProgress());
    }

    /** @return number of nodes queued but not yet tracked by the monitor */
    @VisibleForTesting
    public int getNumPendingNodes() {
        return this.pendingNodes.size();
    }

    /** @return number of nodes currently tracked by the monitor */
    @VisibleForTesting
    public int getNumTrackedNodes() {
        return this.decomNodeBlocks.size();
    }

    /**
     * @return number of nodes examined during the monitor's most recent tick;
     *         requires {@link #activate(Configuration)} to have been called
     */
    @VisibleForTesting
    public int getNumNodesChecked() {
        return this.monitor.numNodesChecked;
    }

    /**
     * Runs one monitor tick on the executor thread and waits for it to finish.
     *
     * @throws ExecutionException   if the tick threw an exception
     * @throws InterruptedException if interrupted while waiting
     */
    @VisibleForTesting
    void runMonitorForTest() throws ExecutionException, InterruptedException {
        this.executor.submit(this.monitor).get();
    }

    /**
     * Periodic task that admits pending nodes into the tracked set and checks
     * tracked nodes for completion, bounded per tick by block and node limits.
     */
    private class Monitor
    implements Runnable {
        /** Maximum number of blocks to examine per tick. */
        private final int numBlocksPerCheck;
        /** Maximum number of nodes to examine per tick (deprecated limit). */
        private final int numNodesPerCheck;
        /** Maximum number of nodes tracked at once; 0 means no limit. */
        private final int maxConcurrentTrackedNodes;
        /** Blocks examined so far in the current tick. */
        private int numBlocksChecked = 0;
        /** Blocks examined since the write lock was last (re)acquired. */
        private int numBlocksCheckedPerLock = 0;
        /** Nodes examined so far in the current tick. */
        private int numNodesChecked = 0;
        /** Key of the last node examined; the next tick's cyclic iteration is seeded with it. */
        private DatanodeDescriptor iterkey = new DatanodeDescriptor(new DatanodeID("", "", "", 0, 0, 0, 0));

        Monitor(int numBlocksPerCheck, int numNodesPerCheck, int maxConcurrentTrackedNodes) {
            this.numBlocksPerCheck = numBlocksPerCheck;
            this.numNodesPerCheck = numNodesPerCheck;
            this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
        }

        /** @return {@code true} once this tick has examined its quota of blocks */
        private boolean exceededNumBlocksPerCheck() {
            LOG.trace("Processed {} blocks so far this tick", this.numBlocksChecked);
            return this.numBlocksChecked >= this.numBlocksPerCheck;
        }

        /** @return {@code true} once this tick has examined its quota of nodes */
        @Deprecated
        private boolean exceededNumNodesPerCheck() {
            LOG.trace("Processed {} nodes so far this tick", this.numNodesChecked);
            return this.numNodesChecked >= this.numNodesPerCheck;
        }

        /**
         * Executes one tick: resets the per-tick counters, then admits pending
         * nodes and checks tracked nodes under the namesystem write lock.
         *
         * <p>NOTE(review): exceptions thrown by the checks are not caught here.
         * For a task scheduled with {@code scheduleAtFixedRate}, a thrown
         * exception suppresses all subsequent executions.
         */
        @Override
        public void run() {
            if (!namesystem.isRunning()) {
                LOG.info("Namesystem is not running, skipping decommissioning checks.");
                return;
            }
            this.numBlocksChecked = 0;
            this.numBlocksCheckedPerLock = 0;
            this.numNodesChecked = 0;

            namesystem.writeLock();
            try {
                processPendingNodes();
                check();
            } finally {
                namesystem.writeUnlock();
            }
            if (this.numBlocksChecked + this.numNodesChecked > 0) {
                LOG.info("Checked {} blocks and {} nodes this tick", this.numBlocksChecked, this.numNodesChecked);
            }
        }

        /**
         * Moves nodes from the pending queue into the tracked map until the
         * queue is empty or the concurrent-tracking limit is reached. New
         * entries get a {@code null} block list, which triggers a full scan.
         */
        private void processPendingNodes() {
            while (!pendingNodes.isEmpty()
                    && (this.maxConcurrentTrackedNodes == 0
                        || decomNodeBlocks.size() < this.maxConcurrentTrackedNodes)) {
                decomNodeBlocks.put(pendingNodes.poll(), null);
            }
        }

        /**
         * Walks the tracked nodes cyclically, starting after the node examined
         * last, until a per-tick limit is hit or the namesystem stops. Nodes
         * with no remaining insufficiently replicated blocks that are also
         * healthy are marked decommissioned and dropped from tracking.
         */
        private void check() {
            final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>> it =
                    new CyclicIteration<>(decomNodeBlocks, this.iterkey).iterator();
            final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();

            while (it.hasNext()
                    && !exceededNumBlocksPerCheck()
                    && !exceededNumNodesPerCheck()
                    && namesystem.isRunning()) {
                ++this.numNodesChecked;
                final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>> entry = it.next();
                final DatanodeDescriptor dn = entry.getKey();
                AbstractList<BlockInfo> blocks = entry.getValue();
                boolean fullScan = false;
                if (blocks == null) {
                    LOG.debug("Newly-added node {}, doing full scan to find insufficiently-replicated blocks.", dn);
                    blocks = handleInsufficientlyReplicated(dn);
                    decomNodeBlocks.put(dn, blocks);
                    fullScan = true;
                } else {
                    LOG.debug("Processing decommission-in-progress node {}", dn);
                    pruneSufficientlyReplicated(dn, blocks);
                }

                if (blocks.size() == 0) {
                    if (!fullScan) {
                        // The tracked list drained; rescan the node's blocks to
                        // catch anything not in that list.
                        LOG.debug("Node {} has finished replicating current set of blocks, checking with the full block map.", dn);
                        blocks = handleInsufficientlyReplicated(dn);
                        decomNodeBlocks.put(dn, blocks);
                    }
                    final boolean isHealthy = blockManager.isNodeHealthyForDecommission(dn);
                    if (blocks.size() == 0 && isHealthy) {
                        setDecommissioned(dn);
                        toRemove.add(dn);
                        LOG.debug("Node {} is sufficiently replicated and healthy, marked as decommissioned.", dn);
                    } else {
                        LOG.debug("Node {} {} healthy. It needs to replicate {} more blocks. Decommissioning is still in progress.",
                                dn, isHealthy ? "is" : "isn't", blocks.size());
                    }
                } else {
                    LOG.debug("Node {} still has {} blocks to replicate before it is a candidate to finish decommissioning.",
                            dn, blocks.size());
                }
                this.iterkey = dn;
            }

            // Removal is deferred until the iteration above has finished.
            for (DatanodeDescriptor dn : toRemove) {
                Preconditions.checkState(dn.isDecommissioned(), "Removing a node that is not yet decommissioned!");
                decomNodeBlocks.remove(dn);
            }
        }

        /**
         * Removes from {@code blocks} every block that is now sufficiently
         * replicated (or no longer known to the block map).
         */
        private void pruneSufficientlyReplicated(DatanodeDescriptor datanode, AbstractList<BlockInfo> blocks) {
            processBlocksForDecomInternal(datanode, blocks.iterator(), null, true);
        }

        /**
         * Scans all blocks reported by {@code datanode}'s block iterator and
         * returns those that are insufficiently replicated.
         */
        private AbstractList<BlockInfo> handleInsufficientlyReplicated(DatanodeDescriptor datanode) {
            ChunkedArrayList<BlockInfo> insufficient = new ChunkedArrayList<>();
            processBlocksForDecomInternal(datanode, datanode.getBlockIterator(), insufficient, false);
            return insufficient;
        }

        /**
         * Shared worker for the prune and full-scan passes.
         *
         * <p>For each block from {@code it}: unknown blocks are removed via the
         * iterator; blocks whose collection id is -1 are skipped; blocks that
         * need replication and are neither queued nor pending are added to the
         * needed-replication queue; insufficiently replicated blocks are
         * counted and, if a list was supplied, collected. The node's
         * decommissioning status is updated with the counts at the end.
         *
         * <p>When {@code insufficientlyReplicated} is {@code null} (the prune
         * pass), the write lock is briefly released after every
         * {@code numBlocksPerCheck} blocks. The method always returns with the
         * write lock held, even if that pause is interrupted.
         *
         * @param datanode                    node being processed
         * @param it                          iterator over the blocks to examine
         * @param insufficientlyReplicated    sink for insufficient blocks, or
         *                                    {@code null} to not collect them
         * @param pruneSufficientlyReplicated whether to remove sufficiently
         *                                    replicated blocks via {@code it}
         */
        private void processBlocksForDecomInternal(DatanodeDescriptor datanode, Iterator<BlockInfo> it,
                List<BlockInfo> insufficientlyReplicated, boolean pruneSufficientlyReplicated) {
            boolean firstReplicationLog = true;
            int underReplicatedBlocks = 0;
            int decommissionOnlyReplicas = 0;
            int underReplicatedInOpenFiles = 0;

            while (it.hasNext()) {
                if (insufficientlyReplicated == null && this.numBlocksCheckedPerLock >= this.numBlocksPerCheck) {
                    // Yield the write lock for a moment before continuing.
                    namesystem.writeUnlock();
                    try {
                        LOG.debug("Yielded lock during decommission check");
                        Thread.sleep(0L, 500);
                    } catch (InterruptedException e) {
                        // The caller (and run()'s finally block) expects the write
                        // lock to be held on return, so take it back before bailing
                        // out, and keep the interrupt visible to the executor.
                        namesystem.writeLock();
                        Thread.currentThread().interrupt();
                        return;
                    }
                    this.numBlocksCheckedPerLock = 0;
                    namesystem.writeLock();
                }
                ++this.numBlocksChecked;
                ++this.numBlocksCheckedPerLock;

                final BlockInfo block = it.next();
                if (blockManager.blocksMap.getStoredBlock(block) == null) {
                    LOG.trace("Removing unknown block {}", block);
                    it.remove();
                    continue;
                }
                // -1 is treated as "no owning block collection"; such blocks are skipped.
                if (block.getBlockCollectionId() == -1L) {
                    continue;
                }

                final BlockCollection bc = blockManager.getBlockCollection(block);
                final NumberReplicas num = blockManager.countNodes(block);
                final int liveReplicas = num.liveReplicas();

                // Queue the block for replication if nothing has scheduled it yet.
                if (blockManager.isNeededReplication(block, liveReplicas)
                        && !blockManager.neededReplications.contains(block)
                        && blockManager.pendingReplications.getNumReplicas(block) == 0
                        && blockManager.isPopulatingReplQueues()) {
                    blockManager.neededReplications.add(block, liveReplicas, num.readOnlyReplicas(),
                            num.decommissionedAndDecommissioning(), block.getReplication());
                }

                if (isSufficientlyReplicated(block, bc, num)) {
                    if (pruneSufficientlyReplicated) {
                        it.remove();
                    }
                    continue;
                }

                if (insufficientlyReplicated != null) {
                    insufficientlyReplicated.add(block);
                }
                // Only the first insufficient block per pass is logged in detail.
                if (firstReplicationLog) {
                    logBlockReplicationInfo(block, bc, datanode, num, blockManager.blocksMap.getStorages(block));
                    firstReplicationLog = false;
                }
                ++underReplicatedBlocks;
                if (bc.isUnderConstruction()) {
                    ++underReplicatedInOpenFiles;
                }
                if (liveReplicas == 0 && num.decommissionedAndDecommissioning() > 0) {
                    ++decommissionOnlyReplicas;
                }
            }

            datanode.decommissioningStatus.set(underReplicatedBlocks, decommissionOnlyReplicas, underReplicatedInOpenFiles);
        }
    }
}

