package org.apache.hadoop.hdfs.shortcircuit;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestBlockReaderLocal;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:lib/hadoop-hdfs-2.5.1-tests.jar:org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.class */
public class TestShortCircuitLocalRead {
    /** Directory that holds the domain socket paths used by these tests; created in {@link #init()} and closed in {@link #shutdown()}. */
    private static TemporarySocketDirectory sockDir;
    /** Seed handed to {@code AppendTestUtil.randomBytes} when generating file contents (3735928559 is 0xDEADBEEF). */
    static final long seed = 3735928559L;
    /** Block size in bytes used to size test data; {@code createFile} passes the same value (5120) as the HDFS block size. */
    static final int blockSize = 5120;
    /** Not referenced anywhere else in this file. */
    final boolean simulatedStorage = false;

    /**
     * One-time class setup: creates the temporary socket directory shared by the
     * tests and turns off bind-path validation on {@link DomainSocket}.
     */
    @BeforeClass
    public static void init() {
        final TemporarySocketDirectory socketDirectory = new TemporarySocketDirectory();
        sockDir = socketDirectory;
        DomainSocket.disableBindPathValidation();
    }

    /**
     * One-time class teardown: closes the temporary socket directory created by
     * {@link #init()}.
     *
     * @throws IOException if closing the directory fails
     */
    @AfterClass
    public static void shutdown() throws IOException {
        // JUnit still runs @AfterClass when @BeforeClass threw, in which case
        // sockDir was never assigned; a NullPointerException here would hide
        // the real setup failure.
        if (sockDir != null) {
            sockDir.close();
        }
    }

    /**
     * Per-test precondition: the test is skipped (via a JUnit assumption) unless
     * {@link DomainSocket#getLoadingFailureReason()} reports {@code null}.
     */
    @Before
    public void before() {
        final String loadingFailureReason = DomainSocket.getLoadingFailureReason();
        Assume.assumeThat(loadingFailureReason, CoreMatchers.equalTo(null));
    }

    /**
     * Creates (overwriting any existing file) {@code path} on {@code fileSystem}
     * with a 5120-byte block size.
     *
     * @param fileSystem file system to create the file on; its configuration
     *                   supplies "io.file.buffer.size" (default 4096)
     * @param path       file to create
     * @param i          replication factor, narrowed to {@code short}
     * @return the open output stream; the caller is responsible for closing it
     * @throws IOException if the file cannot be created
     */
    static FSDataOutputStream createFile(FileSystem fileSystem, Path path, int i) throws IOException {
        final int bufferSize = fileSystem.getConf().getInt("io.file.buffer.size", 4096);
        final short replication = (short) i;
        final long blockSizeBytes = 5120L;
        return fileSystem.create(path, true, bufferSize, replication, blockSizeBytes);
    }

    /**
     * Compares every byte of {@code bArr} against {@code bArr2} starting at
     * offset {@code i}; delegates to the five-argument overload with
     * {@code bArr.length} as the number of bytes to compare.
     *
     * @param bArr  actual bytes
     * @param i     offset into {@code bArr2} where the comparison starts
     * @param bArr2 expected bytes
     * @param str   label prepended to the failure message
     */
    private static void checkData(byte[] bArr, int i, byte[] bArr2, String str) {
        final int compareLength = bArr.length;
        checkData(bArr, i, bArr2, compareLength, str);
    }

    /**
     * Fails the test at the first position where {@code bArr[k]} differs from
     * {@code bArr2[i + k]} for {@code k} in {@code [0, i2)}.
     *
     * @param bArr  actual bytes, compared from index 0
     * @param i     offset into {@code bArr2} where the comparison starts
     * @param bArr2 expected bytes
     * @param i2    number of bytes to compare
     * @param str   label prepended to the failure message
     */
    private static void checkData(byte[] bArr, int i, byte[] bArr2, int i2, String str) {
        for (int pos = 0; pos < i2; pos++) {
            final int expectedPos = i + pos;
            final byte expectedByte = bArr2[expectedPos];
            final byte actualByte = bArr[pos];
            if (expectedByte == actualByte) {
                continue;
            }
            // Report the mismatching byte plus hex dumps of both compared regions.
            final StringBuilder message = new StringBuilder();
            message.append(str).append(" byte ").append(expectedPos);
            message.append(" differs. expected ").append((int) expectedByte);
            message.append(" actual ").append((int) actualByte);
            message.append("\nexpected: ").append(StringUtils.byteToHexString(bArr2, i, i + i2));
            message.append("\nactual:   ").append(StringUtils.byteToHexString(bArr, 0, i2));
            Assert.fail(message.toString());
        }
    }

