Incrementally counting key-value pairs in a Spark Scala DataFrame

Asked by jhiyze9q on 2021-05-19 in Spark

I'm new to Spark and Scala, and I'm trying to increment the values of the key-value pairs in one column using the value from another column.
Below is the input DataFrame.

  val inputDF = Seq(
    (1, "Visa", 1, None),
    (2, "MC", 2, Some("Visa -> 1")),
    (3, "Amex", 1, None),
    (4, "Amex", 3, Some("Visa -> 1, MC -> 1")),
    (5, "Amex", 4, Some("Visa -> 2, MC -> 1")),
    (6, "MC", 1, None),
    (7, "Visa", 5, Some("Visa -> 2, MC -> 1, Amex -> 1")),
    (8, "Visa", 6, Some("Visa -> 2, MC -> 2, Amex -> 1")),
    (9, "MC", 1, None),
    (10, "MC", 2, Some("Amex -> 1"))
  ).toDF("person_id", "card_type", "number_of_cards", "card_type_details")

  +---------+---------+---------------+-----------------------------+
  |person_id|card_type|number_of_cards|card_type_details            |
  +---------+---------+---------------+-----------------------------+
  |1        |Visa     |1              |null                         |
  |2        |MC       |2              |Visa -> 1                    |
  |3        |Amex     |1              |null                         |
  |4        |Amex     |3              |Visa -> 1, MC -> 1           |
  |5        |Amex     |4              |Visa -> 2, MC -> 1           |
  |6        |MC       |1              |null                         |
  |7        |Visa     |5              |Visa -> 2, MC -> 1, Amex -> 1|
  |8        |Visa     |6              |Visa -> 2, MC -> 2, Amex -> 1|
  |9        |MC       |1              |null                         |
  |10       |MC       |2              |Amex -> 1                    |
  +---------+---------+---------------+-----------------------------+

Now, given the input above: if card_type_details is null, take the value from card_type and add "-> 1" (as in row 1).
If card_type_details is not null, check whether card_type already exists as a key in card_type_details. If it does, increment that key's value by 1; otherwise add a new key-value pair (row 7 shows the increment, row 2 the new pair).
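
Expressed on a plain Scala Map, the per-row rule boils down to this helper (hypothetical, for illustration only; updateDetails is not part of my actual code):

  // Increment the count for cardType if present, otherwise add it with count 1.
  // A null card_type_details corresponds to starting from Map.empty.
  def updateDetails(cardType: String, details: Map[String, Int]): Map[String, Int] =
    details.updated(cardType, details.getOrElse(cardType, 0) + 1)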
Here is the expected output:

  val expectedOutputDF = Seq(
    (1, "Visa", 1, Some("Visa -> 1")),
    (2, "MC", 2, Some("Visa -> 1, MC -> 1")),
    (3, "Amex", 1, Some("Amex -> 1")),
    (4, "Amex", 3, Some("Visa -> 1, MC -> 1, Amex -> 1")),
    (5, "Amex", 4, Some("Visa -> 2, MC -> 1, Amex -> 1")),
    (6, "MC", 1, Some("MC -> 1")),
    (7, "Visa", 5, Some("Visa -> 3, MC -> 1, Amex -> 1")),
    (8, "Visa", 6, Some("Visa -> 3, MC -> 2, Amex -> 1")),
    (9, "MC", 1, Some("MC -> 1")),
    (10, "MC", 2, Some("Amex -> 1, MC -> 1"))
  ).toDF("person_id", "card_type", "number_of_cards", "card_type_details")

  +---------+---------+---------------+-----------------------------+
  |person_id|card_type|number_of_cards|card_type_details            |
  +---------+---------+---------------+-----------------------------+
  |1        |Visa     |1              |Visa -> 1                    |
  |2        |MC       |2              |Visa -> 1, MC -> 1           |
  |3        |Amex     |1              |Amex -> 1                    |
  |4        |Amex     |3              |Visa -> 1, MC -> 1, Amex -> 1|
  |5        |Amex     |4              |Visa -> 2, MC -> 1, Amex -> 1|
  |6        |MC       |1              |MC -> 1                      |
  |7        |Visa     |5              |Visa -> 3, MC -> 1, Amex -> 1|
  |8        |Visa     |6              |Visa -> 3, MC -> 2, Amex -> 1|
  |9        |MC       |1              |MC -> 1                      |
  |10       |MC       |2              |Amex -> 1, MC -> 1           |
  +---------+---------+---------------+-----------------------------+

Any suggestions on how to achieve this?


Answer 1 (by 6jygbczu)

This assumes card_type_details is of type map. Check the following code.

  scala> inputDF.show(false)
  +---------+---------+---------------+-----------------------------+
  |person_id|card_type|number_of_cards|card_type_details            |
  +---------+---------+---------------+-----------------------------+
  |1        |Visa     |1              |null                         |
  |2        |MC       |2              |Visa -> 1                    |
  |3        |Amex     |1              |null                         |
  |4        |Amex     |3              |Visa -> 1, MC -> 1           |
  |5        |Amex     |4              |Visa -> 2, MC -> 1           |
  |6        |MC       |1              |null                         |
  |7        |Visa     |5              |Visa -> 2, MC -> 1, Amex -> 1|
  |8        |Visa     |6              |Visa -> 2, MC -> 2, Amex -> 1|
  |9        |MC       |1              |null                         |
  |10       |MC       |2              |Amex -> 1                    |
  +---------+---------+---------------+-----------------------------+

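Since the question's inputDF actually stores card_type_details as a string, it first has to be parsed into a map column. Below is a minimal conversion sketch (my addition, not part of the original answer), assuming the ', ' pair delimiter and ' -> ' key/value delimiter from the sample data; str_to_map is a Spark SQL built-in, and transform_values, like the map_filter used later, needs Spark 3.0+:

  import org.apache.spark.sql.functions.{coalesce, expr}

  // Parse "Visa -> 1, MC -> 1" into a map<string,int>, and turn nulls into
  // empty maps, because colExpr below tests size(...) === 0 rather than isNull.
  val indf = inputDF.withColumn(
    "card_type_details",
    coalesce(
      expr("transform_values(str_to_map(card_type_details, ', ', ' -> '), (k, v) -> cast(v AS int))"),
      expr("cast(map() AS map<string,int>)")
    )
  )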
Create the expression. The when/otherwise chain mirrors the rule: an empty map is seeded with card_type -> 1; if card_type already exists as a key, a rebuilt entry with the count incremented is merged (map_concat) with the remaining entries (map_filter drops the old entry); otherwise the new key is simply appended. Note that map_filter requires Spark 3.0+, while map_concat is available from Spark 2.4.

  scala> :paste
  // Entering paste mode (ctrl-D to finish)

  // when/size/map/lit/map_concat come from here (not auto-imported in spark-shell):
  import org.apache.spark.sql.functions._

  val colExpr = when(size($"card_type_details") === 0, map($"card_type", lit(1)))
    .otherwise(
      when(
        expr("card_type_details[card_type]").isNotNull,
        map_concat(
          expr("map(card_type, card_type_details[card_type] + 1)"),
          expr("map_filter(card_type_details, (k, v) -> k != card_type)")
        )
      )
      .otherwise(map_concat($"card_type_details", map($"card_type", lit(1))))
    )

  // Exiting paste mode, now interpreting.

  colExpr: org.apache.spark.sql.Column = CASE WHEN (size(card_type_details) = 0) THEN map(card_type, 1) ELSE CASE WHEN (card_type_details[card_type] IS NOT NULL) THEN map_concat(map(card_type, (card_type_details[card_type] + 1)), map_filter(card_type_details, lambdafunction((NOT (k = card_type)), k, v))) ELSE map_concat(card_type_details, map(card_type, 1)) END END
  scala> indf.withColumn("new_card_type_details", colExpr).show(false)
  +---------+---------+---------------+-------------------------------+-------------------------------+
  |person_id|card_type|number_of_cards|card_type_details              |new_card_type_details          |
  +---------+---------+---------------+-------------------------------+-------------------------------+
  |1        |Visa     |1              |[]                             |[Visa -> 1]                    |
  |2        |MC       |2              |[Visa -> 1]                    |[Visa -> 1, MC -> 1]           |
  |3        |Amex     |1              |[]                             |[Amex -> 1]                    |
  |4        |Amex     |3              |[Visa -> 1, MC -> 1]           |[Visa -> 1, MC -> 1, Amex -> 1]|
  |5        |Amex     |4              |[Visa -> 2, MC -> 1]           |[Visa -> 2, MC -> 1, Amex -> 1]|
  |6        |MC       |1              |[]                             |[MC -> 1]                      |
  |7        |Visa     |5              |[Visa -> 2, MC -> 1, Amex -> 1]|[Visa -> 3, MC -> 1, Amex -> 1]|
  |8        |Visa     |6              |[Visa -> 2, MC -> 2, Amex -> 1]|[Visa -> 3, MC -> 2, Amex -> 1]|
  |9        |MC       |1              |[]                             |[MC -> 1]                      |
  |10       |MC       |2              |[Amex -> 1]                    |[Amex -> 1, MC -> 1]           |
  +---------+---------+---------------+-------------------------------+-------------------------------+
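
If downstream code expects the comma-separated string form from the question rather than a map, the result can be rendered back. A sketch (again my addition, not part of the original answer; map_entries and transform are available from Spark 2.4):

  import org.apache.spark.sql.functions.expr

  // Rebuild the "Visa -> 1, MC -> 1" string representation from the map entries.
  val asString = expr(
    "concat_ws(', ', transform(map_entries(new_card_type_details), " +
      "e -> concat(e.key, ' -> ', cast(e.value AS string))))")

  indf.withColumn("new_card_type_details", colExpr)
      .withColumn("new_card_type_details", asString)
      .show(false)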