HDFS API操作

x33g5p2x  于2020-09-30 发布在 HDFS  
字(10.5k)|赞(0)|评价(0)|浏览(742)

1.maven准备

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.hadoop</groupId>
  4. <artifactId>hadoop-common</artifactId>
  5. <version>2.8.4</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.hadoop</groupId>
  9. <artifactId>hadoop-common</artifactId>
  10. <version>2.8.4</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.hadoop</groupId>
  14. <artifactId>hadoop-hdfs</artifactId>
  15. <version>2.8.4</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.hadoop</groupId>
  19. <artifactId>hadoop-client</artifactId>
  20. <version>2.8.4</version>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.projectlombok</groupId>
  24. <artifactId>lombok</artifactId>
  25. <version>1.16.10</version>
  26. </dependency>
  27. <dependency>
  28. <groupId>log4j</groupId>
  29. <artifactId>log4j</artifactId>
  30. <version>1.2.17</version>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.slf4j</groupId>
  34. <artifactId>slf4j-api</artifactId>
  35. <version>1.7.7</version>
  36. </dependency>
  37. <!-- https://mvnrepository.com/artifact/junit/junit -->
  38. <dependency>
  39. <groupId>junit</groupId>
  40. <artifactId>junit</artifactId>
  41. <version>4.12</version>
  42. <scope>test</scope>
  43. </dependency>
  44. </dependencies>

2.本地配置Hadoop环境

在windows系统需要配置hadoop运行环境,否则直接运行代码会出现以下问题:

  1. 解压hadoop2.8.4文件至需要安装的全英文目录下

  2. 配置hadoop的环境变量 HADOOP_HOME

    将以下配置到path变量中

  1. D:\hadoop-2.8.4\hadoop-2.8.4\sbin
  2. D:\hadoop-2.8.4\hadoop-2.8.4\bin

注:按照自己的安装路径进行配置

3.获取文件系统

  1. /**
  2. * 打印本地hadoop地址值
  3. * IO的方式写代码
  4. */
  5. @Test
  6. public void intiHDFS() throws IOException {
  7. //F2 可以快速的定位错误
  8. // alt + enter自动找错误
  9. //1.创建配信信息对象 ctrl + alt + v 后推前 ctrl + shitl + enter 补全
  10. Configuration conf = new Configuration();
  11. //2.获取文件系统
  12. FileSystem fs = FileSystem.get(conf);
  13. //3.打印文件系统
  14. System.out.println(fs.toString());
  15. }

4.遍历HDFS所有文件

  1. //遍历HDFS所有文件
  2. @Test
  3. public void listFiles() throws Exception{
  4. //1.获取filesystem对象
  5. FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata111:9000"), new Configuration());
  6. //2.获取RemoteIterator 得到所有的文件或者文件夹,第一个参数指定遍历的路径,第二个can阿叔表示是否要递归遍历
  7. RemoteIterator<LocatedFileStatus> lists = fileSystem.listFiles(new Path("/"), true);
  8. //3.循环
  9. while (lists.hasNext()){
  10. LocatedFileStatus next = lists.next();
  11. System.out.println(next.getPath().toString());
  12. //文件的block信息
  13. BlockLocation[] blockLocations = next.getBlockLocations();
  14. System.out.println("block数: "+blockLocations.length);
  15. }
  16. fileSystem.close();
  17. }

5.HDFS上创建文件夹

  1. /**
  2. * hadoop fs -mkdir /xinshou
  3. */
  4. @Test
  5. public void mkmdirHDFS() throws Exception {
  6. //1.创新配置信息对象
  7. Configuration configuration = new Configuration();
  8. //2.链接文件系统
  9. //final URI uri 地址
  10. //final Configuration conf 配置
  11. //String user Linux用户
  12. FileSystem fs = FileSystem.get(new URI("hdfs://bigdata111:9000"), configuration, "root");
  13. //3.创建目录
  14. fs.mkdirs(new Path("hdfs://bigdata111:9000/Good/Goog/Study"));
  15. //4.关闭
  16. fs.close();
  17. System.out.println("创建文件夹成功");
  18. }

