/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.qjournal.server;

import com.google.common.base.Charsets;
import com.google.common.base.Stopwatch;
import com.google.common.primitives.Bytes;
import com.google.common.primitives.Ints;
import java.io.File;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.server.Journal;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Shell;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests that start a real {@link JournalNode} on an ephemeral RPC port and
 * drive it through an {@link IPCLoggerChannel}: metrics, epoch transitions,
 * the HTTP endpoints, recovery (prepare/accept) and start-up validation.
 */
public class TestJournalNode {
    /** Configuration key naming the directory the JournalNode stores edits in. */
    private static final String EDITS_DIR_KEY = "dfs.journalnode.edits.dir";

    private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(12345, "mycluster", "my-bp", 0L);
    private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestJournalNode.class);

    static {
        // NOTE(review): presumably lets several JournalNodes be started in one
        // JVM without clashing on metrics registration - confirm.
        DefaultMetricsSystem.setMiniClusterMode(true);
    }

    private JournalNode jn;
    private Journal journal;
    private final Configuration conf = new Configuration();
    private IPCLoggerChannel ch;
    private String journalId;

    /**
     * Starts a JournalNode on a clean edits directory, formats a journal with
     * a unique id and opens a logger channel to it.
     */
    @Before
    public void setup() throws Exception {
        File editsDir = new File(MiniDFSCluster.getBaseDirectory() + File.separator + "TestJournalNode");
        FileUtil.fullyDelete(editsDir);
        this.conf.set(EDITS_DIR_KEY, editsDir.getAbsolutePath());
        // Port 0: bind to any free port; the real address is read back below.
        this.conf.set("dfs.journalnode.rpc-address", "0.0.0.0:0");
        this.jn = new JournalNode();
        this.jn.setConf(this.conf);
        this.jn.start();
        this.journalId = "test-journalid-" + GenericTestUtils.uniqueSequenceId();
        this.journal = this.jn.getOrCreateJournal(this.journalId);
        this.journal.format(FAKE_NSINFO);
        this.ch = new IPCLoggerChannel(this.conf, FAKE_NSINFO, this.journalId, this.jn.getBoundIpcAddress());
    }

    /** Stops the JournalNode started by {@link #setup()}, if it got that far. */
    @After
    public void teardown() throws Exception {
        // Guard against setup() failing before the node was created, so the
        // original failure is not masked by a NullPointerException here.
        if (this.jn != null) {
            this.jn.stop(0);
        }
    }

    /**
     * Checks the journal's BatchesWritten, BatchesWrittenWhileLagging and
     * CurrentLagTxns metrics before any write, after one write, and after a
     * write made once the committed txid has been set ahead of the journal.
     */
    @Test(timeout=100000L)
    public void testJournal() throws Exception {
        MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(this.journal.getMetricsForTests().getName());
        MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
        MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
        MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);

        IPCLoggerChannel channel = new IPCLoggerChannel(this.conf, FAKE_NSINFO, this.journalId, this.jn.getBoundIpcAddress());
        channel.newEpoch(1L).get();
        channel.setEpoch(1L);
        channel.startLogSegment(1L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
        channel.sendEdits(1L, 1L, 1, "hello".getBytes(Charsets.UTF_8)).get();

        metrics = MetricsAsserts.getMetrics(this.journal.getMetricsForTests().getName());
        MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
        MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
        MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);

        // Claim txid 100 is committed, then write txid 2: the journal is now
        // expected to report itself as lagging (98, presumably 100 - 2).
        channel.setCommittedTxId(100L);
        channel.sendEdits(1L, 2L, 1, "goodbye".getBytes(Charsets.UTF_8)).get();

        metrics = MetricsAsserts.getMetrics(this.journal.getMetricsForTests().getName());
        MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics);
        MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics);
        MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics);
    }

    /**
     * Checks the last-segment txid returned by newEpoch in three situations:
     * with an in-progress segment, after that segment is finalized, and after
     * a further segment has been started but not written to. The test expects
     * 1 in all three cases.
     */
    @Test(timeout=100000L)
    public void testReturnsSegmentInfoAtEpochTransition() throws Exception {
        this.ch.newEpoch(1L).get();
        this.ch.setEpoch(1L);
        this.ch.startLogSegment(1L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
        this.ch.sendEdits(1L, 1L, 2, QJMTestUtil.createTxnData(1, 2)).get();

        // Segment 1 still in progress.
        QJournalProtocolProtos.NewEpochResponseProto response = (QJournalProtocolProtos.NewEpochResponseProto)this.ch.newEpoch(2L).get();
        this.ch.setEpoch(2L);
        Assert.assertEquals(1L, response.getLastSegmentTxId());

        // Segment 1 finalized.
        this.ch.finalizeLogSegment(1L, 2L).get();
        response = (QJournalProtocolProtos.NewEpochResponseProto)this.ch.newEpoch(3L).get();
        this.ch.setEpoch(3L);
        Assert.assertEquals(1L, response.getLastSegmentTxId());

        // Segment 3 started but left empty.
        this.ch.startLogSegment(3L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
        response = (QJournalProtocolProtos.NewEpochResponseProto)this.ch.newEpoch(4L).get();
        this.ch.setEpoch(4L);
        Assert.assertEquals(1L, response.getLastSegmentTxId());
    }

    /**
     * Fetches /jmx and /journalstatus.jsp from the JournalNode's HTTP server,
     * then writes and finalizes a three-transaction segment and checks that
     * /getJournal returns the expected bytes for it and a 404 for a segment
     * that does not exist.
     *
     * NOTE: this source was recovered with a decompiler, which flagged the
     * try/finally structure of this method as possibly altered.
     */
    @Test(timeout=100000L)
    public void testHttpServer() throws Exception {
        String urlRoot = this.jn.getHttpServerURI();

        String pageContents = DFSTestUtil.urlGet(new URL(urlRoot + "/jmx"));
        Assert.assertTrue("Bad contents: " + pageContents, pageContents.contains("Hadoop:service=JournalNode,name=JvmMetrics"));

        pageContents = DFSTestUtil.urlGet(new URL(urlRoot + "/journalstatus.jsp"));
        Assert.assertTrue(pageContents.contains("JournalNode"));

        byte[] editsData = QJMTestUtil.createTxnData(1, 3);
        IPCLoggerChannel channel = new IPCLoggerChannel(this.conf, FAKE_NSINFO, this.journalId, this.jn.getBoundIpcAddress());
        channel.newEpoch(1L).get();
        channel.setEpoch(1L);
        channel.startLogSegment(1L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
        channel.sendEdits(1L, 1L, 3, editsData).get();
        channel.finalizeLogSegment(1L, 3L).get();

        byte[] retrievedViaHttp = DFSTestUtil.urlGetBytes(new URL(urlRoot + "/getJournal?segmentTxId=1&jid=" + this.journalId));
        // Expected payload: layout version, four zero bytes, then the edits.
        byte[] expected = Bytes.concat(new byte[][]{Ints.toByteArray(HdfsConstants.NAMENODE_LAYOUT_VERSION), {0, 0, 0, 0}, editsData});
        Assert.assertArrayEquals(expected, retrievedViaHttp);

        URL badUrl = new URL(urlRoot + "/getJournal?segmentTxId=12345&jid=" + this.journalId);
        HttpURLConnection connection = (HttpURLConnection)badUrl.openConnection();
        try {
            Assert.assertEquals(HttpURLConnection.HTTP_NOT_FOUND, connection.getResponseCode());
        }
        finally {
            connection.disconnect();
        }
    }

    /**
     * Walks through prepareRecovery/acceptRecovery: rejected before any epoch
     * is set, empty response before a segment exists, segment state once one
     * does, the accepted value being returned to a later epoch, and requests
     * from an earlier epoch being rejected.
     */
    @Test(timeout=100000L)
    public void testAcceptRecoveryBehavior() throws Exception {
        try {
            this.ch.prepareRecovery(1L).get();
            Assert.fail("Did not throw IllegalState when trying to run paxos without an epoch");
        }
        catch (ExecutionException expected) {
            GenericTestUtils.assertExceptionContains("bad epoch", expected);
        }

        this.ch.newEpoch(1L).get();
        this.ch.setEpoch(1L);

        // No segment yet: nothing accepted and no segment state.
        QJournalProtocolProtos.PrepareRecoveryResponseProto prep = (QJournalProtocolProtos.PrepareRecoveryResponseProto)this.ch.prepareRecovery(1L).get();
        System.err.println("Prep: " + prep);
        Assert.assertFalse(prep.hasAcceptedInEpoch());
        Assert.assertFalse(prep.hasSegmentState());

        // With a segment written: segment state present, still nothing accepted.
        this.ch.startLogSegment(1L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
        this.ch.sendEdits(1L, 1L, 1, QJMTestUtil.createTxnData(1, 1)).get();
        prep = (QJournalProtocolProtos.PrepareRecoveryResponseProto)this.ch.prepareRecovery(1L).get();
        System.err.println("Prep: " + prep);
        Assert.assertFalse(prep.hasAcceptedInEpoch());
        Assert.assertTrue(prep.hasSegmentState());

        this.ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();

        // Wait for the epoch bump like every other call here, so a failure is
        // reported at this line rather than as a confusing assertion below.
        this.ch.newEpoch(2L).get();
        this.ch.setEpoch(2L);
        prep = (QJournalProtocolProtos.PrepareRecoveryResponseProto)this.ch.prepareRecovery(1L).get();
        Assert.assertEquals(1L, prep.getAcceptedInEpoch());
        Assert.assertEquals(1L, prep.getSegmentState().getEndTxId());

        // Go back to epoch 1 on the client side: both calls must be rejected.
        this.ch.setEpoch(1L);
        try {
            this.ch.prepareRecovery(1L).get();
            Assert.fail("prepare from earlier epoch not rejected");
        }
        catch (ExecutionException expected) {
            GenericTestUtils.assertExceptionContains("epoch 1 is less than the last promised epoch 2", expected);
        }
        try {
            this.ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();
            Assert.fail("accept from earlier epoch not rejected");
        }
        catch (ExecutionException expected) {
            GenericTestUtils.assertExceptionContains("epoch 1 is less than the last promised epoch 2", expected);
        }
    }

    /**
     * Checks that a JournalNode refuses to start when its edits directory is
     * a relative path, an existing regular file, or a path that cannot be
     * created.
     *
     * NOTE: this source was recovered with a decompiler, which flagged the
     * try/finally structure of this method as possibly altered.
     */
    @Test(timeout=100000L)
    public void testFailToStartWithBadConfig() throws Exception {
        Configuration badConf = new Configuration();

        badConf.set(EDITS_DIR_KEY, "non-absolute-path");
        TestJournalNode.assertJNFailsToStart(badConf, "should be an absolute path");

        File existingFile = new File(TEST_BUILD_DATA, "testjournalnodefile");
        Assert.assertTrue(existingFile.createNewFile());
        try {
            badConf.set(EDITS_DIR_KEY, existingFile.getAbsolutePath());
            TestJournalNode.assertJNFailsToStart(badConf, "Not a directory");
        }
        finally {
            existingFile.delete();
        }

        badConf.set(EDITS_DIR_KEY, Shell.WINDOWS ? "\\\\cannotBeCreated" : "/proc/does-not-exist");
        TestJournalNode.assertJNFailsToStart(badConf, "Can not create directory");
    }

    /**
     * Asserts that configuring and starting a JournalNode with {@code conf}
     * throws an exception containing {@code errString}.
     *
     * @param conf configuration expected to be rejected
     * @param errString text the start-up exception must contain
     * @throws Exception if stopping an unexpectedly started node fails
     */
    private static void assertJNFailsToStart(Configuration conf, String errString) throws Exception {
        JournalNode node = null;
        try {
            node = new JournalNode();
            node.setConf(conf);
            node.start();
        }
        catch (Exception e) {
            GenericTestUtils.assertExceptionContains(errString, e);
            return;
        }
        // Reaching this point means start-up succeeded, which is the failure
        // this helper exists to detect. Stop the stray node before failing.
        node.stop(0);
        Assert.fail("JournalNode started successfully; expected failure containing: " + errString);
    }

    /** Runs {@link #doPerfTest(int, int)} with 1024 batches of 8192 bytes. */
    @Test(timeout=100000L)
    public void testPerformance() throws Exception {
        this.doPerfTest(8192, 1024);
    }

    /**
     * Sends {@code numEdits} batches of {@code editsSize} bytes one at a time,
     * waiting for each, and prints the elapsed time, the average time per
     * batch and the throughput to stderr. Makes no assertions.
     *
     * @param editsSize size in bytes of each batch
     * @param numEdits number of batches to send
     */
    private void doPerfTest(int editsSize, int numEdits) throws Exception {
        byte[] data = new byte[editsSize];
        this.ch.newEpoch(1L).get();
        this.ch.setEpoch(1L);
        this.ch.startLogSegment(1L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();

        Stopwatch sw = new Stopwatch().start();
        // Transaction ids start at 1, so the bound is inclusive: exactly
        // numEdits batches are sent, matching the figures reported below.
        for (int txId = 1; txId <= numEdits; ++txId) {
            this.ch.sendEdits(1L, txId, 1, data).get();
        }
        long time = sw.elapsedMillis();

        System.err.println("Wrote " + numEdits + " batches of " + editsSize + " bytes in " + time + "ms");
        float avgRtt = (float)time / (float)numEdits;
        // Clamp the divisor so a sub-millisecond run cannot divide by zero.
        long throughput = (long)numEdits * (long)editsSize * 1000L / Math.max(1L, time);
        System.err.println("Time per batch: " + avgRtt + "ms");
        System.err.println("Throughput: " + throughput + " bytes/sec");
    }
}

