在scala-it.nerdammer中读取hbase

ht4b089n  于 2021-06-10  发布在  Hbase
关注(0)|答案(1)|浏览(331)

我想在spark流代码中读取hbase数据,以便查找和进一步增强流数据。我正在使用 spark-hbase-connector_2.10-1.0.3.jar .
在我的代码中,下面一行是成功的

  1. val docRdd =
  2. sc.hbaseTable[(Option[String], Option[String])]("hbase_customer_profile")
  3. .select("id","gender").inColumnFamily("data")
  4. ``` `docRdd.count` 返回正确的计数。 `docRdd` 属于类型
  5. hbasereaderbuilder(org.apache.spark。sparkcontext@3a49e5,hbase\u customer\u profile,some(数据),wrappedarray(id,gender),none,none,list())
  6. 我怎么能把所有的行都读出来 `id, gender` 请列一列。我怎样才能转换 `docRdd` 以便可以使用sparksql。
vd2z7a6w

vd2z7a6w1#

您可以读取 RDD 使用

  1. docRdd.collect().foreach(println)

转换 RDDDataFrame 您可以定义案例类:

  1. case class Customer(rowKey: String, id: Option[String], gender: Option[String])

我已经将row键添加到case类中;这不是绝对必要的,所以如果你不需要它,你可以省略它。
那么 map 超过 RDD :

  1. // Row key, id, gender
  2. type Record = (String, Option[String], Option[String])
  3. val rdd =
  4. sc.hbaseTable[Record]("customers")
  5. .select("id","gender")
  6. .inColumnFamily("data")
  7. .map(r => Customer(r._1, r._2, r._3))

然后-基于case类-转换 RDDDataFrame ```
import sqlContext.implicits._
val df = rdd.toDF()
df.show()
df.printSchema()

  1. 输出来自 `spark-shell` 看起来像这样:

scala> df.show()
+---------+----+------+
| rowKey| id|gender|
+---------+----+------+
|customer1| 1| null|
|customer2|null| f|
|customer3| 3| m|
+---------+----+------+

scala> df.printSchema()
root
|-- rowKey: string (nullable = true)
|-- id: string (nullable = true)
|-- gender: string (nullable = true)

展开查看全部

相关问题