    /**
     * Returns the short user name of the current {@link UserGroupInformation}.
     *
     * @throws IOException if the current user cannot be determined
     */
    private static String getCurrentUser() throws IOException {
        final UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser();
        return currentUgi.getShortUserName();
    }

    /**
     * Reads {@code path} as user {@code str} and verifies that its contents from
     * offset {@code i} onward match {@code bArr}.
     *
     * <p>The file is read twice: once with a positional {@code readFully}, and
     * once sequentially (skip to the offset, then reads of up to 3, 2 and 517
     * bytes, then the remainder). Each stream is closed even when a read or an
     * assertion fails.
     *
     * @param uri           URI of the file system to open
     * @param path          file to read
     * @param bArr          expected contents of the whole file
     * @param i             offset at which reading and comparison start
     * @param str           user name the file system is opened as
     * @param configuration client configuration; also selects the {@link ClientContext}
     * @param z             when true, asserts that the context's
     *                      "disable legacy block reader local" flag is unset before
     *                      the reads and set after them
     * @throws IOException          on read failure, including reaching end of file early
     * @throws InterruptedException if obtaining the file system is interrupted
     */
    static void checkFileContent(URI uri, Path path, byte[] bArr, int i, String str, Configuration configuration, boolean z) throws IOException, InterruptedException {
        DistributedFileSystem fileSystem = getFileSystem(str, uri, configuration);
        ClientContext fromConf = ClientContext.getFromConf(configuration);
        if (z) {
            Assert.assertFalse(fromConf.getDisableLegacyBlockReaderLocal());
        }
        final int expectedLength = bArr.length - i;

        // Pass 1: positional read of everything from the offset onward.
        try (FSDataInputStream positionalIn = fileSystem.open(path)) {
            byte[] positional = new byte[expectedLength];
            positionalIn.readFully(i, positional);
            checkData(positional, i, bArr, "Read 2");
        }

        // Pass 2: sequential reads of varying sizes after skipping to the offset.
        byte[] sequential = new byte[expectedLength];
        try (FSDataInputStream sequentialIn = fileSystem.open(path)) {
            IOUtils.skipFully(sequentialIn, i);
            int total = sequentialIn.read(sequential, 0, 3);
            total += sequentialIn.read(sequential, total, 2);
            total += sequentialIn.read(sequential, total, 517);
            checkData(sequential, i, bArr, total, "A few bytes");
            while (total < sequential.length) {
                int n = sequentialIn.read(sequential, total, sequential.length - total);
                if (n < 0) {
                    throw new EOFException("End of file reached before reading fully.");
                }
                total += n;
            }
            checkData(sequential, i, bArr, "Read 3");
            if (z) {
                Assert.assertTrue(fromConf.getDisableLegacyBlockReaderLocal());
            }
        }
    }

    /**
     * Copies the full capacity of {@code byteBuffer} into a new array without
     * disturbing the buffer's own position or limit (the copy goes through a
     * duplicate whose position/limit are reset).
     *
     * @param byteBuffer buffer to copy from
     * @return a new array of {@code byteBuffer.capacity()} bytes
     */
    private static byte[] arrayFromByteBuffer(ByteBuffer byteBuffer) {
        final ByteBuffer view = byteBuffer.duplicate();
        // clear() only resets position to 0 and limit to capacity; no data is erased.
        view.clear();
        final int length = view.remaining();
        final byte[] contents = new byte[length];
        view.get(contents);
        return contents;
    }

