/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.fs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={LargeTests.class})
public class TestBlockReorder {
    /** Logger shared by all tests (and anonymous helpers) in this class. */
    private static final Log LOG = LogFactory.getLog(TestBlockReorder.class);
    /** Configuration in use; assigned in {@code setUp()} and reassigned by some tests. */
    private Configuration conf;
    /** Mini DFS cluster obtained from {@link #htu} in {@code setUp()}. */
    private MiniDFSCluster cluster;
    /** Testing utility that owns the mini clusters; created in {@code setUp()}. */
    private HBaseTestingUtility htu;
    /** File system obtained from {@code FileSystem.get(conf)} in {@code setUp()}, cast to its DFS type. */
    private DistributedFileSystem dfs;
    /** Host names given to the three datanodes started in {@code setUp()}. */
    private static final String host1 = "host1";
    private static final String host2 = "host2";
    private static final String host3 = "host3";

    /**
     * Creates a fresh testing utility, tunes its DFS settings and starts a mini DFS
     * cluster with three datanodes (one per rack/host pair), then caches the
     * configuration, the cluster and the file system in the instance fields.
     *
     * @throws Exception if the mini DFS cluster or the file system cannot be set up
     */
    @Before
    public void setUp() throws Exception {
        htu = new HBaseTestingUtility();

        // DFS settings applied before the cluster is started.
        Configuration dfsSettings = htu.getConfiguration();
        dfsSettings.setInt("dfs.blocksize", 1024);
        dfsSettings.setBoolean("dfs.support.append", true);
        dfsSettings.setInt("dfs.replication", 3);

        String[] racks = {"/r1", "/r2", "/r3"};
        String[] hosts = {host1, host2, host3};
        htu.startMiniDFSCluster(3, racks, hosts);

        conf = htu.getConfiguration();
        cluster = htu.getDFSCluster();
        dfs = (DistributedFileSystem) FileSystem.get(conf);
    }

    /**
     * Shuts down whatever mini clusters the testing utility started.
     * Despite its name, this method is annotated with {@code @After} and therefore
     * runs after every test method.
     *
     * @throws Exception if the shutdown fails
     */
    @After
    public void tearDownAfterClass() throws Exception {
        final HBaseTestingUtility util = this.htu;
        util.shutdownMiniCluster();
    }

