While loop to join in PySpark

Posted by zsohkypk on 2024-01-06 in Spark

I have a DataFrame:

    data = [('service 1', 's1', 's2'),
            ('service 2', 's2', 's4'),
            ('service 3', 's3', 's5'),
            ('service 5', 's5', 's6'),
            ('service 4', 's4', 's3')]
    sdf = spark.createDataFrame(data, schema=['description', 'service', 'parent'])
    sdf.show()

    +-----------+-------+------+
    |description|service|parent|
    +-----------+-------+------+
    |  service 1|     s1|    s2|
    |  service 2|     s2|    s4|
    |  service 3|     s3|    s5|
    |  service 5|     s5|    s6|
    |  service 4|     s4|    s3|
    |  service 6|     s6|  NULL|
    +-----------+-------+------+

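I want to check whether each value in parent appears in the service column and, if it does, add a new column holding the matching description. The service and parent columns should then be dropped.
This would be the ideal final result: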

    +-----------+-----------+-----------+-----------+-----------+-----------+
    |description|     parent|    parent1|    parent2|    parent3|    parent4|
    +-----------+-----------+-----------+-----------+-----------+-----------+
    |  service 1|  service 2|  service 4|  service 3|  service 5|  service 6|
    |  service 2|  service 4|  service 3|  service 5|  service 6|       NULL|
    |  service 3|  service 5|  service 6|       NULL|       NULL|       NULL|
    |  service 5|  service 6|       NULL|       NULL|       NULL|       NULL|
    |  service 4|  service 3|  service 5|  service 6|       NULL|       NULL|
    +-----------+-----------+-----------+-----------+-----------+-----------+


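Based on a previous question, I got the following code working for the service and parent columns, but I have not been able to bring the description column into it.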

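    from pyspark.sql import functions as F

    i = 0
    # Keep joining while the most recently added parent column still has non-null ids.
    # The first column is named "parent", later ones "parent1", "parent2", ...
    while sdf.filter(F.col(f"parent{i if i > 0 else ''}").isNotNull()).count() > 0:
        sdf = sdf.alias("a1").join(sdf.alias("a2").select("service", "parent"),
                                   F.col(f"a1.parent{i if i > 0 else ''}") == F.col("a2.service"),
                                   how="left") \
            .withColumn(f"parent{i+1}", F.col("a2.parent")) \
            .drop(F.col("a2.service")) \
            .drop(F.col("a2.parent"))
        i += 1
        display(sdf)  # Databricks notebook helper; shows the result of each pass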


This is the output I get:

    +-----------+-------+------+-------+
    |description|service|parent|parent1|
    +-----------+-------+------+-------+
    |  service 1|     s1|    s2|     s4|
    |  service 2|     s2|    s4|     s3|
    |  service 3|     s3|    s5|     s6|
    |  service 5|     s5|    s6|   null|
    |  service 4|     s4|    s3|     s5|
    +-----------+-------+------+-------+

    +-----------+-------+------+-------+-------+
    |description|service|parent|parent1|parent2|
    +-----------+-------+------+-------+-------+
    |  service 1|     s1|    s2|     s4|     s3|
    |  service 2|     s2|    s4|     s3|     s5|
    |  service 3|     s3|    s5|     s6|   null|
    |  service 5|     s5|    s6|   null|   null|
    |  service 4|     s4|    s3|     s5|     s6|
    +-----------+-------+------+-------+-------+

    +-----------+-------+------+-------+-------+-------+
    |description|service|parent|parent1|parent2|parent3|
    +-----------+-------+------+-------+-------+-------+
    |  service 1|     s1|    s2|     s4|     s3|     s5|
    |  service 2|     s2|    s4|     s3|     s5|     s6|
    |  service 3|     s3|    s5|     s6|   null|   null|
    |  service 5|     s5|    s6|   null|   null|   null|
    |  service 4|     s4|    s3|     s5|     s6|   null|
    +-----------+-------+------+-------+-------+-------+

    +-----------+-------+------+-------+-------+-------+-------+
    |description|service|parent|parent1|parent2|parent3|parent4|
    +-----------+-------+------+-------+-------+-------+-------+
    |  service 1|     s1|    s2|     s4|     s3|     s5|     s6|
    |  service 2|     s2|    s4|     s3|     s5|     s6|   null|
    |  service 3|     s3|    s5|     s6|   null|   null|   null|
    |  service 5|     s5|    s6|   null|   null|   null|   null|
    |  service 4|     s4|    s3|     s5|     s6|   null|   null|
    +-----------+-------+------+-------+-------+-------+-------+

    +-----------+-------+------+-------+-------+-------+-------+-------+
    |description|service|parent|parent1|parent2|parent3|parent4|parent5|
    +-----------+-------+------+-------+-------+-------+-------+-------+
    |  service 1|     s1|    s2|     s4|     s3|     s5|     s6|   null|
    |  service 2|     s2|    s4|     s3|     s5|     s6|   null|   null|
    |  service 3|     s3|    s5|     s6|   null|   null|   null|   null|
    |  service 5|     s5|    s6|   null|   null|   null|   null|   null|
    |  service 4|     s4|    s3|     s5|     s6|   null|   null|   null|
    +-----------+-------+------+-------+-------+-------+-------+-------+
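One possible way to bring description into a loop like this is to have every pass look up two things for the current parent id: its description (which goes into the new column) and its own parent id (which is used by the next pass). The sketch below shows only that idea in plain Python, without Spark. The names `lookup`, `expand_parents`, and `_frontier` are hypothetical, and the `('service 6', 's6', None)` row is an assumption taken from the `sdf.show()` output above.

```python
# Plain-Python sketch (no Spark) of a level-by-level lookup.
data = [
    ("service 1", "s1", "s2"),
    ("service 2", "s2", "s4"),
    ("service 3", "s3", "s5"),
    ("service 5", "s5", "s6"),
    ("service 4", "s4", "s3"),
    ("service 6", "s6", None),  # assumed root row, needed to resolve 's6'
]

# service id -> (description, parent id); plays the role of the right side of the join
lookup = {service: (description, parent) for description, service, parent in data}


def expand_parents(data, lookup):
    # One working record per input row that has a parent; `_frontier` holds the
    # id that still has to be resolved for that row.
    rows = [{"description": d, "_frontier": p} for d, _, p in data if p is not None]
    level = 0
    # Same stopping rule as the while loop: continue while any id is unresolved.
    # The level bound is a guard against cycles in the parent links.
    while any(r["_frontier"] is not None for r in rows) and level <= len(lookup):
        col = "parent" if level == 0 else f"parent{level}"
        for r in rows:
            desc, next_parent = lookup.get(r["_frontier"], (None, None))
            r[col] = desc                  # store the description, not the id
            r["_frontier"] = next_parent   # id to resolve in the next pass
        level += 1
    for r in rows:
        del r["_frontier"]                 # helper column is dropped at the end
    return rows


result = expand_parents(data, lookup)
for row in result:
    print(row)
```

In DataFrame terms this would presumably correspond to joining against `select("service", "description", "parent")` instead of `select("service", "parent")`, keeping the id in a helper column and dropping it once the loop ends.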

jslywgbw (answer #1)

I adapted this from an earlier answer of mine:
https://gist.github.com/dineshdharme/7c13dcde72e42fdd3ec47d1ad40f6177
The GraphFrames jar can be found at the following location (under Files: jar, 242 KB):
https://mvnrepository.com/artifact/graphframes/graphframes/0.8.1-spark3.0-s_2.12
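Requirements: the graphframes Python package must be importable (the code below imports from it), and the jar must be available to Spark.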

Read about motifs here and here:

https://docs.databricks.com/en/_extras/notebooks/source/graphframes-user-guide-py.html

https://graphframes.github.io/graphframes/docs/_site/user-guide.html#motif-finding

    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession
    from graphframes import GraphFrame

    # Add the GraphFrames jar so that Spark's GraphX API is accessible from PySpark.
    # Jars can be found at: https://spark-packages.org/package/graphframes/graphframes
    spark = SparkSession.builder \
        .appName("MyApp") \
        .config("spark.jars", "file://spark-jars/graphframes-0.8.2-spark3.2-s_2.12.jar") \
        .getOrCreate()

    sc = spark.sparkContext
    # connectedComponents() needs a checkpoint directory
    sc.setCheckpointDir("/tmp/whatever")

    data = [
        ('service 1', 's1', 's2'),
        ('service 2', 's2', 's4'),
        ('service 3', 's3', 's5'),
        ('service 5', 's5', 's6'),
        ('service 4', 's4', 's3'),
        ('service 6', 's6', 'Leaf')  # 'Leaf' is a sentinel vertex marking the end of the chain
    ]

    initial_df = spark.createDataFrame(data, schema=['description', 'service', 'parent'])
    initial_df.show()

    # Give every relationship row a unique id (not used further in this snippet)
    initial_df = initial_df.withColumn("relationshipId", F.monotonically_increasing_id())

    # Vertices: every id that occurs as a service or as a parent
    source_vertex_list = initial_df.select(F.col("service").alias("vertices")).distinct()
    destination_vertex_list = initial_df.select(F.col("parent").alias("vertices")).distinct()
    intermediate_df = source_vertex_list.union(destination_vertex_list).distinct()
    final_vertices_df = intermediate_df.withColumn("id", F.col("vertices")).drop("vertices")

    print("All vertices list")
    final_vertices_df.show(n=1000, truncate=False)

    # Edges: service -> parent
    given_edge_list_df = initial_df.select(F.col("service").alias("src"), F.col("parent").alias("dst"))
    given_edge_list_df.show(n=1000, truncate=False)

    print("All vertices")
    final_vertices_df.show(n=1000, truncate=False)

    print("All relationships")
    given_edge_list_df.show(n=1000, truncate=False)

    # Create a graph representation of the vertex and edge relationships
    g = GraphFrame(final_vertices_df, given_edge_list_df)

    # Read about motifs here and here.
    # https://docs.databricks.com/en/_extras/notebooks/source/graphframes-user-guide-py.html
    # https://graphframes.github.io/graphframes/docs/_site/user-guide.html#motif-finding

    # Breadth-first search from vertex 's1' to the 'Leaf' sentinel
    paths = g.bfs("id = 's1'", "id = 'Leaf'")
    paths.show()

    result = g.connectedComponents()
    print("Connected components")
    result.select("id", "component").orderBy("component").show()


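In paths you can see the path from the source vertex to the destination vertex. You can collect the intermediate vertices into a list and then expand them into columns.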
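As a rough illustration of that last step, the sketch below takes the vertex ids along one path as an ordered list and turns every suffix of it into a row of descriptions, padded with None. It is plain Python rather than GraphFrames code. The `path` list is copied by hand from the `from`, `v1` ... `v5`, `to` columns of the BFS output, and `descriptions` and `path_to_rows` are hypothetical names. It relies on all services lying on a single path, which holds for this sample data but is an assumption in general.

```python
# Vertex ids along the BFS path from 's1' to the 'Leaf' sentinel, in order
path = ["s1", "s2", "s4", "s3", "s5", "s6", "Leaf"]

# service id -> description, taken from the input data
descriptions = {
    "s1": "service 1",
    "s2": "service 2",
    "s3": "service 3",
    "s4": "service 4",
    "s5": "service 5",
    "s6": "service 6",
}


def path_to_rows(path, descriptions, sentinel="Leaf"):
    # Drop the sentinel so only real services remain
    chain = [v for v in path if v != sentinel]
    width = len(chain) - 1              # number of parent columns
    rows = []
    for i, vertex in enumerate(chain[:-1]):   # the last service has no parent
        ancestors = [descriptions[v] for v in chain[i + 1:]]
        padding = [None] * (width - len(ancestors))
        rows.append([descriptions[vertex]] + ancestors + padding)
    return rows


wide = path_to_rows(path, descriptions)
for row in wide:
    print(row)
```

Each row has the layout description, parent, parent1, and so on. The rows come out in path order (s1, s2, s4, s3, s5) rather than in the order of the input data.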

Output:

    +-----------+-------+------+
    |description|service|parent|
    +-----------+-------+------+
    |  service 1|     s1|    s2|
    |  service 2|     s2|    s4|
    |  service 3|     s3|    s5|
    |  service 5|     s5|    s6|
    |  service 4|     s4|    s3|
    |  service 6|     s6|  Leaf|
    +-----------+-------+------+

    All vertices list
    +----+
    |id  |
    +----+
    |s6  |
    |s5  |
    |s4  |
    |s2  |
    |s3  |
    |s1  |
    |Leaf|
    +----+

    +---+----+
    |src|dst |
    +---+----+
    |s1 |s2  |
    |s2 |s4  |
    |s3 |s5  |
    |s5 |s6  |
    |s4 |s3  |
    |s6 |Leaf|
    +---+----+

    All vertices
    +----+
    |id  |
    +----+
    |s6  |
    |s5  |
    |s4  |
    |s2  |
    |s3  |
    |s1  |
    |Leaf|
    +----+

    All relationships
    +---+----+
    |src|dst |
    +---+----+
    |s1 |s2  |
    |s2 |s4  |
    |s3 |s5  |
    |s5 |s6  |
    |s4 |s3  |
    |s6 |Leaf|
    +---+----+

    +----+--------+----+--------+----+--------+----+--------+----+--------+----+----------+------+
    |from|      e0|  v1|      e1|  v2|      e2|  v3|      e3|  v4|      e4|  v5|        e5|    to|
    +----+--------+----+--------+----+--------+----+--------+----+--------+----+----------+------+
    |{s1}|{s1, s2}|{s2}|{s2, s4}|{s4}|{s4, s3}|{s3}|{s3, s5}|{s5}|{s5, s6}|{s6}|{s6, Leaf}|{Leaf}|
    +----+--------+----+--------+----+--------+----+--------+----+--------+----+----------+------+

    Connected components
    +----+---------+
    |  id|component|
    +----+---------+
    |  s6|        0|
    |  s5|        0|
    |  s4|        0|
    |  s2|        0|
    |  s3|        0|
    |  s1|        0|
    |Leaf|        0|
    +----+---------+
