在spark中使用自定义hadoop输入格式处理二进制文件

mitkmikd  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(429)

我开发了一个基于hadoop的解决方案,可以处理二进制文件。这使用了经典的hadoop mr技术。二进制文件大约为10gb,分为73个hdfs块,而作为map进程编写的业务逻辑在这73个块中的每一个块上运行。我们在hadoop中开发了custominputformat和customrecordreader,它向map函数返回key(intwritable)和value(byteswritable)。该值只是hdfs块的内容(二进制数据)。业务逻辑知道如何读取这些数据。
现在,我想把这个代码移植到spark中。我是spark的初学者,可以在spark中运行简单的示例(wordcount、pi示例)。然而,在spark中处理二进制文件并不是一个简单的例子。我看到这个用例有两种解决方案。首先,避免使用自定义输入格式和记录读取器。在spark中找到一个方法(approach),为那些hdfs块创建一个rdd,使用一个类似map的方法将hdfs块内容提供给业务逻辑。如果这是不可能的,我想重新使用自定义输入格式和自定义阅读器使用一些方法,如hadoopapi,hadooprdd等problem:- i 不知道第一种方法是否可行。如果可能的话,谁能提供一些包含示例的指针?我尝试第二种方法,但非常失败。下面是我使用的代码片段

package org {  
object Driver {      
  def myFunc(key : IntWritable, content : BytesWritable):Int = {      
    println(key.get())
    println(content.getSize())
    return 1       
  }    
  def main(args: Array[String]) {       
    // create a spark context
    val conf = new SparkConf().setAppName("Dummy").setMaster("spark://<host>:7077")
    val sc = new SparkContext(conf)    
    println(sc)   
    val rd = sc.newAPIHadoopFile("hdfs:///user/hadoop/myBin.dat", classOf[RandomAccessInputFormat], classOf[IntWritable], classOf[BytesWritable])  
    val count = rd.map (x => myFunc(x._1, x._2)).reduce(_+_)
    println("The count is*****************************"+count)
  }
}

}
请注意,main方法中的print语句打印73块,而map函数中的print语句打印0块。
有人能告诉我哪里做错了吗?我认为我使用api的方式不对,但没有找到一些文档/使用示例。

nzkunb0c

nzkunb0c1#

我在这个问题上取得了一些进展。我现在使用下面的函数来完成这项工作

var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat], 
        classOf[IntWritable], 
        classOf[BytesWritable],
        job.getConfiguration() 
        )    

val count = hRDD.mapPartitionsWithInputSplit{ (split, iter) => myfuncPart(split, iter)}.collect()

但是,最后出现了另一个错误,我在这里发布了访问sparkmap函数内部hdfs文件的详细信息

15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
bzzcjhmw

bzzcjhmw2#

几个问题一目了然。你定义 myFunc 但是打电话 func . 你的 myFunc 没有返回类型,因此无法调用 collect() . 如果你的 myFunc 确实没有返回值,你可以这样做 foreach 而不是 map . collect() 将rdd中的数据拉入驱动程序,以允许您在本地(在驱动程序上)对其进行处理。

相关问题