Spark: join condition with an array (possibly empty)

Posted by azpvetkf on 2021-05-27 in Spark

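I have two DataFrames and want to join them and filter the result: for each TransactionId, I want to drop the rows whose value appears in that transaction's OrgTypeToExclude list.
In short, TransactionId is the join condition and OrgTypeToExclude is the exclusion condition. Here is a simple example: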

    import org.apache.spark.sql.functions.expr
    import spark.implicits._

    val jsonstr = """{
      "id": "3b4219f8-0579-4933-ba5e-c0fc532eeb2a",
      "Transactions": [
        {
          "TransactionId": "USAL",
          "OrgTypeToExclude": ["A","B"]
        },
        {
          "TransactionId": "USMD",
          "OrgTypeToExclude": ["E"]
        },
        {
          "TransactionId": "USGA",
          "OrgTypeToExclude": []
        }
      ]
    }"""
    val df = Seq((1, "USAL", "A"), (4, "USAL", "C"), (2, "USMD", "B"), (5, "USMD", "E"), (3, "USGA", "C")).toDF("id", "code", "Alp")
    val json = spark.read.json(Seq(jsonstr).toDS).select("Transactions.TransactionId", "Transactions.OrgTypeToExclude")
    df.printSchema()
    json.printSchema()
    df.join(json, $"code" <=> $"TransactionId".cast("string") && !expr("array_contains(OrgTypeToExclude, Alp)"), "inner").show()

    // Expected output:
    // id  code  Alp
    // 4   USAL  C
    // 2   USMD  B
    // 3   USGA  C

Thanks, Mano.
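To pin down the semantics being asked for, here is a minimal sketch that uses plain Scala collections instead of Spark. The object `IntendedJoin`, the `Row` case class and the `exclusions` map are hypothetical names; the data is copied from the question, and flattening the `Transactions` array into a map assumes each TransactionId appears only once.

```scala
// Hypothetical plain-Scala model (no Spark) of the join the question asks for.
object IntendedJoin {
  final case class Row(id: Int, code: String, alp: String)

  // The question's df
  val rows: Seq[Row] = Seq(
    Row(1, "USAL", "A"), Row(4, "USAL", "C"), Row(2, "USMD", "B"),
    Row(5, "USMD", "E"), Row(3, "USGA", "C"))

  // One entry per element of the Transactions array: TransactionId -> OrgTypeToExclude
  // (assumes TransactionId values are unique)
  val exclusions: Map[String, Seq[String]] = Map(
    "USAL" -> Seq("A", "B"),
    "USMD" -> Seq("E"),
    "USGA" -> Seq.empty)

  // Inner join on code == TransactionId, keeping only rows whose Alp is not excluded
  val result: Seq[Row] = for {
    r        <- rows
    excluded <- exclusions.get(r.code).toList // no matching TransactionId: row dropped
    if !excluded.contains(r.alp)
  } yield r

  def main(args: Array[String]): Unit = result.foreach(println)
}
```

Running `main` should print the rows with id 4, 2 and 3, in that order, matching the expected output. An empty exclusion list (`USGA`) simply excludes nothing, so its row survives.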

Answer 1 by bpzcxfmw

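`Transactions` is an array, so when you select `Transactions.TransactionId` and `Transactions.OrgTypeToExclude` directly, each of those columns comes back as an array holding the values of every transaction.
Instead, explode the root-level `Transactions` array first and then extract `OrgTypeToExclude` and `TransactionId`; after that the join is straightforward.
Please check the code below.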

    scala> val jsonstr = """{
         |   "id": "3b4219f8-0579-4933-ba5e-c0fc532eeb2a",
         |   "Transactions": [
         |     {
         |       "TransactionId": "USAL",
         |       "OrgTypeToExclude": ["A","B"]
         |     },
         |     {
         |       "TransactionId": "USMD",
         |       "OrgTypeToExclude": ["E"]
         |     },
         |     {
         |       "TransactionId": "USGA",
         |       "OrgTypeToExclude": []
         |     }
         |   ]
         | }"""
    scala> val df = Seq((1, "USAL", "A"), (4, "USAL", "C"), (2, "USMD", "B"), (5, "USMD", "E"), (3, "USGA", "C")).toDF("id", "code", "Alp")
    df: org.apache.spark.sql.DataFrame = [id: int, code: string ... 1 more field]

    scala> val json = spark.read.json(Seq(jsonstr).toDS).select(explode($"Transactions").as("Transactions")).select($"Transactions.*")
    json: org.apache.spark.sql.DataFrame = [OrgTypeToExclude: array<string>, TransactionId: string]

    scala> df.show(false)
    +---+----+---+
    |id |code|Alp|
    +---+----+---+
    |1  |USAL|A  |
    |4  |USAL|C  |
    |2  |USMD|B  |
    |5  |USMD|E  |
    |3  |USGA|C  |
    +---+----+---+

    scala> json.show(false)
    +----------------+-------------+
    |OrgTypeToExclude|TransactionId|
    +----------------+-------------+
    |[A, B]          |USAL         |
    |[E]             |USMD         |
    |[]              |USGA         |
    +----------------+-------------+

    scala> df.join(json, df("code") === json("TransactionId") && !array_contains(json("OrgTypeToExclude"), df("Alp")), "inner").select("id", "code", "alp").show(false)
    +---+----+---+
    |id |code|alp|
    +---+----+---+
    |4  |USAL|C  |
    |2  |USMD|B  |
    |3  |USGA|C  |
    +---+----+---+
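The difference this answer describes can be sketched without Spark. The hypothetical `ExplodeSketch` object below uses plain Scala collections as a stand-in for the DataFrame, assuming the three `Transactions` elements from the question: projecting nested fields through the array gives one row of parallel arrays, while exploding gives one row per element.

```scala
// Hypothetical plain-Scala model (no Spark) of two ways to read the nested
// Transactions array from the question's JSON.
object ExplodeSketch {
  final case class Transaction(transactionId: String, orgTypeToExclude: Seq[String])

  // The three elements of the "Transactions" array in the question's JSON
  val transactions: Seq[Transaction] = Seq(
    Transaction("USAL", Seq("A", "B")),
    Transaction("USMD", Seq("E")),
    Transaction("USGA", Seq.empty))

  // Analogue of select("Transactions.TransactionId", "Transactions.OrgTypeToExclude"):
  // a single row whose two columns are arrays collected across all elements.
  val projectedRow: (Seq[String], Seq[Seq[String]]) =
    (transactions.map(_.transactionId), transactions.map(_.orgTypeToExclude))

  // Analogue of select(explode($"Transactions")).select($"Transactions.*"):
  // one row per element, with columns (OrgTypeToExclude, TransactionId).
  val explodedRows: Seq[(Seq[String], String)] =
    transactions.map(t => (t.orgTypeToExclude, t.transactionId))

  def main(args: Array[String]): Unit = {
    println(s"projected: 1 row -> $projectedRow")
    println(s"exploded: ${explodedRows.size} rows")
    explodedRows.foreach(println)
  }
}
```

Only in the exploded shape is `TransactionId` a single string per row that can be compared with `code`, which is presumably why casting the projected array to a string in the question never matched.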
Answer 2 by ppcbkaq5

First, you seem to have overlooked that `Transactions` is itself an array; we can handle it with `explode`:

    import org.apache.spark.sql.functions.explode

    val json = spark.read.json(Seq(jsonstr).toDS)
      .select(explode($"Transactions").as("t")) // deal with the Transactions array first
      .select($"t.TransactionId", $"t.OrgTypeToExclude")

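Also, as far as I know, `array_contains` expects a value rather than a column as its second argument, and I'm not aware of a version that accepts a column reference there, so we'll define a UDF instead: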

    import org.apache.spark.sql.functions.udf

    // true when the exclusion list `a` contains the value `v`
    val arr_con = udf { (a: Seq[String], v: String) => a.contains(v) }

Then we can change the join condition like this:

    df.join(json, $"code" <=> $"TransactionId" && !arr_con($"OrgTypeToExclude", $"Alp"), "inner").show()

Expected result:

    scala> df.join(json, $"code" <=> $"TransactionId" && !arr_con($"OrgTypeToExclude", $"Alp"), "inner").show()
    +---+----+---+-------------+----------------+
    | id|code|Alp|TransactionId|OrgTypeToExclude|
    +---+----+---+-------------+----------------+
    |  4|USAL|  C|         USAL|          [A, B]|
    |  2|USMD|  B|         USMD|             [E]|
    |  3|USGA|  C|         USGA|              []|
    +---+----+---+-------------+----------------+
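Since the title is about arrays that may be empty, it may be worth checking how these conditions behave when `OrgTypeToExclude` is NULL rather than `[]`. Below is a hypothetical plain-Scala model of SQL three-valued logic (`None` stands for NULL). It assumes, as I understand Spark's behavior, that `array_contains` returns NULL when the array or the value is NULL, that `NOT NULL` is NULL, and that a join keeps a pair only when its condition is exactly true. It also contrasts `===` (used in the first answer) with `<=>` (used here). Please verify on your own Spark version.

```scala
// Hypothetical plain-Scala model of SQL three-valued logic (None = SQL NULL).
object NullSemantics {
  type SqlBool = Option[Boolean]

  // `===`: NULL if either side is NULL
  def sqlEq(a: Option[String], b: Option[String]): SqlBool =
    for (x <- a; y <- b) yield x == y

  // `<=>` (null-safe equality): never NULL, and NULL <=> NULL is true
  def nullSafeEq(a: Option[String], b: Option[String]): SqlBool =
    Some(a == b)

  // NOT: NULL stays NULL
  def not(p: SqlBool): SqlBool = p.map(b => !b)

  // AND: false wins over NULL
  def and(p: SqlBool, q: SqlBool): SqlBool = (p, q) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None
  }

  // array_contains, assuming the array has no NULL elements:
  // a NULL array or NULL value gives NULL
  def arrayContains(arr: Option[Seq[String]], v: Option[String]): SqlBool =
    for (xs <- arr; x <- v) yield xs.contains(x)

  // code <op> TransactionId AND NOT array_contains(OrgTypeToExclude, Alp)
  def joinCondition(code: Option[String], tid: Option[String],
                    excl: Option[Seq[String]], alp: Option[String],
                    nullSafe: Boolean): SqlBool = {
    val keyMatch = if (nullSafe) nullSafeEq(code, tid) else sqlEq(code, tid)
    and(keyMatch, not(arrayContains(excl, alp)))
  }

  // An inner join keeps a pair only when the condition is exactly true
  def keeps(cond: SqlBool): Boolean = cond.contains(true)

  def main(args: Array[String]): Unit = {
    // Row (3, USGA, C) against an empty exclusion list: kept
    println(keeps(joinCondition(Some("USGA"), Some("USGA"), Some(Seq.empty), Some("C"), nullSafe = true)))
    // Same row against a NULL exclusion list: NOT NULL is NULL, so the pair is dropped
    println(keeps(joinCondition(Some("USGA"), Some("USGA"), None, Some("C"), nullSafe = true)))
  }
}
```

Under these assumptions an empty list keeps the row but a NULL list silently drops it. If NULL should behave like `[]`, one option in Spark would be `!coalesce(array_contains($"OrgTypeToExclude", $"Alp"), lit(false))`; the `arr_con` UDF, which receives a Scala `null` for a NULL array, would probably need a null guard as well.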
