Using the Java API of Hadoop HDFS


Preface

In the earlier chapters on Hadoop shell commands and WordCount, we covered the HDFS shell commands that are used most often. In this chapter we walk through the HDFS Java API.

The code for this article can be found in my GitHub project https://github.com/SeanYanxml/bigdata/. PS: if you find the project useful, feel free to give it a star.


Main Section - Simple Example (Upload / Download)

  • Pom.xml (import the required JARs)

Before operating HDFS from Java, we need to import the corresponding JARs. If you use Maven this is straightforward. (If you do not use Maven, download the JARs that hadoop-client depends on manually and add them to the classpath.) Note that the JAR version should preferably not be lower than the version of Hadoop you have installed.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>com.yanxml</groupId>
            <artifactId>bigdata</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
        <artifactId>hadoop</artifactId>
        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.5</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/junit/junit -->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>
  • HdfsClientDemo.java
    The example below has three main parts: (1) obtaining the FileSystem, (2) upload, (3) download.

    package com.yanxml.bigdata.hadoop.hdfs.simple;

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.junit.Before;
    import org.junit.Test;

    /**
     * Java API of Hadoop HDFS.
     * When a client operates on HDFS, it does so under a user identity.
     * By default, the HDFS client API reads that identity from a JVM parameter, e.g. -DHADOOP_USER_NAME=hadoop.
     * The identity can also be passed in as a parameter when constructing the client fs object.
     */
    public class HdfsClientDemo {
        FileSystem fs = null;

        @Before
        public void init(){
            Configuration conf = new Configuration();
            // conf.set("fs.defaultFS", "hdfs://localhost:9000");
            // Obtain a client instance of the file system
            try {
                // fs = FileSystem.get(conf);
                // Pass in the URI and the user identity directly
                fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "Sean");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Test upload
        @Test
        public void testUpload(){
            try {
                // Thread.sleep(1000000);
                fs.copyFromLocalFile(new Path("/Users/Sean/Desktop/hello.sh"), new Path("/"+System.currentTimeMillis()));
            } catch (IllegalArgumentException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    fs.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        // Test download
        @Test
        public void testDownLoad(){
            try {
                fs.copyToLocalFile(new Path("/hello2019.sh"), new Path("/Users/Sean/Desktop/"));
                fs.close();
            } catch (IllegalArgumentException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

FileSystem implementation types in use (original figure not reproduced).
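A quick way to check which concrete implementation FileSystem.get() returned (a small sketch, not in the original post) is to print the class of the fs handle: an hdfs:// URI typically yields org.apache.hadoop.hdfs.DistributedFileSystem, while a file:/// URI yields a LocalFileSystem.

    // Print the concrete FileSystem implementation behind the fs handle obtained in init().
    @Test
    public void testFsType(){
        System.out.println(fs.getClass().getName());
    }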

  • Configuring the user
    Method 1 (pass it to the JVM at startup) - the original screenshot is not reproduced; a sketch is shown below.
    Method 2 (pass it when constructing the FileSystem):

    fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "Sean");
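    A minimal sketch of method 1, assuming the user name "Sean": the identity can be handed to the JVM either on the command line or programmatically, before the first FileSystem.get() call.

    // Option A: pass it when launching the client JVM, e.g.
    //   java -DHADOOP_USER_NAME=Sean com.yanxml.bigdata.hadoop.hdfs.simple.HdfsClientDemo
    // Option B: set the property in code, before the FileSystem is created.
    System.setProperty("HADOOP_USER_NAME", "Sean");
    fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);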

  • Tips
    Tip 1: after configuring the user, connect with jconsole to inspect the relevant JVM parameters. (screenshot not reproduced)
    Tip 2: after configuring the user, connect with jvisualvm to inspect the relevant JVM parameters. (screenshot not reproduced)
    Tip 3: permission-denied exception (after applying the configuration above, the upload succeeds):

    org.apache.hadoop.security.AccessControlException: Permission denied: user=hadoop, access=WRITE, inode="/":Sean:supergroup:drwxr-xr-x

    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:308)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:214)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1752)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1736)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1719)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2536)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2471)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2355)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:624)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:398)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1841)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1698)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1633)
    at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
    at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:891)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:788)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:365)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338)
    at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1968)
    at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1936)
    at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1901)
    at com.yanxml.bigdata.hadoop.hdfs.simple.HdfsClientDemo.testUpload(HdfsClientDemo.java:43)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

    Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=hadoop, access=WRITE, inode="/":Sean:supergroup:drwxr-xr-x

    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:308)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:214)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1752)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1736)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1719)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2536)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2471)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2355)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:624)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:398)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)
    at org.apache.hadoop.ipc.Client.call(Client.java:1476)
    at org.apache.hadoop.ipc.Client.call(Client.java:1413)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy14.create(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:296)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy15.create(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1836)
    ... 40 more

Tip 4: on Windows, you may need to configure the corresponding Path environment variable before the client runs successfully.
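A common workaround on Windows (an assumption here; the original post does not go into detail) is to download winutils.exe, add its bin directory to HADOOP_HOME/Path, or point the hadoop.home.dir system property at it before any FileSystem call:

    // Hypothetical Windows-only setup: "C:\\hadoop" is an example path that must contain bin\winutils.exe.
    System.setProperty("hadoop.home.dir", "C:\\hadoop");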


Main Section - Basic API (Commonly Used APIs)

In the previous section we covered the simplest usage of the Hadoop HDFS Java API. This section goes through the commonly used operations.

Create / Delete / Query / Modify (CRUD)
  • Obtain the configuration (initialization)

    public void init(){
        conf = new Configuration();
        // Obtain a client instance of the file system
        try {
            // Pass in the URI and the user identity directly
            fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "Sean");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
  • Create a directory

    // Create directories recursively (like mkdir -p)
    @Test
    public void testMkdir() throws IllegalArgumentException, IOException{
        boolean flag = fs.mkdirs(new Path("/sean2019/aaa/bbb"));
    }
  • Create a file (upload a file)

    // Upload a file
    @Test
    public void testUpload(){
        try {
            fs.copyFromLocalFile(new Path("/Users/Sean/Desktop/hello.sh"), new Path("/"+System.currentTimeMillis()));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
  • Delete (directory / file)

    // Recursive delete
    @Test
    public void testDelete() throws IllegalArgumentException, IOException{
        // Delete recursively
        boolean flag = fs.delete(new Path("/sean2019/aaa"), true);
        System.out.println(flag);
    }
  • Download a file

    // Download a file
    @Test
    public void testDownLoad(){
        try {
            fs.copyToLocalFile(new Path("/hello2019.sh"), new Path("/Users/Sean/Desktop/"));
            fs.close();
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
  • List a directory - iterator
    Note in particular that the iterator also gives access to each file's block information.

    // Recursively list all files under a directory
    // Directory listing (recursive across levels) - returns an iterator
    @Test
    public void testLs() throws FileNotFoundException, IllegalArgumentException, IOException{
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
        while(listFiles.hasNext()){
            LocatedFileStatus file = listFiles.next();
            System.out.println("Name: "+file.getPath().getName());
            System.out.println("Permission: "+file.getPermission());
            System.out.println("BlockSize: "+file.getBlockSize());
            System.out.println("Owner: "+file.getOwner());
            System.out.println("Replication: "+file.getReplication());
            BlockLocation[] blockLocations = file.getBlockLocations();
            for(BlockLocation blockLocation: blockLocations){
                // Block offset within the file
                System.out.println("Block Offset: "+blockLocation.getOffset());
                System.out.println("Block Length: "+blockLocation.getLength());
                for(String data: blockLocation.getHosts()){
                    System.out.println("Block Host: "+data);
                }
            }
        }
    }
  • List a directory - array (single level)

    @Test
    public void testLs2() throws FileNotFoundException, IllegalArgumentException, IOException{
        FileStatus[] fileStatus = fs.listStatus(new Path("/"));
        for(FileStatus file: fileStatus){
            System.out.println("Name: " + file.getPath().getName());
            System.out.println(file.isFile() ? "file" : "directory");
        }
    }
Stream mode (upload / download / read a specified range)
  • Stream-mode upload

    /**
     * Upload a file to HDFS through streams.
     * */
    @Test
    public void testUpload() throws IllegalArgumentException, IOException{
        // HDFS output stream
        FSDataOutputStream outputStream = fs.create(new Path("/sean2019/hellok.sh"), true);
        // Local input stream
        FileInputStream localInputStream = new FileInputStream("/Users/Sean/Desktop/hello.sh");
        IOUtils.copy(localInputStream, outputStream);
    }
  • Stream-mode download

    /**
     * Download a file through streams.
     * @throws IOException
     * @throws IllegalArgumentException
     * */
    // @Test
    public void testWrite() throws IllegalArgumentException, IOException{
        FSDataInputStream inputStream = fs.open(new Path("/sean2019/hellok.sh"));
        FileOutputStream localOutPutStream = new FileOutputStream("/Users/Sean/Desktop/helloG.sh");
        IOUtils.copy(inputStream, localOutPutStream);
    }
  • Stream mode with a specified range
    Use inputStream.seek(offset) to set the position where reading starts. IOUtils.copyLarge() can additionally limit how many bytes are copied (a sketch follows the example below).

    /**
     * Read from a specified offset of the file stream.
     * @throws IOException
     * @throws IllegalArgumentException
     * */
    @Test
    public void testRandomAccess() throws IllegalArgumentException, IOException{
        FSDataInputStream inputStream = fs.open(new Path("/sean2019/hellok.sh"));
        // Set the starting offset with seek()
        inputStream.seek(12);
        FileOutputStream localOutPutStream = new FileOutputStream("/Users/Sean/Desktop/helloG.sh.part2");
        IOUtils.copy(inputStream, localOutPutStream);
    }
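    A minimal sketch of the copyLarge() variant mentioned above, assuming commons-io 2.2+ is on the classpath; the local path, offset (12) and length (128) are illustrative values, not from the original example.

    // Copy a bounded byte range: skip the first 12 bytes, then copy at most 128 bytes.
    @Test
    public void testRangeCopy() throws IllegalArgumentException, IOException{
        FSDataInputStream inputStream = fs.open(new Path("/sean2019/hellok.sh"));
        FileOutputStream localOutPutStream = new FileOutputStream("/Users/Sean/Desktop/helloG.sh.range");
        IOUtils.copyLarge(inputStream, localOutPutStream, 12, 128);
        localOutPutStream.close();
        inputStream.close();
    }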
  • Output to the console (System.out stream)

    /**
     * Print the file stream to the screen.
     * @throws IOException
     * @throws IllegalArgumentException
     * */
    @Test
    public void testConsole() throws IllegalArgumentException, IOException{
        FSDataInputStream inputStream = fs.open(new Path("/sean2019/hellok.sh"));
        IOUtils.copy(inputStream, System.out);
    }

    // Pseudocode for reading a specified length (e.g. only the first block)
    // inputStream.seek(60M);
    // while(read){
    //     count++;
    //     if(count > 60M) break;
    // }
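    As a side note (not in the original post), Hadoop also ships its own stream helper, org.apache.hadoop.io.IOUtils, which can replace commons-io in the console example; a minimal sketch:

    // Same console output using Hadoop's org.apache.hadoop.io.IOUtils;
    // 4096 is the buffer size, true closes the streams when copying finishes.
    @Test
    public void testConsoleWithHadoopIOUtils() throws IOException{
        FSDataInputStream inputStream = fs.open(new Path("/sean2019/hellok.sh"));
        org.apache.hadoop.io.IOUtils.copyBytes(inputStream, System.out, 4096, true);
    }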