    /**
     * Same verification as {@code checkFileContent}, but reads through the
     * {@code read(ByteBuffer)} API into a direct buffer.
     *
     * <p>After skipping to offset {@code i}, the buffer limit is raised in steps
     * (3 bytes, then 2 more, then up to 517 more) with one read per step, the
     * bytes read so far are compared, and then the rest of the buffer is filled
     * and compared. The stream is closed even when a read or an assertion fails.
     *
     * @param uri           URI of the file system to open
     * @param path          file to read
     * @param bArr          expected contents of the whole file
     * @param i             offset at which reading and comparison start
     * @param str           user name the file system is opened as
     * @param configuration client configuration; also selects the {@link ClientContext}
     * @param z             when true, asserts that the context's
     *                      "disable legacy block reader local" flag is set both
     *                      before and after the reads
     * @throws IOException          on read failure, including reaching end of file early
     * @throws InterruptedException if obtaining the file system is interrupted
     */
    static void checkFileContentDirect(URI uri, Path path, byte[] bArr, int i, String str, Configuration configuration, boolean z) throws IOException, InterruptedException {
        DistributedFileSystem fileSystem = getFileSystem(str, uri, configuration);
        ClientContext fromConf = ClientContext.getFromConf(configuration);
        if (z) {
            Assert.assertTrue(fromConf.getDisableLegacyBlockReaderLocal());
        }
        try (HdfsDataInputStream in = (HdfsDataInputStream) fileSystem.open(path)) {
            ByteBuffer buffer = ByteBuffer.allocateDirect(bArr.length - i);
            IOUtils.skipFully(in, i);

            // Three short reads, each bounded by moving the buffer limit.
            buffer.limit(3);
            int total = in.read(buffer);
            buffer.limit(total + 2);
            total += in.read(buffer);
            buffer.limit(Math.min(buffer.capacity(), total + 517));
            total += in.read(buffer);
            checkData(arrayFromByteBuffer(buffer), i, bArr, total, "A few bytes");

            // Fill whatever is left of the buffer.
            buffer.limit(buffer.capacity());
            while (buffer.hasRemaining()) {
                if (in.read(buffer) < 0) {
                    throw new EOFException("End of file reached before reading fully.");
                }
            }
            checkData(arrayFromByteBuffer(buffer), i, bArr, "Read 3");
            if (z) {
                Assert.assertTrue(fromConf.getDisableLegacyBlockReaderLocal());
            }
        }
    }

    /**
     * Runs the short-circuit read scenario with an explicit legacy
     * configuration; a straight pass-through to
     * {@link #doTestShortCircuitReadImpl}.
     *
     * @param z   value for the skip-checksum client setting
     * @param i   size in bytes of the file to write
     * @param i2  offset at which reads start
     * @param str user allowed local block path access (enables the legacy reader when non-null)
     * @param str2 user the file is read as
     * @param z2  whether the legacy short-circuit read is expected to fail
     */
    public void doTestShortCircuitReadLegacy(boolean z, int i, int i2, String str, String str2, boolean z2) throws IOException, InterruptedException {
        this.doTestShortCircuitReadImpl(z, i, i2, str, str2, z2);
    }

    /**
     * Runs the short-circuit read scenario without the legacy reader: no
     * local-path-access user is configured, the file is read as the current
     * user, and no legacy failure is expected.
     *
     * @param z  value for the skip-checksum client setting
     * @param i  size in bytes of the file to write
     * @param i2 offset at which reads start
     */
    public void doTestShortCircuitRead(boolean z, int i, int i2) throws IOException, InterruptedException {
        final String readingUser = getCurrentUser();
        final String noLegacyUser = null;
        doTestShortCircuitReadImpl(z, i, i2, noLegacyUser, readingUser, false);
    }

