在scala中创建rdd[(immutablebyteswritable,result)]

dauxcl2d  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(371)

我正在做一个单元测试,需要创建一个 RDD[(ImmutableBytesWritable, Result)] . 数据只包含唯一的 id 一个非唯一的 value 柱。
我可以使用 toDF ,并将其转换为 RDD[Row] ; 但我很难把它Map到 RDD[(ImmutableBytesWritable, Result)] .

  1. val values = List((1, 1234), (2, 123), (3, 1234))
  2. import spark.implicits._
  3. val df = values.toDF("id", "value")
  4. val counts : RDD[(ImmutableBytesWritable, Result)] = df.rdd.map(
  5. row => (new ImmutableBytesWritable(), Result.create(...))
  6. )

谢谢您!

vmpqdwk3

vmpqdwk31#

我不使用hbase,但考虑到您试图构建的签名,我想到了这个。试试看。
我使用cellutil创建了一个单元格来生成结果。

  1. import org.apache.hadoop.hbase.{Cell, CellUtil}
  2. import scala.collection.JavaConversions._
  3. import scala.collection.mutable.ListBuffer
  4. import scala.math.BigInt
  5. import org.apache.spark._
  6. import org.apache.spark.rdd._
  7. import org.apache.spark.sql._
  8. import org.apache.hadoop.hbase.client.Result
  9. import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  10. object Practice extends App {
  11. val sparkConfig = new SparkConf().setAppName("test").setMaster("local[*]")
  12. val ss = SparkSession.builder().config(sparkConfig).getOrCreate()
  13. val values = Seq((1, 1234), (2, 123), (3, 1234))
  14. import ss.implicits._
  15. val df = values.toDF("id", "value")
  16. val counts: RDD[(ImmutableBytesWritable, Result)] = df.rdd.map{ row =>
  17. val key = row.getAs[Int]("id")
  18. val keyByteArray = BigInt(key).toByteArray
  19. val ibw = new ImmutableBytesWritable()
  20. ibw.set(keyByteArray)
  21. val value = row.getAs[Int]("value")
  22. val valueByteArray = BigInt(value).toByteArray
  23. val cellList = List(CellUtil.createCell(valueByteArray))
  24. val cell: java.util.List[Cell] = ListBuffer(cellList: _*)
  25. val result = Result.create(cell)
  26. (ibw, result)
  27. }
  28. }

打印结果,这并不意味着一个好的答案,给你这个:

  1. KeyValue(03,keyvalues={\x04\xD2//LATEST_TIMESTAMP/Maximum/vlen=0/seqid=0})
  2. KeyValue(01,keyvalues={\x04\xD2//LATEST_TIMESTAMP/Maximum/vlen=0/seqid=0})
  3. KeyValue(02,keyvalues={{//LATEST_TIMESTAMP/Maximum/vlen=0/seqid=0})
展开查看全部

相关问题