    /**
     * Checks that a block-location reorder interceptor keeps reads fast after the
     * datanode listed first for a file has been shut down.
     *
     * <p>Steps: write a small file with two replicas, read it back once, wait until both
     * replicas are reported, shut down the datanode listed first for the block, install an
     * interceptor that swaps the first two locations whenever the dead host comes first,
     * bind server sockets on the dead datanode's two ports, and finally read the file ten
     * times asserting that each read stays under 60 seconds.
     *
     * <p>If the ports cannot be bound the test logs a warning and returns without failing.
     *
     * @throws Exception on any cluster, I/O or assertion failure
     */
    @Test
    public void testBlockLocationReorder() throws Exception {
        final Path p = new Path("hello");
        final short repCount = 2;
        final double toWrite = 875.5613;
        final int retries = 10;

        Assert.assertTrue(this.cluster.getDataNodes().size() > 1);

        FSDataOutputStream fop = this.dfs.create(p, repCount);
        try {
            fop.writeDouble(toWrite);
        } finally {
            fop.close();
        }

        // Baseline read while every datanode is still alive.
        long start = System.currentTimeMillis();
        long end;
        FSDataInputStream fin = this.dfs.open(p);
        try {
            Assert.assertTrue(toWrite == fin.readDouble());
            end = System.currentTimeMillis();
            LOG.info("readtime= " + (end - start));
        } finally {
            fin.close();
        }
        Assert.assertTrue(end - start < 30000L);

        FileStatus f = this.dfs.getFileStatus(p);
        BlockLocation[] lbs = waitForBlockReplicas(f, repCount);

        // The location name has the form "<address>:<port>"; keep the port for later.
        String name = lbs[0].getNames()[0];
        int colon = name.indexOf(':');
        Assert.assertTrue(colon > 0);
        int port = Integer.parseInt(name.substring(colon + 1));
        LOG.info("port= " + port);

        // Shut down the datanode that is listed first for the block.
        int ipcPort = -1;
        boolean ok = false;
        final String lookup = lbs[0].getHosts()[0];
        StringBuilder sb = new StringBuilder();
        for (DataNode dn : this.cluster.getDataNodes()) {
            String dnName = this.getHostName(dn);
            sb.append(dnName).append(' ');
            if (!lookup.equals(dnName)) {
                continue;
            }
            ok = true;
            LOG.info("killing datanode " + name + " / " + lookup);
            ipcPort = dn.ipcServer.getListenerAddress().getPort();
            dn.shutdown();
            LOG.info("killed datanode " + name + " / " + lookup);
            break;
        }
        Assert.assertTrue("didn't find the server to kill, was looking for " + lookup + " found " + sb, ok);
        LOG.info("ipc port= " + ipcPort);

        // Interceptor: when the dead host is listed first, swap it with the second location.
        Assert.assertTrue(HFileSystem.addLocationsOrderInterceptor(this.conf, new HFileSystem.ReorderBlocks() {

            @Override
            public void reorderBlocks(Configuration c, LocatedBlocks blocks, String src) {
                for (LocatedBlock lb : blocks.getLocatedBlocks()) {
                    if (lb.getLocations().length <= 1) {
                        continue;
                    }
                    DatanodeInfo[] infos = lb.getLocations();
                    if (!infos[0].getHostName().equals(lookup)) {
                        continue;
                    }
                    LOG.info("HFileSystem bad host, inverting");
                    DatanodeInfo tmp = infos[0];
                    infos[0] = infos[1];
                    infos[1] = tmp;
                }
            }
        }));

        // Occupy the dead datanode's ports (presumably so that connections to them stall
        // instead of being refused immediately -- TODO confirm). Both sockets are released
        // in the finally block even when a read assertion fails.
        ServerSocket ss = null;
        ServerSocket ssI = null;
        try {
            try {
                ss = new ServerSocket(port);
                ssI = new ServerSocket(ipcPort);
            } catch (BindException be) {
                LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort + ", this means that the datanode has not closed the socket or" + " someone else took it. It may happen, skipping this test for this time.", be);
                return;
            }
            for (int i = 0; i < retries; ++i) {
                start = System.currentTimeMillis();
                fin = this.dfs.open(p);
                try {
                    Assert.assertTrue(toWrite == fin.readDouble());
                } finally {
                    fin.close();
                }
                end = System.currentTimeMillis();
                LOG.info("HFileSystem readtime= " + (end - start));
                Assert.assertFalse("We took too much time to read", end - start > 60000L);
            }
        } finally {
            try {
                if (ss != null) {
                    ss.close();
                }
            } finally {
                if (ssI != null) {
                    ssI.close();
                }
            }
        }
    }

    /**
     * Polls the block locations of the first byte of {@code f} until exactly one block
     * with {@code expectedReplicas} hosts is reported, failing after 30 seconds.
     *
     * @param f                file whose first block is inspected
     * @param expectedReplicas number of hosts the block must be reported on
     * @return the block locations that satisfied the condition (length 1)
     * @throws IOException          if the locations cannot be fetched
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    private BlockLocation[] waitForBlockReplicas(FileStatus f, int expectedReplicas)
            throws IOException, InterruptedException {
        final long deadline = System.currentTimeMillis() + 30000L;
        while (true) {
            BlockLocation[] lbs = this.dfs.getFileBlockLocations(f, 0L, 1L);
            // Check the array length first so an empty result is never indexed.
            if (lbs.length == 1 && lbs[0].getHosts().length == expectedReplicas) {
                return lbs;
            }
            Assert.assertTrue("Timed out waiting for " + expectedReplicas + " replicas of " + f.getPath(),
                    System.currentTimeMillis() < deadline);
            Thread.sleep(100L);
        }
    }

    /**
     * Returns the host name of a datanode, looked up reflectively.
     *
     * <p>{@code getDisplayName} is tried first and {@code getHostName} second; if neither
     * public method exists on {@link DataNode} a {@link RuntimeException} wrapping the
     * second lookup failure is thrown. When the returned value contains a colon, only the
     * part before the first colon is returned.
     *
     * @param dn datanode to query
     * @return the value returned by the accessor, truncated at the first {@code ':'} if any
     * @throws InvocationTargetException if the reflective call itself throws
     * @throws IllegalAccessException    if the accessor cannot be invoked
     */
    private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
        Method accessor = null;
        NoSuchMethodException lookupFailure = null;
        for (String candidate : new String[] {"getDisplayName", "getHostName"}) {
            try {
                accessor = DataNode.class.getMethod(candidate);
                break;
            } catch (NoSuchMethodException e) {
                lookupFailure = e;
            }
        }
        if (accessor == null) {
            throw new RuntimeException(lookupFailure);
        }

        final String reported = (String) accessor.invoke(dn);
        return reported.contains(":") ? reported.split(":")[0] : reported;
    }

    /**
     * Checks, on a running mini HBase cluster, that the replica hosted on the region
     * server's own host is listed last in the block locations of its WAL files.
     *
     * <p>The test starts ZooKeeper and HBase, adds a fourth datanode whose host name is
     * the region server's host name, and then repeatedly rolls the WAL, writes one cell
     * and inspects every file under the region server's WAL directory. The loop ends once
     * ten files have been seen whose first block lists the new datanode last. Files whose
     * first block has three replicas are additionally checked through
     * {@code testFromDFS} against both the test and the master file systems.
     *
     * <p>WAL files that disappear while being inspected are logged and skipped.
     *
     * @throws Exception on any cluster, I/O or assertion failure
     */
    @Test
    public void testHBaseCluster() throws Exception {
        byte[] sb = "sb".getBytes();
        this.htu.startMiniZKCluster();
        MiniHBaseCluster hbm = this.htu.startMiniHBaseCluster(1, 1);
        hbm.waitForActiveAndReadyMaster();
        hbm.getRegionServer(0).waitForServerOnline();
        HRegionServer targetRs = hbm.getRegionServer(0);

        // Add a datanode that shares the region server's host name.
        final String host4 = targetRs.getServerName().getHostname();
        LOG.info("Starting a new datanode with the name=" + host4);
        this.cluster.startDataNodes(this.conf, 1, true, null, new String[]{"/r4"}, new String[]{host4}, null);
        this.cluster.waitClusterUp();

        final int repCount = 3;
        this.conf = targetRs.getConfiguration();
        HFileSystem rfs = (HFileSystem) targetRs.getFileSystem();
        HTable h = this.htu.createTable(TableName.valueOf("table"), sb);
        String rootDir = new Path(FSUtils.getRootDir(this.conf) + "/" + "WALs" + "/" + targetRs.getServerName().toString()).toUri().getPath();
        DistributedFileSystem mdfs = (DistributedFileSystem) hbm.getMaster().getMasterFileSystem().getFileSystem();

        int nbTest = 0;
        while (nbTest < 10) {
            // Roll the WAL and wait until every online region of the table has seen the roll.
            List<HRegion> regions = targetRs.getOnlineRegions(h.getName());
            final CountDownLatch latch = new CountDownLatch(regions.size());
            WALActionsListener.Base listener = new WALActionsListener.Base() {

                @Override
                public void postLogRoll(Path oldPath, Path newPath) throws IOException {
                    latch.countDown();
                }
            };
            for (HRegion region : regions) {
                region.getWAL().registerWALActionsListener(listener);
            }
            this.htu.getHBaseAdmin().rollWALWriter(targetRs.getServerName());
            try {
                latch.await();
            } catch (InterruptedException exception) {
                LOG.warn("Interrupted while waiting for the wal of '" + targetRs + "' to roll. If later " + "tests fail, it's probably because we should still be waiting.");
                Thread.currentThread().interrupt();
            }
            for (HRegion region : regions) {
                region.getWAL().unregisterWALActionsListener(listener);
            }
            Thread.sleep(100L);

            // Write one cell so the current WAL file has content.
            Put p = new Put(sb);
            p.add(sb, sb, sb);
            h.put(p);

            DirectoryListing dl = this.dfs.getClient().listPaths(rootDir, HdfsFileStatus.EMPTY_NAME);
            HdfsFileStatus[] hfs = dl.getPartialListing();
            Assert.assertTrue(hfs.length >= 1);
            for (HdfsFileStatus hf : hfs) {
                try {
                    LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir);
                    String logFile = rootDir + "/" + hf.getLocalName();
                    FileStatus fsLog = rfs.getFileStatus(new Path(logFile));
                    LOG.info("Checking log file: " + logFile);
                    BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0L, 1L);
                    if (bls.length <= 0) {
                        continue;
                    }
                    BlockLocation bl = bls[0];
                    LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " ");
                    for (int i = 0; i < bl.getHosts().length - 1; ++i) {
                        LOG.info(bl.getHosts()[i] + "    " + logFile);
                        // Compare by value: a reference comparison between two strings
                        // obtained from different sources could never fail.
                        Assert.assertFalse("Replica " + i + " of " + logFile + " is on " + host4
                                + " but is not the last location", host4.equals(bl.getHosts()[i]));
                    }
                    String last = bl.getHosts()[bl.getHosts().length - 1];
                    LOG.info(last + "    " + logFile);
                    if (!host4.equals(last)) {
                        continue;
                    }
                    ++nbTest;
                    LOG.info(logFile + " is on the new datanode and is ok");
                    if (bl.getHosts().length != repCount) {
                        continue;
                    }
                    this.testFromDFS(this.dfs, logFile, repCount, host4);
                    this.testFromDFS(mdfs, logFile, repCount, host4);
                } catch (FileNotFoundException exception) {
                    LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " + "archived out from under us so we'll ignore and retry. If this test hangs " + "indefinitely you should treat this failure as a symptom.", exception);
                } catch (RemoteException exception) {
                    if (exception.unwrapRemoteException() instanceof FileNotFoundException) {
                        LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " + "archived out from under us so we'll ignore and retry. If this test hangs " + "indefinitely you should treat this failure as a symptom.", exception);
                        continue;
                    }
                    throw exception;
                }
            }
        }
    }

    /**
     * Asks the namenode behind {@code dfs} for the locations of the first byte of
     * {@code src} and asserts that, once every returned block reports {@code repCount}
     * locations, the last location of each block is {@code localhost}.
     *
     * <p>The whole check is repeated ten times; each repetition polls for at most ten
     * seconds before failing.
     *
     * @param dfs       file system whose client's namenode proxy is queried
     * @param src       path of the file to inspect
     * @param repCount  number of locations every block must report
     * @param localhost host name expected at index {@code repCount - 1}
     * @throws Exception on reflection, RPC or assertion failure
     */
    private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost) throws Exception {
        for (int round = 0; round < 10; round++) {
            final long deadline = System.currentTimeMillis() + 10000L;
            LocatedBlocks located;
            boolean fullyReplicated;
            do {
                Assert.assertTrue("Can't get enouth replica.", System.currentTimeMillis() < deadline);
                located = getNamenode(dfs.getClient()).getBlockLocations(src, 0L, 1L);
                Assert.assertNotNull("Can't get block locations for " + src, located);
                Assert.assertNotNull(located.getLocatedBlocks());
                Assert.assertTrue(located.getLocatedBlocks().size() > 0);

                // Stop at the first block that does not yet have the expected replica count.
                fullyReplicated = true;
                for (LocatedBlock block : located.getLocatedBlocks()) {
                    if (block.getLocations().length != repCount) {
                        fullyReplicated = false;
                        break;
                    }
                }
            } while (!fullyReplicated);

            for (LocatedBlock block : located.getLocatedBlocks()) {
                DatanodeInfo lastLocation = block.getLocations()[repCount - 1];
                Assert.assertEquals((Object) localhost, (Object) lastLocation.getHostName());
            }
        }
    }

    /**
     * Reads the {@code namenode} field declared on {@link DFSClient} via reflection,
     * making it accessible first.
     *
     * @param dfsc client whose field is read
     * @return the field value cast to {@link ClientProtocol}
     * @throws Exception if the field does not exist or cannot be read
     */
    private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
        final Field namenodeField = DFSClient.class.getDeclaredField("namenode");
        namenodeField.setAccessible(true);
        Object namenode = namenodeField.get(dfsc);
        return (ClientProtocol) namenode;
    }

    /**
     * Exercises {@code HFileSystem.ReorderWALBlocks} directly on block locations fetched
     * from the namenode.
     *
     * <p>A small file with three replicas is written. Then, ten times over: the locations
     * are polled until three are reported; reordering with the plain file path must leave
     * the block list unchanged; reordering with a pseudo WAL path built for {@code host1}
     * must put {@code host1} at index 2, and doing it a second time must give the same
     * result.
     *
     * @throws Exception on any cluster, I/O or assertion failure
     */
    @Test
    public void testBlockLocation() throws Exception {
        this.htu.startMiniZKCluster();
        MiniHBaseCluster hbm = this.htu.startMiniHBaseCluster(1, 1);
        this.conf = hbm.getConfiguration();

        final String fileName = "/helloWorld";
        final Path p = new Path(fileName);
        final short repCount = 3;
        final double toWrite = 875.5613;

        Assert.assertTrue(this.cluster.getDataNodes().size() >= repCount);
        FSDataOutputStream fop = this.dfs.create(p, repCount);
        try {
            fop.writeDouble(toWrite);
        } finally {
            // Close even if the write fails so the stream is not leaked.
            fop.close();
        }

        for (int i = 0; i < 10; ++i) {
            // Poll (for at most ten seconds) until the block reports all its replicas.
            LocatedBlocks l;
            final long max = System.currentTimeMillis() + 10000L;
            do {
                l = TestBlockReorder.getNamenode(this.dfs.getClient()).getBlockLocations(fileName, 0L, 1L);
                Assert.assertNotNull(l.getLocatedBlocks());
                // Expected value first, so a failure message reads correctly.
                Assert.assertEquals(1L, (long) l.getLocatedBlocks().size());
                Assert.assertTrue("Expecting " + repCount + " , got " + l.get(0).getLocations().length,
                        System.currentTimeMillis() < max);
            } while (l.get(0).getLocations().length != repCount);

            // A path that is not a WAL path must leave the block list untouched.
            Object[] originalList = l.getLocatedBlocks().toArray();
            HFileSystem.ReorderWALBlocks lrb = new HFileSystem.ReorderWALBlocks();
            lrb.reorderBlocks(this.conf, l, fileName);
            Assert.assertArrayEquals(originalList, l.getLocatedBlocks().toArray());

            Assert.assertNotNull(this.conf.get("hbase.rootdir"));
            Assert.assertFalse(this.conf.get("hbase.rootdir").isEmpty());
            String pseudoLogFile = this.conf.get("hbase.rootdir") + "/" + "WALs" + "/" + host1 + ",6977,6576" + "/mylogfile";
            Assert.assertNotNull("log= " + pseudoLogFile,
                    DefaultWALProvider.getServerNameFromWALDirectoryName(this.dfs.getConf(), pseudoLogFile));

            // With a WAL path for host1, host1 must end up last; a second pass must not change that.
            lrb.reorderBlocks(this.conf, l, pseudoLogFile);
            Assert.assertEquals((Object) host1, (Object) l.get(0).getLocations()[2].getHostName());
            lrb.reorderBlocks(this.conf, l, pseudoLogFile);
            Assert.assertEquals((Object) host1, (Object) l.get(0).getLocations()[2].getHostName());
        }
    }

    static {
        ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)HFileSystem.LOG).getLogger().setLevel(Level.ALL);
    }
}

