在spark中使用自定义hadoop输入格式处理二进制文件

mitkmikd  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(463)

我开发了一个基于hadoop的解决方案,可以处理二进制文件。这使用了经典的hadoop mr技术。二进制文件大约为10gb,分为73个hdfs块,而作为map进程编写的业务逻辑在这73个块中的每一个块上运行。我们在hadoop中开发了custominputformat和customrecordreader,它向map函数返回key(intwritable)和value(byteswritable)。该值只是hdfs块的内容(二进制数据)。业务逻辑知道如何读取这些数据。
现在,我想把这个代码移植到spark中。我是spark的初学者,可以在spark中运行简单的示例(wordcount、pi示例)。然而,在spark中处理二进制文件并不是一个简单的例子。我看到这个用例有两种解决方案。首先,避免使用自定义输入格式和记录读取器。在spark中找到一个方法(approach),为那些hdfs块创建一个rdd,使用一个类似map的方法将hdfs块内容提供给业务逻辑。如果这是不可能的,我想重新使用自定义输入格式和自定义阅读器使用一些方法,如hadoopapi,hadooprdd等problem:- i 不知道第一种方法是否可行。如果可能的话,谁能提供一些包含示例的指针?我尝试第二种方法,但非常失败。下面是我使用的代码片段

  1. package org {
  2. object Driver {
  3. def myFunc(key : IntWritable, content : BytesWritable):Int = {
  4. println(key.get())
  5. println(content.getSize())
  6. return 1
  7. }
  8. def main(args: Array[String]) {
  9. // create a spark context
  10. val conf = new SparkConf().setAppName("Dummy").setMaster("spark://<host>:7077")
  11. val sc = new SparkContext(conf)
  12. println(sc)
  13. val rd = sc.newAPIHadoopFile("hdfs:///user/hadoop/myBin.dat", classOf[RandomAccessInputFormat], classOf[IntWritable], classOf[BytesWritable])
  14. val count = rd.map (x => myFunc(x._1, x._2)).reduce(_+_)
  15. println("The count is*****************************"+count)
  16. }
  17. }

}
请注意,main方法中的print语句打印73块,而map函数中的print语句打印0块。
有人能告诉我哪里做错了吗?我认为我使用api的方式不对,但没有找到一些文档/使用示例。

nzkunb0c

nzkunb0c1#

我在这个问题上取得了一些进展。我现在使用下面的函数来完成这项工作

  1. var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
  2. classOf[IntWritable],
  3. classOf[BytesWritable],
  4. job.getConfiguration()
  5. )
  6. val count = hRDD.mapPartitionsWithInputSplit{ (split, iter) => myfuncPart(split, iter)}.collect()

但是,最后出现了另一个错误,我在这里发布了访问sparkmap函数内部hdfs文件的详细信息

  1. 15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
  2. at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
  3. at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
  4. at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
  5. at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
bzzcjhmw

bzzcjhmw2#

几个问题一目了然。你定义 myFunc 但是打电话 func . 你的 myFunc 没有返回类型,因此无法调用 collect() . 如果你的 myFunc 确实没有返回值,你可以这样做 foreach 而不是 map . collect() 将rdd中的数据拉入驱动程序,以允许您在本地(在驱动程序上)对其进行处理。

相关问题