Spark Scala: problem converting a DataFrame to a Dataset

azpvetkf · asked 2021-05-27 · in Spark

I have a DataFrame with the following schema:

    db.printSchema()
    root
     |-- _id: struct (nullable = true)
     |    |-- oid: string (nullable = true)
     |-- id: string (nullable = true)
     |-- sparse_rep: struct (nullable = true)
     |    |-- 1: double (nullable = true)
     |    |-- 10: double (nullable = true)
     |    |-- 11: double (nullable = true)
     |    |-- 12: double (nullable = true)
     |    |-- 13: double (nullable = true)
     |    |-- 14: double (nullable = true)
     |    |-- 15: double (nullable = true)
     |    |-- 17: double (nullable = true)
     |    |-- 18: double (nullable = true)
     |    |-- 2: double (nullable = true)
     |    |-- 20: double (nullable = true)
     |    |-- 21: double (nullable = true)
     |    |-- 22: double (nullable = true)
     |    |-- 23: double (nullable = true)
     |    |-- 24: double (nullable = true)
     |    |-- 25: double (nullable = true)
     |    |-- 26: double (nullable = true)
     |    |-- 27: double (nullable = true)
     |    |-- 3: double (nullable = true)
     |    |-- 4: double (nullable = true)
     |    |-- 7: double (nullable = true)
     |    |-- 9: double (nullable = true)
     |-- title: string (nullable = true)

All of the fields here look simple enough except for the sparse representation. The sparse_rep object was originally created in Spark as a Map[Int, Double] and then written to MongoDB.
However, when I try to coerce it back to Map[Int, Double] through a Dataset:

    case class blogRow(_id: String, id: Int, sparse_rep: Map[Int, Double], title: String)
    val blogRowEncoder = Encoders.product[blogRow]
    db.as[blogRow](blogRowEncoder)

I get the following error:

    Caused by: org.apache.spark.sql.AnalysisException: need a map field but got struct<1:double,10:double,11:double,12:double,13:double,14:double,15:double,17:double,18:double,2:double,20:double,21:double,22:double,23:double,24:double,25:double,26:double,27:double,3:double,4:double,7:double,9:double>;
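
The mismatch can be reproduced without MongoDB at all: a struct column simply cannot be bound to a Map[Int, Double] field of a case class. A minimal sketch (names are illustrative, not from the original job):

    import org.apache.spark.sql.SparkSession

    // Case class expecting a map where the data actually holds a struct.
    case class Repro(sparse_rep: Map[Int, Double])

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // A struct column with numeric field names, like the one the
    // MongoDB connector infers from the stored sub-document.
    val structDf = spark.sql("SELECT named_struct('1', 1.0, '2', 2.0) AS sparse_rep")

    // Throws an AnalysisException like: need a map field but got struct<1:double,2:double>
    structDf.as[Repro]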

cwtwac6a 1#

Convert the struct type to a map type and then apply the case class. The schema of the data in the DataFrame and the fields of the case class should match. Check the following code.

    scala> case class blogRow(_id: String, id: Int, sparse_rep: Map[Int, Double], title: String)
    defined class blogRow

    scala> // Interleave lit(key) and col(value) for every struct field to build the map column.
    scala> val blogRowDF = df
             .withColumn("sparse_rep", map(
               df.select("sparse_rep.*")
                 .columns
                 .flatMap(c => List(lit(c).cast("int"), col(s"sparse_rep.${c}"))): _*)
             )
             .withColumn("_id", $"_id.oid")        // flatten the _id struct to its oid string
             .withColumn("id", $"id".cast("int"))  // align with the Int field in the case class
             .as[blogRow]

    scala> blogRowDF.show(false)
    +---------+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
    |_id      |id  |sparse_rep                                                                                                                                                                                                                                                     |title      |
    +---------+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
    |oid_value|null|Map(10 -> 10.0, 24 -> 24.0, 25 -> 25.0, 14 -> 14.0, 20 -> 20.0, 1 -> 1.0, 21 -> 21.0, 9 -> 9.0, 13 -> 13.0, 2 -> 2.0, 17 -> 17.0, 22 -> 22.0, 27 -> 27.0, 12 -> 12.0, 7 -> 7.0, 3 -> 3.0, 18 -> 18.0, 11 -> 11.0, 26 -> 26.0, 23 -> 23.0, 4 -> 4.0, 15 -> 15.0)|title_value|
    +---------+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
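
The null under id in the sample output just reflects a sample id value that does not parse as an int. The struct-to-map step can also be factored into a small reusable helper; a minimal sketch under the same assumptions as above (the helper name structToIntMap is hypothetical):

    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.functions.{col, lit, map}

    // Build a map<int, double> column from a struct column whose field
    // names are numeric strings, by interleaving key and value columns.
    def structToIntMap(df: DataFrame, structCol: String): Column =
      map(
        df.select(s"$structCol.*")
          .columns
          .flatMap(c => List(lit(c).cast("int"), col(s"$structCol.$c"))): _*
      )

    // Usage:
    // df.withColumn("sparse_rep", structToIntMap(df, "sparse_rep"))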

nc1teljy 2#

Another option:

Input DataFrame schema:

    df.printSchema()
    /**
      * root
      * |-- _id: struct (nullable = true)
      * |    |-- oid: string (nullable = true)
      * |-- id: string (nullable = true)
      * |-- sparse_rep: struct (nullable = true)
      * |    |-- 1: double (nullable = true)
      * |    |-- 10: double (nullable = true)
      * |    |-- 11: double (nullable = true)
      * |    |-- 12: double (nullable = true)
      * |    |-- 13: double (nullable = true)
      * |    |-- 14: double (nullable = true)
      * |    |-- 15: double (nullable = true)
      * |    |-- 17: double (nullable = true)
      * |    |-- 18: double (nullable = true)
      * |    |-- 2: double (nullable = true)
      * |    |-- 20: double (nullable = true)
      * |    |-- 21: double (nullable = true)
      * |    |-- 22: double (nullable = true)
      * |    |-- 23: double (nullable = true)
      * |    |-- 24: double (nullable = true)
      * |    |-- 25: double (nullable = true)
      * |    |-- 26: double (nullable = true)
      * |    |-- 27: double (nullable = true)
      * |    |-- 3: double (nullable = true)
      * |    |-- 4: double (nullable = true)
      * |    |-- 7: double (nullable = true)
      * |    |-- 9: double (nullable = true)
      * |-- title: string (nullable = true)
      */

Convert the schema of the input DataFrame to match the case class, then convert Dataset[Row] -> Dataset[BlogRow]:

    val ds =
      df.withColumn("sparse_rep", expr("from_json(to_json(sparse_rep), 'map<int, double>')"))
        .withColumn("_id", $"_id.oid")
        .withColumn("id", $"id".cast("int"))
        .as[BlogRow]

    ds.printSchema()
    /**
      * root
      * |-- _id: string (nullable = true)
      * |-- id: integer (nullable = true)
      * |-- sparse_rep: map (nullable = true)
      * |    |-- key: integer
      * |    |-- value: double (valueContainsNull = true)
      * |-- title: string (nullable = true)
      */

where the case class is as follows:

    case class BlogRow(_id: String, id: Int, sparse_rep: Map[Int, Double], title: String)
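
This works because to_json serializes the struct to a JSON string whose keys are the struct's field names, and from_json re-parses that string with the explicit map<int, double> schema. A self-contained sketch of just that step (the data is illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.expr

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Struct field names become JSON object keys, which from_json then
    // parses as integer map keys.
    val demo = spark.sql("SELECT named_struct('1', 1.0, '10', 10.0) AS sparse_rep")
    demo.select(expr("from_json(to_json(sparse_rep), 'map<int, double>')").as("sparse_rep"))
        .printSchema()
    // root
    //  |-- sparse_rep: map (nullable = true)
    //  |    |-- key: integer
    //  |    |-- value: double (valueContainsNull = true)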
