我开发了一个基于hadoop的解决方案,可以处理二进制文件。这使用了经典的hadoop mr技术。二进制文件大约为10gb,分为73个hdfs块,而作为map进程编写的业务逻辑在这73个块中的每一个块上运行。我们在hadoop中开发了custominputformat和customrecordreader,它向map函数返回key(intwritable)和value(byteswritable)。该值只是hdfs块的内容(二进制数据)。业务逻辑知道如何读取这些数据。
现在,我想把这个代码移植到spark中。我是spark的初学者,可以在spark中运行简单的示例(wordcount、pi示例)。然而,在spark中处理二进制文件并不是一个简单的例子。我看到这个用例有两种解决方案。首先,避免使用自定义输入格式和记录读取器。在spark中找到一个方法(approach),为那些hdfs块创建一个rdd,使用一个类似map的方法将hdfs块内容提供给业务逻辑。如果这是不可能的,我想重新使用自定义输入格式和自定义阅读器使用一些方法,如hadoopapi,hadooprdd等problem:- i 不知道第一种方法是否可行。如果可能的话,谁能提供一些包含示例的指针?我尝试第二种方法,但非常失败。下面是我使用的代码片段
package org {
object Driver {
def myFunc(key : IntWritable, content : BytesWritable):Int = {
println(key.get())
println(content.getSize())
return 1
}
def main(args: Array[String]) {
// create a spark context
val conf = new SparkConf().setAppName("Dummy").setMaster("spark://<host>:7077")
val sc = new SparkContext(conf)
println(sc)
val rd = sc.newAPIHadoopFile("hdfs:///user/hadoop/myBin.dat", classOf[RandomAccessInputFormat], classOf[IntWritable], classOf[BytesWritable])
val count = rd.map (x => myFunc(x._1, x._2)).reduce(_+_)
println("The count is*****************************"+count)
}
}
}
请注意,main方法中的print语句打印73块,而map函数中的print语句打印0块。
有人能告诉我哪里做错了吗?我认为我使用api的方式不对,但没有找到一些文档/使用示例。
2条答案
按热度按时间nzkunb0c1#
我在这个问题上取得了一些进展。我现在使用下面的函数来完成这项工作
但是,最后出现了另一个错误,我在这里发布了访问sparkmap函数内部hdfs文件的详细信息
bzzcjhmw2#
几个问题一目了然。你定义
myFunc
但是打电话func
. 你的myFunc
没有返回类型,因此无法调用collect()
. 如果你的myFunc
确实没有返回值,你可以这样做foreach
而不是map
.collect()
将rdd中的数据拉入驱动程序,以允许您在本地(在驱动程序上)对其进行处理。