6. 文件下载

  1. /**
  2. * hadoop fs -get /HDFS文件系统
  3. * @throws Exception
  4. */
  5. @Test
  6. public void getFileFromHDFS() throws Exception {
  7. //1.创建配置信息对象 Configuration:配置
  8. Configuration conf = new Configuration();
  9. //2.找到文件系统
  10. //final URI uri :HDFS地址
  11. //final Configuration conf:配置信息
  12. // String user :Linux用户名
  13. FileSystem fs = FileSystem.get(new URI("hdfs://bigdata111:9000"), conf, "root");
  14. //3.下载文件
  15. //boolean delSrc:是否将原文件删除
  16. //Path src :要下载的路径
  17. //Path dst :要下载到哪
  18. //boolean useRawLocalFileSystem :是否校验文件
  19. fs.copyToLocalFile(false,new Path("hdfs://bigdata111:9000/README.txt"),
  20. new Path("F:\\date\\README.txt"),true);
  21. //4.关闭fs
  22. //alt + enter 找错误
  23. //ctrl + alt + o 可以快速的去除没有用的导包
  24. fs.close();
  25. System.out.println("下载成功");
  26. }

7.文件上传

  1. /**
  2. * 上传代码
  3. * 注意:如果上传的内容大于128MB,则是2块
  4. */
  5. @Test
  6. public void putFileToHDFS() throws Exception {
  7. //注:import org.apache.hadoop.conf.Configuration;
  8. //ctrl + alt + v 推动出对象
  9. //1.创建配置信息对象
  10. Configuration conf = new Configuration();
  11. //2.设置部分参数
  12. conf.set("dfs.replication","2");
  13. //3.找到HDFS的地址
  14. FileSystem fs = FileSystem.get(new URI("hdfs://bigdata111:9000"), conf, "root");
  15. //4.上传本地Windows文件的路径
  16. Path src = new Path("D:\\hadoop-2.7.2.rar");
  17. //5.要上传到HDFS的路径
  18. Path dst = new Path("hdfs://bigdata111:9000/Andy");
  19. //6.以拷贝的方式上传,从src -> dst
  20. fs.copyFromLocalFile(src,dst);
  21. //7.关闭
  22. fs.close();
  23. System.out.println("上传成功");
  24. }

8.小文件合并至HDFS

  1. //小文件合并至HDFS
  2. @Test
  3. public void mergeFile() throws Exception{
  4. //1.获取FileSystem系统
  5. FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata111:9000"),new Configuration());
  6. //2.获取hdfs大文件输出流
  7. FSDataOutputStream OutputStream = fileSystem.create(new Path("/hello.txt"));
  8. //3.获取一个本地文件系统
  9. LocalFileSystem local = FileSystem.getLocal(new Configuration());
  10. //4.获取本地文件夹下所有文件的详情
  11. FileStatus[] fileStatuses = local.listStatus(new Path("G:\\file"));
  12. //5.遍历每个文件,获取每个文件的输入流
  13. for (FileStatus fileStatus : fileStatuses) {
  14. FSDataInputStream open = local.open(fileStatus.getPath());
  15. //6.将小文件复制到大文件
  16. IOUtils.copy(open,OutputStream);
  17. //7.关闭流
  18. IOUtils.closeQuietly(open);
  19. }
  20. IOUtils.closeQuietly(OutputStream);
  21. local.close();
  22. fileSystem.close();
  23. }

9.HDFS文件夹删除

  1. /**
  2. * hadoop fs -rm -r /文件
  3. */
  4. @Test
  5. public void deleteHDFS() throws Exception {
  6. //1.创建配置对象
  7. Configuration conf = new Configuration();
  8. //2.链接文件系统
  9. //final URI uri, final Configuration conf, String user
  10. //final URI uri 地址
  11. //final Configuration conf 配置
  12. //String user Linux用户
  13. FileSystem fs = FileSystem.get(new URI("hdfs://bigdata111:9000"), conf, "root");
  14. //3.删除文件
  15. //Path var1 : HDFS地址
  16. //boolean var2 : 是否递归删除
  17. fs.delete(new Path("hdfs://bigdata111:9000/a"),false);
  18. //4.关闭
  19. fs.close();
  20. System.out.println("删除成功啦");
  21. }

