/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.channels.AsynchronousCloseException;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.hadoop2.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.PeerServer;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataXceiver;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;

/**
 * Accept loop for incoming data-transfer connections on a {@link DataNode}.
 *
 * <p>{@link #run()} repeatedly accepts a {@link Peer} from the configured
 * {@link PeerServer} and hands it to a freshly created {@link DataXceiver}
 * running in its own {@link Daemon} thread. The server also keeps a registry
 * of the peers that xceivers have registered through
 * {@link #addPeer(Peer, Thread, DataXceiver)} so they can be notified or
 * closed in bulk.
 *
 * <p>All access to the peer registry goes through {@code synchronized}
 * methods of this object.
 */
class DataXceiverServer implements Runnable {
    public static final Logger LOG = DataNode.LOG;

    private final PeerServer peerServer;
    private final DataNode datanode;

    // The two maps are always mutated together while holding this object's monitor,
    // so they share the same key set.
    private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();
    private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>();

    // Written by run() and kill() without holding the monitor and read by addPeer()
    // under it; volatile makes the write visible to the reader.
    private volatile boolean closed = false;

    /** Upper bound on the xceiver count; exceeding it rejects the accepted connection. */
    int maxXceiverCount = 4096;
    final BlockBalanceThrottler balanceThrottler;
    final long estimateBlockSize;

    /**
     * Creates the server and reads its limits from {@code conf}.
     *
     * @param peerServer source of incoming peers; closed by {@link #run()} on exit and by {@link #kill()}
     * @param conf       configuration providing {@code dfs.datanode.max.transfer.threads},
     *                   {@code dfs.blocksize}, {@code dfs.datanode.balance.bandwidthPerSec} and
     *                   {@code dfs.datanode.balance.max.concurrent.moves}
     * @param datanode   owning datanode whose {@code shouldRun} / {@code shutdownForUpgrade}
     *                   flags control the accept loop
     */
    DataXceiverServer(PeerServer peerServer, Configuration conf, DataNode datanode) {
        this.peerServer = peerServer;
        this.datanode = datanode;
        this.maxXceiverCount = conf.getInt("dfs.datanode.max.transfer.threads", 4096);
        // Default 0x8000000 bytes = 128 MiB.
        this.estimateBlockSize = conf.getLongBytes("dfs.blocksize", 0x8000000L);
        // Default bandwidth 0xA00000 bytes = 10 MiB; default concurrent moves 50.
        this.balanceThrottler = new BlockBalanceThrottler(
                conf.getLongBytes("dfs.datanode.balance.bandwidthPerSec", 0xA00000L),
                conf.getInt("dfs.datanode.balance.max.concurrent.moves", 50));
    }

    /**
     * Accepts peers until the datanode stops running or begins shutting down for
     * upgrade, then closes the peer server and all registered peers.
     *
     * <p>When shutting down for upgrade, registered xceiver threads are interrupted
     * first and given a bounded amount of time to finish before their peers are
     * closed.
     */
    @Override
    public void run() {
        while (this.datanode.shouldRun && !this.datanode.shutdownForUpgrade) {
            // Scoped to a single iteration on purpose: if accept() itself fails, the
            // cleanup in the handlers below must not touch a peer accepted earlier,
            // which by then belongs to a running DataXceiver.
            Peer peer = null;
            try {
                peer = this.peerServer.accept();
                int curXceiverCount = this.datanode.getXceiverCount();
                if (curXceiverCount > this.maxXceiverCount) {
                    throw new IOException("Xceiver count " + curXceiverCount + " exceeds the limit of concurrent xcievers: " + this.maxXceiverCount);
                }
                new Daemon(this.datanode.threadGroup, DataXceiver.create(peer, this.datanode, this)).start();
            }
            catch (SocketTimeoutException ignored) {
                // Nothing to do: loop around and re-evaluate the run condition.
            }
            catch (AsynchronousCloseException ace) {
                // Only worth reporting when the server is still supposed to be running.
                if (this.datanode.shouldRun && !this.datanode.shutdownForUpgrade) {
                    LOG.warn(this.datanode.getDisplayName() + ":DataXceiverServer: ", (Throwable)ace);
                }
            }
            catch (IOException ie) {
                IOUtils.cleanup(null, peer);
                LOG.warn(this.datanode.getDisplayName() + ":DataXceiverServer: ", (Throwable)ie);
            }
            catch (OutOfMemoryError oom) {
                IOUtils.cleanup(null, peer);
                LOG.error("DataNode is out of memory. Will retry in 30 seconds.", (Throwable)oom);
                try {
                    Thread.sleep(30000L);
                }
                catch (InterruptedException ignored) {
                    // Deliberately ignored: an early wake-up just means we retry sooner.
                }
            }
            catch (Throwable te) {
                LOG.error(this.datanode.getDisplayName() + ":DataXceiverServer: Exiting due to: ", te);
                this.datanode.shouldRun = false;
            }
        }

        try {
            this.peerServer.close();
            this.closed = true;
        }
        catch (IOException ie) {
            LOG.warn(this.datanode.getDisplayName() + " :DataXceiverServer: close exception", (Throwable)ie);
        }

        if (this.datanode.shutdownForUpgrade) {
            this.restartNotifyPeers();
            LOG.info("Shutting down DataXceiverServer before restart");
            this.awaitPeersBeforeRestart();
        }
        this.closeAllPeers();
    }

    /**
     * Polls the peer registry up to 10 times, 200 ms apart, returning as soon as it
     * is empty. An interrupt during a sleep only ends that sleep early; the attempt
     * still counts.
     */
    private void awaitPeersBeforeRestart() {
        for (int attempt = 0; this.getNumPeers() > 0 && attempt < 10; ++attempt) {
            try {
                Thread.sleep(200L);
            }
            catch (InterruptedException ignored) {
                // Deliberately ignored: keep polling until the attempts run out.
            }
        }
    }

