我正试图从hbase表中获取数据到apachespark环境中,但我不知道如何格式化它。有人能帮帮我吗。
case class systems( rowkey: String, iacp: Option[String], temp: Option[String])
type Record = (String, Option[String], Option[String])
val hBaseRDD_iacp = sc.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("test_fam")
scala> hBaseRDD_iacp.map(x => systems(x._1,x._2,x._3)).toDF().show()
+--------------+-----------------+--------------------+
| rowkey| iacp| temp|
+--------------+-----------------+--------------------+
| ab7|0.051,0.052,0.055| 17.326,17.344,17.21|
| k6c| 0.056,NA,0.054|17.277,17.283,17.256|
| ad| NA,23.0| 24.0,23.6|
+--------------+-----------------+--------------------+
但是,我希望它的格式如下。每一个逗号分隔的值都在新行中,并且每一个na都被空值替换。iacp和temp列中的值应为浮点型。每行可以有不同数量的逗号分隔值。
提前谢谢!
+--------------+-----------------+--------------------+
| rowkey| iacp| temp|
+--------------+-----------------+--------------------+
| ab7| 0.051| 17.326|
| ab7| 0.052| 17.344|
| ab7| 0.055| 17.21|
| k6c| 0.056| 17.277|
| k6c| null| 17.283|
| k6c| 0.054| 17.256|
| ad| null| 24.0|
| ad| 23| 26.0|
+--------------+-----------------+--------------------+
1条答案
按热度按时间mwngjboj1#
你的
hBaseRDD_iacp.map(x => systems(x._1, x._2, x._3)).toDF
代码行应生成与以下内容等效的Dataframe:要将数据集转换为所需的结果,可以应用一个udf,将数据集的元素配对
iacp
以及temp
csv字符串以生成(Option[Double], Option[Double])
那是什么explode
-如下图所示:注意这个值
NA
属于方法中的默认大小写toNumericArr
因此,这并不是一个单独的案例。也,zipAll
(而不是zip
)在自定义项中用于涵盖iacp
以及temp
csv字符串具有不同的元素大小。