通过saveasobject保存rdd,异常“有一个不可序列化的结果:org.apache.hadoop.hbase.io.immutablebyteswritable”

8wtpewkr  于 2021-06-10  发布在  Hbase
关注(0)|答案(1)|浏览(414)

我需要将从hbase读取的rdd序列化到alluxio内存文件系统中,以便定期缓存和更新它,以便在增量spark计算中使用。
代码是这样的,但是遇到了有标题的异常

val inputTableNameEvent = HBaseTables.TABLE_XXX.tableName
val namedeRDDName = "EventAllCached2Update"
val alluxioPath = "alluxio://hadoop1:19998/"
val fileURI = alluxioPath + namedeRDDName
val path:AlluxioURI = new AlluxioURI("/"+namedeRDDName)

val fs:FileSystem = FileSystem.Factory.get()

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, inputTableNameEvent)

val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
                classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
                classOf[org.apache.hadoop.hbase.client.Result])
numbers = rdd.count()
println("rdd count: " + numbers)
if( fs.exists(path))
       fs.delete(path)
rdd.saveAsObjectFile(fileURI)

有谁能帮助告诉我们如何将immutablebyteswritableMap到另一个类型以绕过此问题?另外,Map需要是可还原的,因为稍后我需要使用objectfile读回这个保存的对象,并将其转换为[(immutablebyteswritable,result)]rdd,以便稍后进行更新和计算。

htrmnn0y

htrmnn0y1#

您需要将rdd转换为row对象。类似于下面的内容,然后保存到hdfs。解析后的rdd与现在的任何其他rdd一样具有数据

val parsedRDD = yourRDD.map(tuple => tuple._2).map(result => (
      Row((result.getRow.map(_.toChar).mkString),
      (result.getColumn("CF".getBytes(),"column1".getBytes()).get(0).getValue.map(_.toChar).mkString),
      (result.getColumn("CF".getBytes(),"column2".getBytes()).get(0).getValue.map(_.toChar).mkString),
      (result.getColumn("CF".getBytes(),"column3".getBytes()).get(0).getValue.map(_.toChar).mkString),
      (result.getColumn("CF".getBytes(),"column4".getBytes()).get(0).getValue.map(_.toChar).mkString),
      (result.getColumn("CF".getBytes(),"column5".getBytes()).get(0).getValue.map(_.toChar).mkString),
      (result.getColumn("CF".getBytes(),"column5".getBytes()).get(0).getValue.map(_.toChar).mkString)
      )))

相关问题