我想在spark流代码中读取hbase数据,以便查找和进一步增强流数据。我正在使用 spark-hbase-connector_2.10-1.0.3.jar
.
在我的代码中,下面一行是成功的
val docRdd =
sc.hbaseTable[(Option[String], Option[String])]("hbase_customer_profile")
.select("id","gender").inColumnFamily("data")
``` `docRdd.count` 返回正确的计数。 `docRdd` 属于类型
hbasereaderbuilder(org.apache.spark。sparkcontext@3a49e5,hbase\u customer\u profile,some(数据),wrappedarray(id,gender),none,none,list())
我怎么能把所有的行都读出来 `id, gender` 请列一列。我怎样才能转换 `docRdd` 以便可以使用sparksql。
1条答案
按热度按时间vd2z7a6w1#
您可以读取
RDD
使用转换
RDD
到DataFrame
您可以定义案例类:我已经将row键添加到case类中;这不是绝对必要的,所以如果你不需要它,你可以省略它。
那么
map
超过RDD
:然后-基于case类-转换
RDD
到DataFrame
```import sqlContext.implicits._
val df = rdd.toDF()
df.show()
df.printSchema()
scala> df.show()
+---------+----+------+
| rowKey| id|gender|
+---------+----+------+
|customer1| 1| null|
|customer2|null| f|
|customer3| 3| m|
+---------+----+------+
scala> df.printSchema()
root
|-- rowKey: string (nullable = true)
|-- id: string (nullable = true)
|-- gender: string (nullable = true)