hadoop—使用java以编程方式读取存储在hdfs中的文本文件的内容

5hcedyr0  于 2021-06-03  发布在  Hadoop
关注(0)|答案(4)|浏览(315)

如何运行这个简单的java程序从hdfs中的directory/words中存储的文本文件中读取字节?我需要为此创建一个jar文件吗?

  1. import java.io.*;
  2. import java.net.MalformedURLException;
  3. import java.net.URL;
  4. import org.apache.hadoop.*;
  5. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  6. public class filesystemhdfs
  7. {
  8. public static void main(String args[]) throws MalformedURLException, IOException
  9. {
  10. byte[] b=null;
  11. InputStream in=null;
  12. in=new URL("hdfs://localhost/words/file").openStream();
  13. in.read(b);
  14. System.out.println(""+b);
  15. for(int i=0;i<b.length;i++)
  16. {
  17. System.out.println("b[i]=%d"+b[i]);
  18. System.out.println(""+(char)b[i]);
  19. }
  20. }
  21. }
axkjgtzd

axkjgtzd1#

您可以使用hdfs api,这可以从本地运行:

  1. Configuration configuration = new Configuration();
  2. configuration.set("fs.defaultFS", "hdfs://namenode:8020");
  3. FileSystem fs = FileSystem.get(configuration);
  4. Path filePath = new Path(
  5. "hdfs://namenode:8020/PATH");
  6. FSDataInputStream fsDataInputStream = fs.open(filePath);
2fjabf4q

2fjabf4q2#

首先,您需要告诉jvm url对象中的hdfs方案。通过以下方式完成:

  1. URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());

编译java类后,需要使用hadoop命令:

  1. hadoop filesystemhdfs

hadoop附带了一个方便的ioutils。这会让你轻松很多。

8ulbf1ek

8ulbf1ek3#

现在回信有点晚,但对将来的读者有帮助。它将迭代hdfs目录并读取每个文件的内容。
只使用hadoop客户端和java。

  1. Configuration conf = new Configuration();
  2. conf.addResource(new Path(“/your/hadoop/conf/core-site.xml"));
  3. conf.addResource(new Path("/your/hadoop/confhdfs-site.xml"));
  4. FileSystem fs = FileSystem.get(conf);
  5. FileStatus[] status = fs.listStatus(new Path("hdfs://path/to/your/hdfs/directory”);
  6. for (int i = 0; i < status.length; i++) {
  7. FSDataInputStream inputStream = fs.open(status[i].getPath());
  8. String content = IOUtils.toString(inputStream, "UTF-8");
  9. }
hk8txs48

hk8txs484#

您不能从hdfs读取文件,因为java支持常规文件系统。你需要使用 HDFS java AP 我为这个感到高兴。

  1. public static void main(String a[]) {
  2. UserGroupInformation ugi
  3. = UserGroupInformation.createRemoteUser("root");
  4. try {
  5. ugi.doAs(new PrivilegedExceptionAction<Void>() {
  6. public Void run() throws Exception {
  7. Configuration conf = new Configuration();
  8. //fs.default.name should match the corresponding value
  9. // in your core-site.xml in hadoop cluster
  10. conf.set("fs.default.name","hdfs://hostname:9000");
  11. conf.set("hadoop.job.ugi", "root");
  12. readFile("words/file",conf)
  13. return null;
  14. }
  15. });
  16. } catch (Exception e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. public static void readFile(String file,Configuration conf) throws IOException {
  21. FileSystem fileSystem = FileSystem.get(conf);
  22. Path path = new Path(file);
  23. if (!ifExists(path)) {
  24. System.out.println("File " + file + " does not exists");
  25. return;
  26. }
  27. FSDataInputStream in = fileSystem.open(path);
  28. BufferedReader br = new BufferedReader(new InputStreamReader(in));
  29. String line = null;
  30. while((line = br.readLine())!= null){
  31. System.out.println(line);
  32. }
  33. in.close();
  34. br.close();
  35. fileSystem.close();
  36. }
  37. public static boolean ifExists(Path source) throws IOException {
  38. FileSystem hdfs = FileSystem.get(conf);
  39. boolean isExists = hdfs.exists(source);
  40. System.out.println(isExists);
  41. return isExists;
  42. }

在这里,我试图从一个远程机器,这就是为什么我使用 UserGroupInformation 并用的run方法编写代码 PrivilegedExceptionAction . 如果您在本地系统中,您可能不需要它。嗯!

展开查看全部

相关问题