我用spark读取一堆文件,对它们进行详细说明,然后将它们保存为一个序列文件。我想要的是每个分区有一个序列文件,所以我这样做了:
SparkConf sparkConf = new SparkConf().setAppName("writingHDFS")
.setMaster("local[2]")
.set("spark.streaming.stopGracefullyOnShutdown", "true");
final JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().addResource(hdfsConfPath + "hdfs-site.xml");
jsc.hadoopConfiguration().addResource(hdfsConfPath + "core-site.xml");
//JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5*1000));
JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
if(!imageByteRDD.isEmpty())
imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {
@Override
public void call(Iterator<Tuple2<String, PortableDataStream>> arg0){
throws Exception {
[°°°SOME STUFF°°°]
SequenceFile.Writer writer = SequenceFile.createWriter(
jsc.hadoopConfiguration(),
//here lies the problem: how to pass the hadoopConfiguration I have put inside the Spark Context?
Previously, I created a Configuration for each partition, and it works, but I'm sure there is a much more "sparky way"
有人知道如何在rdd闭包中使用hadoop配置对象吗?
4条答案
按热度按时间zxlwwiss1#
可以序列化和反序列化
org.apache.hadoop.conf.Configuration
使用org.apache.spark.SerializableWritable
.例如:
s71maibg2#
看起来无法完成,所以我使用了以下代码:
cs7cruho3#
根据@steve的回答,这是一个java实现。
kknvjkwl4#
这里的问题是hadoop配置没有标记为
Serializable
所以spark不会把他们拉进RDD。它们被标记为Writable
,因此hadoop的序列化机制可以对它们进行marshall和unmarshall,但是spark不能直接处理它们两个长期解决方案是
添加对在spark中序列化可写内容的支持。也许是spark-2421?
使hadoop配置可序列化。
添加对序列化hadoop配置的显式支持。
对于使hadoop conf可序列化,您不会遇到任何主要的反对意见;假设您实现了自定义的ser/deser方法,这些方法委托给可写io调用(并且只遍历所有键/值对)。我是作为hadoop提交者这么说的。
更新:下面是创建一个serlializable类的代码,该类对hadoop配置的内容进行marshall处理。使用创建
val ser = new ConfSerDeser(hadoopConf)
; 在您的rdd中称为ser.get()
.请注意,对于某些人来说,对所有可写类进行泛型比较简单;您只需要在构造函数中提供一个类名,并在反序列化期间使用它来示例化可写的。