spark没有从二进制文件中读取所有记录

brqmpdu1  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(839)

我正在尝试从s3读取avro文件,如spark文档所示,我能够很好地读取它。我的文件如下,这些文件由5000条记录组成。

  1. s3a://bucket/part-0.avro
  2. s3a://bucket/part-1.avro
  3. s3a://bucket/part-2.avro
  4. val byteRDD: RDD[Array[Byte]] = sc.binaryFiles(s"$s3URL/*.avro").map{ case(file, pds) => {
  5. val dis = pds.open()
  6. val len = dis.available()
  7. val buf = Array.ofDim[Byte](len)
  8. pds.open().readFully(buf)
  9. buf
  10. }}
  11. import org.apache.avro.io.DecoderFactory
  12. val deserialisedAvroRDD = byteRDD.map(record => {
  13. import org.apache.avro.Schema
  14. val schema = new Schema.Parser().parse(schemaJson)
  15. val datumReader = new GenericDatumReader[GenericRecord](schema)
  16. val decoder = DecoderFactory.get.binaryDecoder(record, null)
  17. var datum: GenericRecord = null
  18. while (!decoder.isEnd()) {
  19. datum = datumReader.read(datum, decoder)
  20. }
  21. datum
  22. }
  23. )
  24. deserialisedAvroRDD.count() ---> 3

我正在反序列化binaryavro消息以生成genericrecords,我希望反序列化的rdd有15k条记录,因为每个.avro文件有5k条记录,但是反序列化后我只得到3条记录。有人能帮我找出代码的问题吗?如何一次序列化一条记录。

sirbozc5

sirbozc51#

这应该管用

  1. val recRDD: RDD[GenericRecord] = sc.binaryFiles(s"$s3URL/*.avro").flatMap {
  2. case (file, pds) => {
  3. val schema = new Schema.Parser().parse(schemaJson)
  4. val datumReader = new GenericDatumReader[GenericRecord](schema)
  5. val decoder = DecoderFactory.get.binaryDecoder(pds.toArray(), null)
  6. var datum: GenericRecord = null
  7. val out = ArrayBuffer[GenericRecord]()
  8. while (!decoder.isEnd()) {
  9. out += datumReader.read(datum, decoder)
  10. }
  11. out
  12. }
  13. }

相关问题