从hbase检索数据并将其格式化为scalaDataframe

cu6pst1q  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(326)

我正试图从hbase表中获取数据到apachespark环境中,但我不知道如何格式化它。有人能帮帮我吗。

  1. case class systems( rowkey: String, iacp: Option[String], temp: Option[String])
  2. type Record = (String, Option[String], Option[String])
  3. val hBaseRDD_iacp = sc.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("test_fam")
  4. scala> hBaseRDD_iacp.map(x => systems(x._1,x._2,x._3)).toDF().show()
  5. +--------------+-----------------+--------------------+
  6. | rowkey| iacp| temp|
  7. +--------------+-----------------+--------------------+
  8. | ab7|0.051,0.052,0.055| 17.326,17.344,17.21|
  9. | k6c| 0.056,NA,0.054|17.277,17.283,17.256|
  10. | ad| NA,23.0| 24.0,23.6|
  11. +--------------+-----------------+--------------------+

但是,我希望它的格式如下。每一个逗号分隔的值都在新行中,并且每一个na都被空值替换。iacp和temp列中的值应为浮点型。每行可以有不同数量的逗号分隔值。
提前谢谢!

  1. +--------------+-----------------+--------------------+
  2. | rowkey| iacp| temp|
  3. +--------------+-----------------+--------------------+
  4. | ab7| 0.051| 17.326|
  5. | ab7| 0.052| 17.344|
  6. | ab7| 0.055| 17.21|
  7. | k6c| 0.056| 17.277|
  8. | k6c| null| 17.283|
  9. | k6c| 0.054| 17.256|
  10. | ad| null| 24.0|
  11. | ad| 23| 26.0|
  12. +--------------+-----------------+--------------------+
mwngjboj

mwngjboj1#

你的 hBaseRDD_iacp.map(x => systems(x._1, x._2, x._3)).toDF 代码行应生成与以下内容等效的Dataframe:

  1. val df = Seq(
  2. ("ab7", Some("0.051,0.052,0.055"), Some("17.326,17.344,17.21")),
  3. ("k6c", Some("0.056,NA,0.054"), Some("17.277,17.283,17.256")),
  4. ("ad", Some("NA,23.0"), Some("24.0,23.6"))
  5. ).toDF("rowkey", "iacp", "temp")

要将数据集转换为所需的结果,可以应用一个udf,将数据集的元素配对 iacp 以及 temp csv字符串以生成 (Option[Double], Option[Double]) 那是什么 explode -如下图所示:

  1. import org.apache.spark.sql.functions._
  2. import spark.implicits._
  3. def pairUpCSV = udf{ (s1: String, s2: String) =>
  4. import scala.util.Try
  5. def toNumericArr(csv: String) = csv.split(",").map{
  6. case s if Try(s.toDouble).isSuccess => Some(s)
  7. case _ => None
  8. }
  9. toNumericArr(s1).zipAll(toNumericArr(s2), None, None)
  10. }
  11. df.
  12. withColumn("csv_pairs", pairUpCSV($"iacp", $"temp")).
  13. withColumn("csv_pair", explode($"csv_pairs")).
  14. select($"rowkey", $"csv_pair._1".as("iacp"), $"csv_pair._2".as("temp")).
  15. show(false)
  16. // +------+-----+------+
  17. // |rowkey|iacp |temp |
  18. // +------+-----+------+
  19. // |ab7 |0.051|17.326|
  20. // |ab7 |0.052|17.344|
  21. // |ab7 |0.055|17.21 |
  22. // |k6c |0.056|17.277|
  23. // |k6c |null |17.283|
  24. // |k6c |0.054|17.256|
  25. // |ad |null |24.0 |
  26. // |ad |23.0 |23.6 |
  27. // +------+-----+------+

注意这个值 NA 属于方法中的默认大小写 toNumericArr 因此,这并不是一个单独的案例。也, zipAll (而不是 zip )在自定义项中用于涵盖 iacp 以及 temp csv字符串具有不同的元素大小。

展开查看全部

相关问题