    /**
     * Closes the peer server and, if that succeeds, marks this server closed so that
     * {@link #addPeer(Peer, Thread, DataXceiver)} rejects further registrations.
     * A failure to close is logged and otherwise ignored.
     */
    void kill() {
        assert (!this.datanode.shouldRun || this.datanode.shutdownForUpgrade) : "shoudRun should be set to false or restarting should be true before killing";
        try {
            this.peerServer.close();
            this.closed = true;
        }
        catch (IOException ie) {
            LOG.warn(this.datanode.getDisplayName() + ":DataXceiverServer.kill(): ", (Throwable)ie);
        }
    }

    /**
     * Registers a peer together with the thread and xceiver serving it.
     *
     * @param peer    the peer to register
     * @param t       thread associated with the peer; interrupted by {@link #restartNotifyPeers()}
     * @param xceiver xceiver associated with the peer
     * @throws IOException if this server has already been marked closed
     */
    synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver) throws IOException {
        if (this.closed) {
            throw new IOException("Server closed.");
        }
        this.peers.put(peer, t);
        this.peersXceiver.put(peer, xceiver);
    }

    /**
     * Unregisters {@code peer} and closes it via {@link IOUtils#cleanup}.
     *
     * @param peer the peer to remove and close
     */
    synchronized void closePeer(Peer peer) {
        this.peers.remove(peer);
        this.peersXceiver.remove(peer);
        IOUtils.cleanup(null, peer);
    }

    /**
     * Asks every registered xceiver to send an OOB message. Does nothing unless the
     * datanode is shutting down for upgrade. Failures on one xceiver are logged and
     * do not stop the remaining ones from being tried.
     */
    public synchronized void sendOOBToPeers() {
        if (!this.datanode.shutdownForUpgrade) {
            return;
        }
        for (Peer p : this.peers.keySet()) {
            try {
                this.peersXceiver.get(p).sendOOB();
            }
            catch (IOException e) {
                LOG.warn("Got error when sending OOB message.", (Throwable)e);
            }
            catch (InterruptedException e) {
                LOG.warn("Interrupted when sending OOB message.");
            }
        }
    }

    /** Calls {@code stopWriter()} on the xceiver of every registered peer. */
    public synchronized void stopWriters() {
        for (Peer p : this.peers.keySet()) {
            this.peersXceiver.get(p).stopWriter();
        }
    }

    /** Interrupts the thread of every registered peer. */
    synchronized void restartNotifyPeers() {
        assert (this.datanode.shouldRun && this.datanode.shutdownForUpgrade);
        for (Thread t : this.peers.values()) {
            t.interrupt();
        }
    }

    /** Closes every registered peer and empties the registry. */
    synchronized void closeAllPeers() {
        LOG.info("Closing all peers.");
        for (Peer p : this.peers.keySet()) {
            IOUtils.cleanup(null, p);
        }
        this.peers.clear();
        this.peersXceiver.clear();
    }

    /** @return the number of peers currently registered */
    synchronized int getNumPeers() {
        return this.peers.size();
    }

    /** @return the number of peer-to-xceiver entries currently registered */
    @VisibleForTesting
    synchronized int getNumPeersXceiver() {
        return this.peersXceiver.size();
    }

    /** @return the peer server this instance accepts connections from */
    @VisibleForTesting
    PeerServer getPeerServer() {
        return this.peerServer;
    }

    /**
     * Unregisters {@code peer} without closing it.
     *
     * @param peer the peer to remove from the registry
     */
    synchronized void releasePeer(Peer peer) {
        this.peers.remove(peer);
        this.peersXceiver.remove(peer);
    }

    /**
     * Changes the maximum number of concurrent balancer movers.
     *
     * @param movers the new limit
     */
    public void updateBalancerMaxConcurrentMovers(int movers) {
        this.balanceThrottler.setMaxConcurrentMovers(movers);
    }

    /**
     * A {@link DataTransferThrottler} that additionally caps how many threads may
     * hold it at once through {@link #acquire()} / {@link #release()}.
     */
    static class BlockBalanceThrottler extends DataTransferThrottler {
        // Guarded by this throttler's monitor (see acquire/release).
        private int numThreads;
        // Atomic so the limit can be changed without taking the monitor.
        private final AtomicInteger maxThreads = new AtomicInteger(0);

        /**
         * @param bandwidth  bandwidth passed to the {@link DataTransferThrottler} constructor
         * @param maxThreads initial limit on concurrently acquired slots
         */
        private BlockBalanceThrottler(long bandwidth, int maxThreads) {
            super(bandwidth);
            this.maxThreads.set(maxThreads);
            LOG.info("Balancing bandwith is " + bandwidth + " bytes/s");
            LOG.info("Number threads for balancing is " + maxThreads);
        }

        /** Replaces the limit on concurrently acquired slots. */
        private void setMaxConcurrentMovers(int movers) {
            this.maxThreads.set(movers);
        }

        /** @return the current limit on concurrently acquired slots */
        @VisibleForTesting
        int getMaxConcurrentMovers() {
            return this.maxThreads.get();
        }

        /**
         * Tries to take one slot.
         *
         * @return {@code true} if a slot was taken; {@code false} if the number of
         *         held slots has already reached the current limit
         */
        synchronized boolean acquire() {
            if (this.numThreads >= this.maxThreads.get()) {
                return false;
            }
            ++this.numThreads;
            return true;
        }

        /** Gives back one slot previously taken with {@link #acquire()}. */
        synchronized void release() {
            --this.numThreads;
        }
    }
}

