如何找到选定列的两个pyspark对象之间的增量,以及其他单独的增量

lpwwtiir  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(137)

我必须找到2个PySpark Dataframe 之间的增量,它影响2个列和其他列,但要分开。
我有一个如下所示的 Dataframe :
| ID|公司_ID|值|评分|区域|
| --|--|--|--|--|
| 1A|小行星3412|值-1| 25 |英国|
| 2B|小行星2345|值-2| 34 |IE|
| 3C|小行星9800|值-3| 56 |美国|
| 4E|小行星6789|值-4| 78 |在|
第二个 Dataframe 如下所示:
| ID|公司_ID|值|评分|区域|
| --|--|--|--|--|
| 1A|小行星3412|值-1| 25 |英国|
| 2B|小行星2345|值-5| 36 |在|
| 3C|小行星9800|值-3| 56 |美国|
| 4E|小行星6789|值-4| 81 |RU|
我想要结果数据框,这样的结果xml可以这样做。注意,<modify_type>子字段只适用于分数,区域字段,而不是其他字段。因此,我需要跟踪增量,如标题中所述。

  1. <action>
  2. <action_type>modify</action_type>
  3. <ID>
  4. <ID>2B</ID>
  5. <Company_Id>2345tyu</Company_Id>
  6. <value>value-5</value >
  7. <score>
  8. <modify_type>remove</modify_type>
  9. <score_value>34</score_value>
  10. </score>
  11. <score>
  12. <modify_type>add</modify_type>
  13. <score_value>36</score_value>
  14. </score>
  15. <region>
  16. <modify_type>remove</modify_type>
  17. <region_value>IE</region_value>
  18. </region>
  19. <region>
  20. <modify_type>add</modify_type>
  21. <region_value>IN</region_value>
  22. </region>
  23. </ID>
  24. <ID>
  25. <ID>4E</ID>
  26. <Company_Id>6789tvu</Company_Id>
  27. <score>
  28. <modify_type>remove</modify_type>
  29. <score_value>78</score_value>
  30. </score>
  31. <score>
  32. <modify_type>add</modify_type>
  33. <score_value>81</score_value>
  34. </score>
  35. <region>
  36. <modify_type>remove</modify_type>
  37. <region_value>IN</region_value>
  38. </region>
  39. <region>
  40. <modify_type>add</modify_type>
  41. <region_value>RU</region_value>
  42. </region>
  43. </action>

字符串
我实施的解决方案是这样的
1.我通过对关键字列进行分组来透视值、得分和区域列。

  1. df1 = df1.select('ID', 'Company_Id', F.expr("stack(3, 'value', `value`,\
  2. 'score', `score`,\
  3. 'region', `region`)as (ListName, Previous_Value)" ))
  4. df2 = df2.select('ID', 'Company_Id', F.expr("stack(3, 'value', `value`,\
  5. 'score', `score`,\
  6. 'region', `region`)as (ListName, Current_Value)" ))


1.在Key列上联接以上 Dataframe 并比较值。

  1. df_join = df2.join(df1, (df2.Company_Id == df1.Company_Id ) & (df2.ID == df1.ID) & (df2.ListName == df1.ListName), 'inner').select(df1.ID, df1.ListName, 'Current_Value', 'Previous_Value')


1.这将生成如下所示的 Dataframe
| ID|公司_ID| ListName|上一个值(_V)|当前值(_V)|
| --|--|--|--|--|
| 1A|小行星3412|值|值-1|值-1|
| 1A|小行星3412|评分| 25 | 25 |
| 1A|小行星3412|区域|英国|英国|
| 2B|小行星2345|值|值-2|值-5|
| 2B|小行星2345|评分| 34 | 36 |
| 2B|小行星2345|区域|IE|在|
| 3C|小行星9800|值|值-3|值-3|
| 3C|小行星9800|评分| 56 | 56 |
| 3C|小行星9800|区域|美国|美国|
| 4E|小行星6789|值|值-4|值-4|
| 4E|小行星6789|评分| 78 | 81 |
| 4E|小行星6789|区域|在|RU|
从这里我使用了大量的情况下,当条件,以实际得到的结果。
我试图使这个过程在计算上更高效。

yqhsw0fo

yqhsw0fo1#

对于pyspark,您应该能够对每个DataFrame中的选定列使用antijoin

  1. df1.select(col1, col2, col4).join(df2.select(col1, col2, col4), on='ID', how='anti')

字符串

相关问题