使用序列文件sequencefile在spark中写入和读取原始字节数组

gdx19jrr  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(449)

你怎么写 RDD[Array[Byte]] 使用apachespark重新读取文件?

drnojrws

drnojrws1#

下面是一个包含所有必需导入的片段,您可以按照@choix的请求从sparkshell运行

import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable

val path = "/tmp/path"

val rdd = sc.parallelize(List("foo"))
val bytesRdd = rdd.map{str  =>  (NullWritable.get, new BytesWritable(str.getBytes) )  }
bytesRdd.saveAsSequenceFile(path)

val recovered = sc.sequenceFile[NullWritable, BytesWritable]("/tmp/path").map(_._2.copyBytes())
val recoveredAsString = recovered.map( new String(_) )
recoveredAsString.collect()
// result is:  Array[String] = Array(foo)
zpf6vheq

zpf6vheq2#

常见的问题似乎是,从byteswriteable到nullwriteable出现了一个奇怪的cannot cast异常。另一个常见的问题是字节可写 getBytes 是一堆毫无意义的废话,根本没有字节。什么 getBytes 得到的字节比最后加上一吨零还要多!你必须使用 copyBytes ```
val rdd: RDD[Array[Byte]] = ???

// To write
rdd.map(bytesArray => (NullWritable.get(), new BytesWritable(bytesArray)))
.saveAsSequenceFile("/output/path", codecOpt)

// To read
val rdd: RDD[Array[Byte]] = sc.sequenceFileNullWritable, BytesWritable
.map(_._2.copyBytes())

相关问题