Java - using the SparkContext Hadoop configuration in RDD methods/closures, such as foreachPartition

eoxn13cs posted on 2021-06-02 in Hadoop

I use Spark to read a bunch of files, process them, and then save them all as SequenceFiles. What I want is one sequence file per partition, so I did this:

SparkConf sparkConf = new SparkConf().setAppName("writingHDFS")
                .setMaster("local[2]")
                .set("spark.streaming.stopGracefullyOnShutdown", "true");
        final JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.hadoopConfiguration().addResource(hdfsConfPath + "hdfs-site.xml");
        jsc.hadoopConfiguration().addResource(hdfsConfPath + "core-site.xml");
        //JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5*1000));

        JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
        if(!imageByteRDD.isEmpty())
            imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {

                @Override
                public void call(Iterator<Tuple2<String, PortableDataStream>> arg0)
                        throws Exception {
                  [°°°SOME STUFF°°°]
                  SequenceFile.Writer writer = SequenceFile.createWriter(
                                     jsc.hadoopConfiguration(), 
//here lies the problem: how to pass the hadoopConfiguration I have put inside the Spark Context? 
Previously, I created a Configuration for each partition, and it works, but I'm sure there is a much more "Sparky" way.

Does anybody know how to use the Hadoop Configuration object inside an RDD closure?


zxlwwiss1#

You can serialize and deserialize an org.apache.hadoop.conf.Configuration using org.apache.spark.SerializableWritable.
For example:

import org.apache.spark.SerializableWritable

...

val hadoopConf = spark.sparkContext.hadoopConfiguration
// serialize here
val serializedConf = new SerializableWritable(hadoopConf)

// then access the conf by calling .value on serializedConf
rdd.map(someFunction(serializedConf.value))
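
Since the question uses the Java API, here is a hedged sketch of the same idea in Java, reusing the jsc and imageByteRDD from the question; note that SerializableWritable is annotated as a Spark developer API, so verify it is usable from your Spark version.

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SerializableWritable;

// Wrap the driver-side configuration once; the wrapper is java.io.Serializable,
// so the foreachPartition closure can capture it and ship it to the executors.
final SerializableWritable<Configuration> confWrapper =
        new SerializableWritable<>(jsc.hadoopConfiguration());

imageByteRDD.foreachPartition(partition -> {
    // Unwrap it back into a plain Configuration on the executor and use it
    // for SequenceFile.createWriter(conf, ...) as in the question.
    Configuration conf = confWrapper.value();
    // ... create and use the SequenceFile.Writer with conf ...
});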

s71maibg2#

It looks like it cannot be done, so this is the code I used instead:

final String hdfsNameNodePath = "hdfs://quickstart.cloudera:8080";

JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
        if(!imageByteRDD.isEmpty())
            imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {

                @Override
                public void call(Iterator<Tuple2<String, PortableDataStream>> arg0)
                        throws Exception {

                    Configuration conf = new Configuration();
                    conf.set("fs.defaultFS", hdfsNameNodePath);
                    //the string above should be passed as argument
                    SequenceFile.Writer writer = SequenceFile.createWriter(
                                     conf, 
                                     SequenceFile.Writer.file([***ETCETERA...
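
For reference, a hedged sketch of how the truncated per-partition writer above is typically completed; the output path, key class, and value class are illustrative assumptions, not the original code. The body belongs inside the call(...) method and needs imports for Path, SequenceFile, Text, BytesWritable, and org.apache.spark.TaskContext.

Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNodePath);

// One output file per partition, named after the partition id.
SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(new Path("/user/output/part-" + TaskContext.getPartitionId())),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(BytesWritable.class));
try {
    while (arg0.hasNext()) {
        Tuple2<String, PortableDataStream> entry = arg0.next();
        writer.append(new Text(entry._1()), new BytesWritable(entry._2().toArray()));
    }
} finally {
    writer.close();
}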

cs7cruho3#

Here is a Java implementation of it, based on @Steve's answer.

import java.io.Serializable;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;

public class SerializableHadoopConfiguration implements Serializable {
    Configuration conf;

    public SerializableHadoopConfiguration(Configuration hadoopConf) {
        this.conf = hadoopConf;

        if (this.conf == null) {
            this.conf = new Configuration();
        }
    }

    public SerializableHadoopConfiguration() {
        this.conf = new Configuration();
    }

    public Configuration get() {
        return this.conf;
    }

    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
        this.conf.write(out);
    }

    private void readObject(java.io.ObjectInputStream in) throws IOException {
        this.conf = new Configuration();
        this.conf.readFields(in);
    }
}
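
A hedged usage sketch (again assuming the jsc and imageByteRDD from the question): wrap the configuration once on the driver and call get() inside the closure, where readObject() has rebuilt it.

final SerializableHadoopConfiguration serConf =
        new SerializableHadoopConfiguration(jsc.hadoopConfiguration());

imageByteRDD.foreachPartition(partition -> {
    // The wrapper is Java-serialized with the closure; get() returns the
    // Configuration reconstructed by readObject() on the executor.
    Configuration conf = serConf.get();
    // ... e.g. SequenceFile.createWriter(conf, ...) ...
});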

kknvjkwl4#

The problem here is that Hadoop Configurations are not marked as Serializable, so Spark will not pull them into RDDs. They are marked as Writable, so Hadoop's serialization mechanism can marshall and unmarshall them, but Spark cannot work with that directly.

The long-term fixes would be to:

1. Add support for serializing Writables in Spark. Maybe SPARK-2421?
2. Make Hadoop Configuration Serializable.
3. Add explicit support for serializing Hadoop Configurations.

You would not hit any major objections to making the Hadoop conf Serializable, provided you implement custom ser/deser methods that delegate to the Writable IO calls (and which just iterate through all the key/value pairs). I say that as a Hadoop committer.

Update: here is the code to create a serializable class which marshalls the contents of a Hadoop Configuration. Create it with val ser = new ConfigSerDeser(hadoopConf); in your RDDs, refer to it as ser.get().

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

 import org.apache.hadoop.conf.Configuration

/**
 * Class to make Hadoop configurations serializable; uses the
 * `Writable` operations to do this.
 * Note: this only serializes the explicitly set values, not any set
 * in site/default or other XML resources.
 * @param conf
 */
class ConfigSerDeser(var conf: Configuration) extends Serializable {

  def this() {
    this(new Configuration())
  }

  def get(): Configuration = conf

  private def writeObject (out: java.io.ObjectOutputStream): Unit = {
    conf.write(out)
  }

  private def readObject (in: java.io.ObjectInputStream): Unit = {
    conf = new Configuration()
    conf.readFields(in)
  }

  private def readObjectNoData(): Unit = {
    conf = new Configuration()
  }
}

Note that it would be relatively straightforward for someone to make this generic for all Writable classes; you would just need to provide the class name in the constructor and use it to instantiate the Writable during deserialization.
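
For example, a hedged Java sketch of that generic idea (the class name and details here are assumptions for illustration, not code from the answer): store the concrete class name alongside the Writable's own bytes, and re-instantiate it reflectively during deserialization.

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.hadoop.io.Writable;

// Hypothetical generic wrapper: works for any Writable by delegating to its
// write()/readFields() methods and recording the class name for deserialization.
public class SerializableWritableWrapper<T extends Writable> implements Serializable {
    private transient T writable;

    public SerializableWritableWrapper(T writable) {
        this.writable = writable;
    }

    public T get() {
        return writable;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeUTF(writable.getClass().getName());   // remember the concrete class
        writable.write(out);                           // the Writable serializes its own fields
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException {
        try {
            String className = in.readUTF();
            writable = (T) Class.forName(className).getDeclaredConstructor().newInstance();
            writable.readFields(in);
        } catch (ReflectiveOperationException e) {
            throw new IOException("Cannot recreate Writable instance", e);
        }
    }
}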
