使用loadincrementalhfiles和子目录进行批量加载

dy1byipe  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(555)

我编写了一个spark应用程序,它生成用于批量加载的hfiles LoadIncrementalHFiles 稍后命令。由于源数据池非常大,输入文件被拆分为迭代,然后依次进行处理。每次迭代都会创建自己的 HFile 目录,因此我的hdfs结构如下所示:

  1. /user/myuser/map_data/hfiles_0
  2. ... /hfiles_1
  3. ... /hfiles_2
  4. ... /hfiles_3
  5. ...

这里面大约有500个文件 map_data 目录,因此我正在寻找一种方法来自动调用 LoadIncrementalHFiles 函数,以便在以后的迭代中也处理这些子目录。
相应的命令如下:

  1. hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /user/myuser/map_data/hfiles_0 mytable

我需要将其更改为一个迭代命令,因为这个命令不适用于子目录(当我用 /user/myuser/map_data 目录)!
我试着用java Process 示例来自动执行上面的命令,但这并没有起到任何作用(没有输出到控制台,也没有hbase表中的更多行)。
使用 org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles 我的代码中的java类也不起作用,它也没有响应!
有人给我举个好例子吗?或者是否有一个参数可以运行上述操作 hbase 父目录上的命令?我正在hortonworks数据平台2.5集群中使用hbase 1.1.2。
编辑我试着运行 LoadIncrementalHFiles 来自hadoop客户机java应用程序的命令,但我得到了一个与snappy压缩相关的异常,请参阅从java客户机运行loadincrementalhfiles

3zwtqj6y

3zwtqj6y1#

解决的办法是将 hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /user/myuser/map_data/hfiles_0 mytable 命令分为多个部分(每个命令部分一个),请参见以下java代码段:

  1. TreeSet<String> subDirs = getHFileDirectories(new Path(HDFS_PATH), hadoopConf);
  2. for(String hFileDir : subDirs) {
  3. try {
  4. String pathToReadFrom = HDFS_OUTPUT_PATH + "/" + hFileDir;
  5. ==> String[] execCode = {"hbase", "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles", "-Dcreate.table=no", pathToReadFrom, hbaseTableName};
  6. ProcessBuilder pb = new ProcessBuilder(execCode);
  7. pb.redirectErrorStream(true);
  8. final Process p = pb.start();
  9. // Write the output of the Process to the console
  10. new Thread(new Runnable() {
  11. public void run() {
  12. BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
  13. String line = null;
  14. try {
  15. while ((line = input.readLine()) != null)
  16. System.out.println(line);
  17. } catch (IOException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }).start();
  22. // Wait for the end of the execution
  23. p.waitFor();
  24. ...
  25. }
展开查看全部

相关问题