    /**
     * Starts a single-datanode mini cluster with short-circuit reads enabled,
     * writes {@code i} pseudo-random bytes to "filelocal.dat", and verifies the
     * contents through both {@code checkFileContent} and
     * {@code checkFileContentDirect}.
     *
     * <p>The file system and the cluster are released on every exit path; the
     * cluster is shut down even if closing the file system throws.
     *
     * @param z    value for the skip-checksum client setting
     * @param i    size in bytes of the file to write
     * @param i2   offset at which reads start
     * @param str  if non-null, set as the local-block-path-access user and the
     *             legacy BlockReaderLocal is enabled
     * @param str2 user the file is read as
     * @param z2   forwarded to the content checks as their legacy-failure flag
     * @throws IOException          on cluster or file system failure
     * @throws InterruptedException if obtaining a per-user file system is interrupted
     */
    public void doTestShortCircuitReadImpl(boolean z, int i, int i2, String str, String str2, boolean z2) throws IOException, InterruptedException {
        Configuration configuration = new Configuration();
        configuration.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
        configuration.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, z);
        // A fresh random context name per run, so this run gets its own client context.
        configuration.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
        configuration.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(), "TestShortCircuitLocalRead._PORT.sock").getAbsolutePath());
        if (str != null) {
            configuration.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, str);
            configuration.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
        }
        MiniDFSCluster build = new MiniDFSCluster.Builder(configuration).numDataNodes(1).format(true).build();
        DistributedFileSystem fileSystem = null;
        try {
            fileSystem = build.getFileSystem();
            Assert.assertTrue("/ should be a directory", fileSystem.getFileStatus(new Path("/")).isDirectory());
            byte[] randomBytes = AppendTestUtil.randomBytes(seed, i);
            Path makeQualified = fileSystem.makeQualified(new Path("filelocal.dat"));
            try (FSDataOutputStream out = createFile(fileSystem, makeQualified, 1)) {
                out.write(randomBytes);
            }
            URI uri = build.getURI();
            checkFileContent(uri, makeQualified, randomBytes, i2, str2, configuration, z2);
            checkFileContentDirect(uri, makeQualified, randomBytes, i2, str2, configuration, z2);
        } finally {
            try {
                if (fileSystem != null) {
                    fileSystem.close();
                }
            } finally {
                build.shutdown();
            }
        }
    }

    /**
     * Short-circuit read of a 15460-byte file (three 5120-byte blocks plus 100
     * bytes) from offset 0 with the skip-checksum setting enabled.
     */
    @Test(timeout = 60000)
    public void testFileLocalReadNoChecksum() throws Exception {
        final boolean skipChecksum = true;
        final int fileSize = 15460;
        final int readOffset = 0;
        doTestShortCircuitRead(skipChecksum, fileSize, readOffset);
    }

    /**
     * Short-circuit read of a 15460-byte file (three 5120-byte blocks plus 100
     * bytes) from offset 0 with the skip-checksum setting disabled.
     */
    @Test(timeout = 60000)
    public void testFileLocalReadChecksum() throws Exception {
        final boolean skipChecksum = false;
        final int fileSize = 15460;
        final int readOffset = 0;
        doTestShortCircuitRead(skipChecksum, fileSize, readOffset);
    }

    /**
     * Short-circuit reads of a 13-byte file from offsets 0 and 5, first with
     * the skip-checksum setting disabled and then with it enabled.
     */
    @Test(timeout = 60000)
    public void testSmallFileLocalRead() throws Exception {
        final int fileSize = 13;
        final int[] readOffsets = {0, 5};
        for (boolean skipChecksum : new boolean[] {false, true}) {
            for (int readOffset : readOffsets) {
                doTestShortCircuitRead(skipChecksum, fileSize, readOffset);
            }
        }
    }

    /**
     * Legacy-reader scenario where the current user is both the configured
     * local-path-access user and the user reading the 13-byte file; no legacy
     * failure is expected.
     */
    @Test(timeout = 60000)
    public void testLocalReadLegacy() throws Exception {
        final String allowedUser = getCurrentUser();
        final String readingUser = getCurrentUser();
        doTestShortCircuitReadLegacy(true, 13, 0, allowedUser, readingUser, false);
    }

    /**
     * Legacy-reader scenario where local path access is configured for the
     * current user but the 13-byte file is read as "notallowed"; the content
     * checks are told to expect the legacy short-circuit path to fail.
     */
    @Test(timeout = 60000)
    public void testLocalReadFallback() throws Exception {
        final String allowedUser = getCurrentUser();
        final String readingUser = "notallowed";
        final boolean expectLegacyFailure = true;
        doTestShortCircuitReadLegacy(true, 13, 0, allowedUser, readingUser, expectLegacyFailure);
    }

    /**
     * Short-circuit reads of a 15460-byte file starting at offset 777, first
     * with the skip-checksum setting disabled and then with it enabled.
     */
    @Test(timeout = 60000)
    public void testReadFromAnOffset() throws Exception {
        final int fileSize = 15460;
        final int readOffset = 777;
        for (boolean skipChecksum : new boolean[] {false, true}) {
            doTestShortCircuitRead(skipChecksum, fileSize, readOffset);
        }
    }

    /**
     * Short-circuit reads of a 51300-byte file (ten 5120-byte blocks plus 100
     * bytes) starting at offset 777, first with the skip-checksum setting
     * disabled and then with it enabled.
     */
    @Test(timeout = 60000)
    public void testLongFile() throws Exception {
        final int fileSize = 51300;
        final int readOffset = 777;
        for (boolean skipChecksum : new boolean[] {false, true}) {
            doTestShortCircuitRead(skipChecksum, fileSize, readOffset);
        }
    }

    /**
     * Obtains the file system for {@code uri} while running as a remote user
     * named {@code str}.
     *
     * @param str           user name to create the remote-user identity from
     * @param uri           file system URI
     * @param configuration configuration passed to {@code FileSystem.get}
     * @return the file system, cast to {@link DistributedFileSystem}
     * @throws InterruptedException if the privileged action is interrupted
     * @throws IOException          if the file system cannot be obtained
     */
    private static DistributedFileSystem getFileSystem(String str, final URI uri, final Configuration configuration) throws InterruptedException, IOException {
        final UserGroupInformation remoteUser = UserGroupInformation.createRemoteUser(str);
        final PrivilegedExceptionAction<DistributedFileSystem> openAsUser = new PrivilegedExceptionAction<DistributedFileSystem>() {
            @Override
            public DistributedFileSystem run() throws Exception {
                final FileSystem opened = FileSystem.get(uri, configuration);
                return (DistributedFileSystem) opened;
            }
        };
        return remoteUser.doAs(openAsUser);
    }

    /**
     * Verifies that calling the {@code getBlockLocalPathInfo} datanode RPC on a
     * cluster built from a default {@link Configuration} fails with an
     * {@link IOException} whose message contains
     * "not allowed to call getBlockLocalPathInfo".
     *
     * <p>The file system and the cluster are released on every exit path; the
     * cluster is shut down even if closing the file system throws.
     *
     * @throws IOException on cluster or file system failure
     */
    @Test(timeout = 10000)
    public void testDeprecatedGetBlockLocalPathInfoRpc() throws IOException {
        Configuration configuration = new Configuration();
        MiniDFSCluster build = new MiniDFSCluster.Builder(configuration).numDataNodes(1).format(true).build();
        DistributedFileSystem fileSystem = null;
        try {
            build.waitActive();
            fileSystem = build.getFileSystem();
            DFSTestUtil.createFile(fileSystem, new Path("/tmp/x"), 16L, (short) 1, 23L);
            LocatedBlocks blockLocations = build.getNameNode().getRpcServer().getBlockLocations("/tmp/x", 0L, 16L);
            try {
                DFSUtil.createClientDatanodeProtocolProxy((DatanodeID) blockLocations.get(0).getLocations()[0], configuration, 60000, false).getBlockLocalPathInfo(new ExtendedBlock(blockLocations.get(0).getBlock()), blockLocations.get(0).getBlockToken());
                Assert.fail("The call should have failed as this user  is not allowed to call getBlockLocalPathInfo");
            } catch (IOException e) {
                // Guard against a null message so a mismatch is reported as an
                // assertion failure that shows what was actually received.
                String message = e.getMessage();
                Assert.assertTrue("Unexpected exception message: " + message, message != null && message.contains("not allowed to call getBlockLocalPathInfo"));
            }
        } finally {
            try {
                if (fileSystem != null) {
                    fileSystem.close();
                }
            } finally {
                build.shutdown();
            }
        }
    }

    /**
     * With short-circuit reads enabled and checksum skipping disabled, writes a
     * three-block file, reads 3 bytes from the start, seeks to 3 bytes past the
     * start of the third block and reads 3 more bytes. The test passes if none
     * of these operations throws; the bytes read are not compared.
     *
     * <p>Streams, the file system and the cluster are released on every exit
     * path; the cluster is shut down even if closing the file system throws.
     *
     * @throws IOException on cluster, file system or read failure
     */
    @Test(timeout = 10000)
    public void testSkipWithVerifyChecksum() throws IOException {
        Configuration configuration = new Configuration();
        configuration.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
        configuration.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
        configuration.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "/tmp/testSkipWithVerifyChecksum._PORT");
        DomainSocket.disableBindPathValidation();
        MiniDFSCluster build = new MiniDFSCluster.Builder(configuration).numDataNodes(1).format(true).build();
        DistributedFileSystem fileSystem = null;
        try {
            fileSystem = build.getFileSystem();
            Assert.assertTrue("/ should be a directory", fileSystem.getFileStatus(new Path("/")).isDirectory());
            byte[] randomBytes = AppendTestUtil.randomBytes(seed, blockSize * 3);
            Path path = new Path("filelocal.dat");
            try (FSDataOutputStream out = createFile(fileSystem, path, 1)) {
                out.write(randomBytes);
            }
            try (FSDataInputStream in = fileSystem.open(path)) {
                byte[] buffer = new byte[randomBytes.length];
                int firstRead = in.read(buffer, 0, 3);
                // Jump to 3 bytes past the start of the third block.
                long seekPosition = (2 * blockSize) + 3;
                in.seek(seekPosition);
                in.read(buffer, (int) (seekPosition + firstRead), 3);
            }
        } finally {
            try {
                if (fileSystem != null) {
                    fileSystem.close();
                }
            } finally {
                build.shutdown();
            }
        }
    }

    /**
     * Verifies that reading from a file whose block file has been truncated to
     * zero length fails with an {@link IOException}, while a second, untouched
     * file on the same cluster can still be read back intact.
     *
     * <p>Steps: create files "/a" and "/b" (3456 bytes each), remember the
     * contents of "/b", shut the cluster down, truncate the first block file of
     * "/a" to length 0, restart the cluster without formatting, then check that
     * reading "/a" fails and that "/b" still matches.
     *
     * <p>The timeout is the plain literal 120000 ms rather than a reference to
     * an unrelated YARN configuration constant of the same value.
     *
     * @throws IOException on cluster or file system failure
     */
    @Test(timeout = 120000)
    public void testHandleTruncatedBlockFile() throws IOException {
        MiniDFSCluster miniDFSCluster = null;
        HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
        hdfsConfiguration.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
        hdfsConfiguration.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
        hdfsConfiguration.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "/tmp/testHandleTruncatedBlockFile._PORT");
        hdfsConfiguration.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
        final Path truncatedPath = new Path("/a");
        final Path intactPath = new Path("/b");
        final int fileLength = 3456;
        // Whichever stream is currently open; closed in the outer finally if a
        // step fails before its regular close() is reached.
        FSDataInputStream in = null;
        try {
            miniDFSCluster = new MiniDFSCluster.Builder(hdfsConfiguration).numDataNodes(1).build();
            miniDFSCluster.waitActive();
            DistributedFileSystem fileSystem = miniDFSCluster.getFileSystem();
            DFSTestUtil.createFile(fileSystem, truncatedPath, 3456L, (short) 1, 4567L);
            DFSTestUtil.createFile(fileSystem, intactPath, 3456L, (short) 1, 4568L);

            // Remember the contents of the file that will stay intact.
            in = miniDFSCluster.getFileSystem().open(intactPath);
            byte[] original = new byte[fileLength];
            IOUtils.readFully(in, original, 0, fileLength);
            in.close();
            in = null;

            try {
                DFSTestUtil.waitReplication((FileSystem) fileSystem, truncatedPath, (short) 1);
            } catch (InterruptedException e) {
                Assert.fail("unexpected InterruptedException during waitReplication: " + e);
            } catch (TimeoutException e) {
                Assert.fail("unexpected TimeoutException during waitReplication: " + e);
            }

            // Truncate the block file while the cluster is down.
            File blockFile = MiniDFSCluster.getBlockFile(0, DFSTestUtil.getFirstBlock(fileSystem, truncatedPath));
            miniDFSCluster.shutdown();
            miniDFSCluster = null;
            RandomAccessFile randomAccessFile = new RandomAccessFile(blockFile, "rw");
            try {
                randomAccessFile.setLength(0L);
            } finally {
                randomAccessFile.close();
            }

            // Restart on the existing storage (no format) and read the truncated file.
            miniDFSCluster = new MiniDFSCluster.Builder(hdfsConfiguration).numDataNodes(1).format(false).build();
            miniDFSCluster.waitActive();
            DistributedFileSystem restartedFs = miniDFSCluster.getFileSystem();
            in = restartedFs.open(truncatedPath);
            try {
                byte[] scratch = new byte[100];
                in.seek(2000L);
                in.readFully(scratch, 0, scratch.length);
                Assert.fail("shouldn't be able to read from corrupt 0-length block file.");
            } catch (IOException e) {
                // Expected outcome: log it and carry on.
                DFSClient.LOG.error("caught exception ", e);
            }
            in.close();
            in = null;

            // The untouched file must still read back byte-for-byte.
            in = restartedFs.open(intactPath);
            byte[] reread = new byte[original.length];
            in.readFully(reread, 0, reread.length);
            TestBlockReaderLocal.assertArrayRegionsEqual(original, 0, reread, 0, original.length);
            in.close();
            in = null;
        } finally {
            try {
                if (in != null) {
                    in.close();
                }
            } finally {
                if (miniDFSCluster != null) {
                    miniDFSCluster.shutdown();
                }
            }
        }
    }

    /**
     * Command-line benchmark: writes a 5120100-byte file to the default file
     * system, then has {@code threadCount} threads each verify its contents 20
     * times via {@code checkFileContent}, and prints the elapsed time.
     *
     * <p>Arguments: {@code shortcircuit} (boolean), {@code checksum} (boolean,
     * stored under the skip-checksum client key) and {@code threadCount} (int).
     * Exits with status 1 after printing usage if the argument count is not 3.
     *
     * @param strArr command-line arguments as described above
     * @throws Exception on setup failure or if joining a worker is interrupted
     */
    public static void main(String[] strArr) throws Exception {
        if (strArr.length != 3) {
            System.out.println("Usage: test shortcircuit checksum threadCount");
            System.exit(1);
        }
        boolean shortCircuit = Boolean.parseBoolean(strArr[0]);
        boolean checksum = Boolean.parseBoolean(strArr[1]);
        int threadCount = Integer.parseInt(strArr[2]);
        final Configuration configuration = new Configuration();
        configuration.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortCircuit);
        configuration.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "/tmp/TestShortCircuitLocalRead._PORT");
        configuration.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, checksum);
        final byte[] randomBytes = AppendTestUtil.randomBytes(seed, 5120100);
        final Path path = new Path("filelocal.dat");
        final FileSystem fileSystem = FileSystem.get(configuration);
        FSDataOutputStream createFile = createFile(fileSystem, path, 1);
        createFile.write(randomBytes);
        createFile.close();
        // Resolve the user once, outside the worker threads: the value is the
        // same for every iteration, and the anonymous Thread subclass below then
        // needs no access to the private getCurrentUser() helper.
        final String user = getCurrentUser();
        long now = Time.now();
        Thread[] threadArr = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threadArr[i] = new Thread() {
                @Override
                public void run() {
                    for (int iteration = 0; iteration < 20; iteration++) {
                        try {
                            TestShortCircuitLocalRead.checkFileContent(fileSystem.getUri(), path, randomBytes, 0, user, configuration, true);
                        } catch (IOException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e2) {
                            e2.printStackTrace();
                        }
                    }
                }
            };
        }
        for (int i2 = 0; i2 < threadCount; i2++) {
            threadArr[i2].start();
        }
        for (int i3 = 0; i3 < threadCount; i3++) {
            threadArr[i3].join();
        }
        System.out.println("Iteration 20 took " + (Time.now() - now));
        fileSystem.delete(path, false);
    }

    /**
     * Runs the legacy-remote-block-reader scenario on a 15460-byte file, read
     * from offset 0 as the current user, with no legacy failure expected.
     */
    @Test(timeout = 60000)
    public void testReadWithRemoteBlockReader() throws IOException, InterruptedException {
        final int fileSize = 15460;
        final int readOffset = 0;
        final String readingUser = getCurrentUser();
        doTestShortCircuitReadWithRemoteBlockReader(true, fileSize, readingUser, readOffset, false);
    }

    /**
     * Starts a single-datanode mini cluster with the legacy block reader and
     * short-circuit reads both enabled, writes {@code i} pseudo-random bytes to
     * "filelocal.dat", verifies the contents via {@code checkFileContent}, and
     * asserts that {@code read(ByteBuffer)} is reported as unsupported.
     *
     * <p>The file system and the cluster are released on every exit path,
     * including failures during setup; the cluster is shut down even if closing
     * the file system throws.
     *
     * @param z   not used by this method
     * @param i   size in bytes of the file to write
     * @param str user the file is read as
     * @param i2  offset at which reads start
     * @param z2  forwarded to {@code checkFileContent} as its legacy-failure flag
     * @throws IOException          on cluster or file system failure; failures
     *                              from the verification step are wrapped with a
     *                              message naming this method
     * @throws InterruptedException if obtaining a per-user file system is interrupted
     */
    public void doTestShortCircuitReadWithRemoteBlockReader(boolean z, int i, String str, int i2, boolean z2) throws IOException, InterruptedException {
        Configuration configuration = new Configuration();
        configuration.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
        configuration.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
        MiniDFSCluster build = new MiniDFSCluster.Builder(configuration).numDataNodes(1).format(true).build();
        DistributedFileSystem fileSystem = null;
        try {
            fileSystem = build.getFileSystem();
            Path path = new Path("/");
            URI uri = build.getURI();
            Assert.assertTrue("/ should be a directory", fileSystem.getFileStatus(path).isDirectory());
            byte[] randomBytes = AppendTestUtil.randomBytes(seed, i);
            Path path2 = new Path("filelocal.dat");
            try (FSDataOutputStream out = createFile(fileSystem, path2, 1)) {
                out.write(randomBytes);
            }
            try {
                checkFileContent(uri, path2, randomBytes, i2, str, configuration, z2);
                Assert.assertTrue("RemoteBlockReader unsupported method read(ByteBuffer bf) error", checkUnsupportedMethod(fileSystem, path2, randomBytes, i2));
            } catch (IOException e) {
                // Only verification failures are wrapped; setup errors propagate unchanged.
                throw new IOException("doTestShortCircuitReadWithRemoteBlockReader ex error ", e);
            }
        } finally {
            try {
                if (fileSystem != null) {
                    fileSystem.close();
                }
            } finally {
                build.shutdown();
            }
        }
    }

    /**
     * Opens {@code path}, skips {@code i} bytes and attempts a
     * {@code read(ByteBuffer)} into a direct buffer.
     *
     * @param fileSystem file system to open the file on
     * @param path       file to read
     * @param bArr       expected file contents; only its length is used, to size the buffer
     * @param i          number of bytes to skip before the read
     * @return {@code true} if the read threw {@link UnsupportedOperationException},
     *         {@code false} if it completed
     * @throws IOException if opening, skipping, reading or closing fails
     */
    private boolean checkUnsupportedMethod(FileSystem fileSystem, Path path, byte[] bArr, int i) throws IOException {
        // try-with-resources: the stream is closed on both outcomes.
        try (HdfsDataInputStream in = (HdfsDataInputStream) fileSystem.open(path)) {
            ByteBuffer buffer = ByteBuffer.allocateDirect(bArr.length - i);
            IOUtils.skipFully(in, i);
            try {
                in.read(buffer);
                return false;
            } catch (UnsupportedOperationException expected) {
                return true;
            }
        }
    }

    /**
     * Package-visible bridge to the private {@code getCurrentUser()} helper;
     * marked synthetic in the decompiled output.
     *
     * @throws IOException if the current user cannot be determined
     */
    static /* synthetic */ String access$000() throws IOException {
        final String currentUser = getCurrentUser();
        return currentUser;
    }
}
