scala缓存的spark rdd(从序列文件读取)有无效条目,如何修复?

23c0lvtd  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(305)

我正在使用spark(v1.6.1)阅读hadoop序列文件。缓存rdd后,rdd中的内容将变为无效(最后一个条目重复) n 次)。
下面是我的代码片段:

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]) {
    val seqfile = "data-1.seq"
    val conf: SparkConf = new SparkConf()
      .setAppName("..Buffer..")
      .setMaster("local")
      .registerKryoClasses(Array(classOf[Text]))
    val sc = new SparkContext(conf)

    sc.parallelize((0 to 1000).toSeq) //creating a sample sequence file
      .map(i => (new Text(s"$i"), new Text(s"${i*i}")))
      .saveAsHadoopFile(seqfile, classOf[Text], classOf[Text],
        classOf[SequenceFileOutputFormat[Text, Text]])

    val c = sc.sequenceFile(seqfile, classOf[Text], classOf[Text])
      .cache()
      .map(t => {println(t); t})
      .collectAsMap()
    println(c)
    println(c.size)

    sc.stop()
  }
}

输出:

(1000,1000000)
(1000,1000000)
(1000,1000000)
(1000,1000000)
(1000,1000000)
...... //Total 1000 lines with same content as above ...
Map(1000 -> 1000000)
1

编辑:对于将来的访问者:如果您正在阅读序列文件,就像我在上面的代码片段中所做的那样,请参阅接受的答案。一个简单的解决方法是复制hadoop Writable 示例:

val c = sc.sequenceFile(seqfile, classOf[Text], classOf[Text])
  .map(t =>(new Text(t._1), new Text(t._2)))   //Make copy of writable instances
p4tfgftt

p4tfgftt1#

请参考sequencefile中的注解。

/**Get an RDD for a Hadoop SequenceFile with given key and value types.
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
 * operation will create many references to the same object.
 * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
 * copy them using a `map` function.
 */
eqzww0vc

eqzww0vc2#

下面这段代码对我有用。。。。我用copybytes代替getbytes

val response = sc.sequenceFile(inputPathConcat, classOf[Text], classOf[BytesWritable])
  .map(x => (org.apache.hadoop.io.Text.decode(x._2.copyBytes())))

相关问题