10.HDFS文件名更改

  1. @Test
  2. public void renameAtHDFS() throws Exception{
  3. // 1 创建配置信息对象
  4. Configuration configuration = new Configuration();
  5. FileSystem fs = FileSystem.get(new URI("hdfs://bigdata111:9000"),configuration, "itstar");
  6. //2 重命名文件或文件夹
  7. fs.rename(new Path("hdfs://bigdata111:9000/user/itstar/hello.txt"), new Path("hdfs://bigdata111:9000/user/itstar/hellonihao.txt"));
  8. fs.close();
  9. }

11.HDFS文件详情查看

  1. /**
  2. * 查看【文件】名称、权限等
  3. */
  4. @Test
  5. public void readListFiles() throws Exception {
  6. //1.创建配置对象
  7. Configuration conf = new Configuration();
  8. //2.链接文件系统
  9. FileSystem fs = FileSystem.get(new URI("hdfs://bigdata111:9000"), conf, "root");
  10. //3.迭代器
  11. RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
  12. //4.遍历迭代器
  13. while (listFiles.hasNext()){
  14. //一个一个出
  15. LocatedFileStatus fileStatus = listFiles.next();
  16. //名字
  17. System.out.println("文件名:" + fileStatus.getPath().getName());
  18. //块大小
  19. System.out.println("大小:" + fileStatus.getBlockSize());
  20. //权限
  21. System.out.println("权限:" + fileStatus.getPermission());
  22. System.out.println(fileStatus.getLen());
  23. BlockLocation[] locations = fileStatus.getBlockLocations();
  24. for (BlockLocation bl:locations){
  25. System.out.println("block-offset:" + bl.getOffset());
  26. String[] hosts = bl.getHosts();
  27. for (String host:hosts){
  28. System.out.println(host);
  29. }
  30. }
  31. System.out.println("------------------华丽的分割线----------------");
  32. }

12.HDFS文件和文件夹判断

  1. /**
  2. * 判断是否是个文件还是目录,然后打印
  3. * @throws Exception
  4. */
  5. @Test
  6. public void judge() throws Exception {
  7. //1.创建配置文件信息
  8. Configuration conf = new Configuration();
  9. //2.获取文件系统
  10. FileSystem fs = FileSystem.get(new URI("hdfs://bigdata111:9000"), conf, "root");
  11. //3.遍历所有的文件
  12. FileStatus[] liststatus = fs.listStatus(new Path("/Andy"));
  13. for(FileStatus status :liststatus)
  14. {
  15. //判断是否是文件
  16. if (status.isFile()){
  17. //ctrl + d:复制一行
  18. //ctrl + x 是剪切一行,可以用来当作是删除一行
  19. System.out.println("文件:" + status.getPath().getName());
  20. } else {
  21. System.out.println("目录:" + status.getPath().getName());
  22. }
  23. }
  24. }

13.通过IO流操作HDFS

13.1HDFS文件上传

  1. /**
  2. * IO流方式上传
  3. *
  4. * @throws URISyntaxException
  5. * @throws FileNotFoundException
  6. * @throws InterruptedException
  7. */
  8. @Test
  9. public void putFileToHDFSIO() throws URISyntaxException, IOException, InterruptedException {
  10. //1.创建配置文件信息
  11. Configuration conf = new Configuration();
  12. //2.获取文件系统
  13. FileSystem fs = FileSystem.get(new URI("hdfs://bigdata111:9000"), conf, "root");
  14. //3.创建输入流
  15. FileInputStream fis = new FileInputStream(new File("F:\\date\\Sogou.txt"));
  16. //4.输出路径
  17. //注意:不能/Andy 记得后边写个名 比如:/Andy/Sogou.txt
  18. Path writePath = new Path("hdfs://bigdata111:9000/Andy/Sogou.txt");
  19. FSDataOutputStream fos = fs.create(writePath);
  20. //5.流对接
  21. //InputStream in 输入
  22. //OutputStream out 输出
  23. //int buffSize 缓冲区
  24. //boolean close 是否关闭流
  25. try {
  26. IOUtils.copyBytes(fis,fos,4 * 1024,false);
  27. } catch (IOException e) {
  28. e.printStackTrace();
  29. }finally {
  30. IOUtils.closeStream(fos);
  31. IOUtils.closeStream(fis);
  32. System.out.println("上传成功啦");
  33. }
  34. }

13.2 HDFS文件下载

  1. /**
  2. * IO读取HDFS到控制台
  3. *
  4. * @throws URISyntaxException
  5. * @throws IOException
  6. * @throws InterruptedException
  7. */
  8. @Test
  9. public void getFileToHDFSIO() throws URISyntaxException, IOException, InterruptedException {
  10. //1.创建配置文件信息
  11. Configuration conf = new Configuration();
  12. //2.获取文件系统
  13. FileSystem fs = FileSystem.get(new URI("hdfs://bigdata111:9000"), conf, "root");
  14. //3.读取路径
  15. Path readPath = new Path("hdfs://bigdata111:9000/Andy/Sogou.txt");
  16. //4.输入
  17. FSDataInputStream fis = fs.open(readPath);
  18. //5.输出到控制台
  19. //InputStream in 输入
  20. //OutputStream out 输出
  21. //int buffSize 缓冲区
  22. //boolean close 是否关闭流
  23. IOUtils.copyBytes(fis,System.out,4 * 1024 ,true);
  24. }

13.3 定位文件读取

  1. 下载第一块
  1. /**
  2. * IO读取第一块的内容
  3. *
  4. * @throws Exception
  5. */
  6. @Test
  7. public void readFlieSeek1() throws Exception {
  8. //1.创建配置文件信息
  9. Configuration conf = new Configuration();
  10. //2.获取文件系统
  11. FileSystem fs = FileSystem.get(new URI("hdfs://bigdata111:9000"), conf, "root");
  12. //3.输入
  13. Path path = new Path("hdfs://bigdata111:9000/Andy/hadoop-2.7.2.rar");
  14. FSDataInputStream fis = fs.open(path);
  15. //4.输出
  16. FileOutputStream fos = new FileOutputStream("F:\\date\\readFileSeek\\A1");
  17. //5.流对接
  18. byte[] buf = new byte[1024];
  19. for (int i = 0; i < 128 * 1024; i++) {
  20. fis.read(buf);
  21. fos.write(buf);
  22. }
  23. //6.关闭流
  24. IOUtils.closeStream(fos);
  25. IOUtils.closeStream(fis);
  26. }
  1. 下载第二块
  1. /**
  2. * IO读取第二块的内容
  3. *
  4. * @throws Exception
  5. */
  6. @Test
  7. public void readFlieSeek2() throws Exception {
  8. //1.创建配置文件信息
  9. Configuration conf = new Configuration();
  10. //2.获取文件系统
  11. FileSystem fs = FileSystem.get(new URI("hdfs://bigdata111:9000"), conf, "root");
  12. //3.输入
  13. Path path = new Path("hdfs://bigdata111:9000/Andy/hadoop-2.7.2.rar");
  14. FSDataInputStream fis = fs.open(path);
  15. //4.输出
  16. FileOutputStream fos = new FileOutputStream("F:\\date\\readFileSeek\\A2");
  17. //5.定位偏移量/offset/游标/读取进度 (目的:找到第一块的尾巴,第二块的开头)
  18. fis.seek(128 * 1024 * 1024);
  19. //6.流对接
  20. IOUtils.copyBytes(fis, fos, 1024);
  21. //7.关闭流
  22. IOUtils.closeStream(fos);
  23. IOUtils.closeStream(fis);
  24. }
  1. 合并文件

    在window命令窗口中执行

    type A2 >> A1 然后更改后缀为rar即可

14.一致性模型

  1. @Test
  2. public void writeFile() throws Exception{
  3. // 1 创建配置信息对象
  4. Configuration configuration = new Configuration();
  5. fs = FileSystem.get(configuration);
  6. // 2 创建文件输出流
  7. Path path = new Path("F:\\date\\H.txt");
  8. FSDataOutputStream fos = fs.create(path);
  9. // 3 写数据
  10. fos.write("hello Andy".getBytes());
  11. // 4 一致性刷新
  12. fos.hflush();
  13. fos.close();
  14. }

总结:

写入数据时,如果希望数据被其他client立即可见,调用如下方法

FSDataOutputStream. hflush (); //清理客户端缓冲区数据,被其他client立即可见

相关文章