这里是一个Spark Graphframes df表示一个有向图,这个图中可能有一些圈。如何检测Graphframe中的循环?例如,这里有一个图表
| src | dst || --- | --- || 1 | 2 || 2 | 3 || 3 | 4 || 3 | 1 || 4 | 1 |
| src | dst |
| --- | --- |
| 1 | 2 |
| 2 | 3 |
| 3 | 4 |
| 3 | 1 |
| 4 | 1 |
字符串这个图中的循环应该是{1,2,3}和{1,2,3,4}。
flseospp1#
你可以使用BFS算法来寻找图中的圈
yqkkidmi2#
我对“Spark图文框”一无所知希望您会发现它很有用,下面是如何修改BFS来查找周期:在标准的BFS算法中,代码会跟踪哪些顶点以前被访问过。当搜索当前顶点的可达邻居时,跳过先前访问过的顶点。在为寻找循环而修改的BFS中,遇到先前访问过的顶点可能会发生,因为存在循环。为了检查这一点,应用Dijsktra来查找从当前顶点开始,经过图的其余部分,并返回到先前访问过的顶点的最短路径。如果这样的路径存在,那么它就是一个循环。下面是一个例子
的数据这是算法的基本要素,但需要处理一些重要的细节:
也许一个实现这一点的C++代码的链接会有帮助?
7y4bm7vi3#
虽然GraphFrames本身可能无法完全为您的任务提供开箱即用的必要功能,但将其与NetworkX和PandasUDF结合使用被证明是一种有效的解决方案。首先,让我们来研究一下NetworkX的功能,尤其是与您的示例相关的功能。示例图的绘制:
的数据Json的算法是NetworkX中simple_cycles函数的基础,与其他基于DFS修改的算法(source)相比,它具有更好的时间复杂度。在NetworkX中查找循环的代码:
import pandas as pdimport networkx as nxdf_edges = pd.DataFrame({ 'src': [1, 2, 3, 3, 4], 'dst': [2, 3, 4, 1, 1]})# Create a directed graph from the dataframeG = nx.from_pandas_edgelist(df_edges, source='src', target='dst', create_using=nx.DiGraph())# Find cyclescycles = list(nx.simple_cycles(G))print(cycles) #output: [[1, 2, 3], [1, 2, 3, 4]]
import pandas as pd
import networkx as nx
df_edges = pd.DataFrame({
'src': [1, 2, 3, 3, 4],
'dst': [2, 3, 4, 1, 1]
})
# Create a directed graph from the dataframe
G = nx.from_pandas_edgelist(df_edges, source='src', target='dst', create_using=nx.DiGraph())
# Find cycles
cycles = list(nx.simple_cycles(G))
print(cycles) #output: [[1, 2, 3], [1, 2, 3, 4]]
字符串NetworkX函数simple_cycles显然提供了所需的功能。然而,考虑到潜在的可扩展性问题和在Spark生态系统中运行的需求,寻求以并行方式运行的解决方案是有益的。这就是PandasUDF(矢量化UDF)实用程序的亮点所在。为了制定一个可扩展和可推广的解决方案,我们的第一步是执行连接组件操作。GraphFrames方便地提供了这种功能,如下所示:
from graphframes import *g = GraphFrame(df_edges) result = g.connectedComponents()
from graphframes import *
g = GraphFrame(df_edges)
result = g.connectedComponents()
型从connected components函数获得输出后(通常为[node,component id]格式),您可以使用此component id扩展原始的edge DataFrame。这将导致Spark DataFrame结构化为[src,dst,component]。为了简洁起见,我将在示例的后续步骤中手动生成这样一个Spark DataFrame。为了说明循环查找函数在不同连接组件上的并行化能力,我还将一个附加子图的边合并到边列表中。假设这是扩展的边缘列表
df_edges = pd.DataFrame({ 'src': [1, 2, 3, 3, 4,5,6,7], 'dst': [2, 3, 4, 1, 1,6,7,5], 'component' : [1,1,1,1,1,2,2,2]})from pyspark.sql import SparkSessionspark = SparkSession.builder.getOrCreate()# Convert pandas example DataFrame to Spark DataFrame# this is in place of the processed output # derived from both the original DataFrame # and the connected components output.spark_df_edges = spark.createDataFrame(df_edges)
'src': [1, 2, 3, 3, 4,5,6,7],
'dst': [2, 3, 4, 1, 1,6,7,5],
'component' : [1,1,1,1,1,2,2,2]
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Convert pandas example DataFrame to Spark DataFrame
# this is in place of the processed output
# derived from both the original DataFrame
# and the connected components output.
spark_df_edges = spark.createDataFrame(df_edges)
型以下是扩展图的可视化,由两个不同的连接组件组成:
的这就是使用.show()时扩展的边列表(现在与组件ID集成)的显示方式
+---+---+---------+|src|dst|component|+---+---+---------+| 1| 2| 1|| 2| 3| 1|| 3| 4| 1|| 3| 1| 1|| 4| 1| 1|| 5| 6| 2|| 6| 7| 2|| 7| 5| 2|+---+---+---------+
+---+---+---------+
|src|dst|component|
| 1| 2| 1|
| 2| 3| 1|
| 3| 4| 1|
| 3| 1| 1|
| 4| 1| 1|
| 5| 6| 2|
| 6| 7| 2|
| 7| 5| 2|
型接下来,我们定义一个Pandas UDF,它可以应用于每组连接的组件。除了查找循环之外,该函数还旨在返回有用的信息,例如找到的循环计数和构成每个组件的每个循环的边列表:
from pyspark.sql.functions import pandas_udf, PandasUDFTypefrom pyspark.sql.types import StructType, StructField, IntegerType,StringTypeimport jsonschema = StructType([ StructField('component', IntegerType()), StructField('no_of_cycles', IntegerType()), StructField('cyclelist', StringType())])@pandas_udf(schema, PandasUDFType.GROUPED_MAP)def find_cycles(pdf): G = nx.from_pandas_edgelist(pdf, source='src', target='dst', create_using=nx.DiGraph()) cycles = list(nx.simple_cycles(G)) cyclelist = json.dumps(cycles) num_cycles = len(cycles) return pd.DataFrame({'component': [pdf['component'].iloc[0]], 'no_of_cycles': [num_cycles], 'cyclelist': [cyclelist]})
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, IntegerType,StringType
import json
schema = StructType([
StructField('component', IntegerType()),
StructField('no_of_cycles', IntegerType()),
StructField('cyclelist', StringType())
])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def find_cycles(pdf):
G = nx.from_pandas_edgelist(pdf, source='src', target='dst', create_using=nx.DiGraph())
cyclelist = json.dumps(cycles)
num_cycles = len(cycles)
return pd.DataFrame({'component': [pdf['component'].iloc[0]],
'no_of_cycles': [num_cycles],
'cyclelist': [cyclelist]})
型现在定义了Pandas UDF,我们继续将此函数应用于每个单独的连接组件,如下所示:
cycles=spark_df_edges.groupby('component').apply(find_cycles).show(truncate=False)
型结果cycles dataframe看起来像这样:
+---------+------------+-------------------------+|component|no_of_cycles|cyclelist |+---------+------------+-------------------------+|1 |2 |[[1, 2, 3], [1, 2, 3, 4]]||2 |1 |[[5, 6, 7]] |+---------+------------+-------------------------+
+---------+------------+-------------------------+
|component|no_of_cycles|cyclelist |
|1 |2 |[[1, 2, 3], [1, 2, 3, 4]]|
|2 |1 |[[5, 6, 7]] |
型最后,我们可以连接两个DataFrame:
from pyspark.sql.functions import broadcastjoined_df = spark_df_edges.join(broadcast(cycles), on='component', how='inner')joined_df.show(truncate=False)
from pyspark.sql.functions import broadcast
joined_df = spark_df_edges.join(broadcast(cycles), on='component', how='inner')
joined_df.show(truncate=False)
型结果是
+---------+---+---+------------+-------------------------+|component|src|dst|no_of_cycles|cyclelist |+---------+---+---+------------+-------------------------+|1 |1 |2 |2 |[[1, 2, 3], [1, 2, 3, 4]]||1 |2 |3 |2 |[[1, 2, 3], [1, 2, 3, 4]]||1 |3 |4 |2 |[[1, 2, 3], [1, 2, 3, 4]]||1 |3 |1 |2 |[[1, 2, 3], [1, 2, 3, 4]]||1 |4 |1 |2 |[[1, 2, 3], [1, 2, 3, 4]]||2 |5 |6 |1 |[[5, 6, 7]] ||2 |6 |7 |1 |[[5, 6, 7]] ||2 |7 |5 |1 |[[5, 6, 7]] |+---------+---+---+------------+-------------------------+
+---------+---+---+------------+-------------------------+
|component|src|dst|no_of_cycles|cyclelist |
|1 |1 |2 |2 |[[1, 2, 3], [1, 2, 3, 4]]|
|1 |2 |3 |2 |[[1, 2, 3], [1, 2, 3, 4]]|
|1 |3 |4 |2 |[[1, 2, 3], [1, 2, 3, 4]]|
|1 |3 |1 |2 |[[1, 2, 3], [1, 2, 3, 4]]|
|1 |4 |1 |2 |[[1, 2, 3], [1, 2, 3, 4]]|
|2 |5 |6 |1 |[[5, 6, 7]] |
|2 |6 |7 |1 |[[5, 6, 7]] |
|2 |7 |5 |1 |[[5, 6, 7]] |
型注意,我们可以在这里使用broadcast,因为循环 Dataframe 中的行数是连接组件的数量,通常比边缘列表中的行数小得多。broadcast函数告诉Spark将较小的DataFrame广播到所有工作节点,如果一个DataFrame比另一个小得多,这可以加快join操作。
broadcast
3条答案
按热度按时间flseospp1#
你可以使用BFS算法来寻找图中的圈
yqkkidmi2#
我对“Spark图文框”一无所知
希望您会发现它很有用,下面是如何修改BFS来查找周期:
在标准的BFS算法中,代码会跟踪哪些顶点以前被访问过。当搜索当前顶点的可达邻居时,跳过先前访问过的顶点。
在为寻找循环而修改的BFS中,遇到先前访问过的顶点可能会发生,因为存在循环。
为了检查这一点,应用Dijsktra来查找从当前顶点开始,经过图的其余部分,并返回到先前访问过的顶点的最短路径。如果这样的路径存在,那么它就是一个循环。
下面是一个例子
的数据
这是算法的基本要素,但需要处理一些重要的细节:
也许一个实现这一点的C++代码的链接会有帮助?
7y4bm7vi3#
虽然GraphFrames本身可能无法完全为您的任务提供开箱即用的必要功能,但将其与NetworkX和PandasUDF结合使用被证明是一种有效的解决方案。首先,让我们来研究一下NetworkX的功能,尤其是与您的示例相关的功能。
示例图的绘制:
的数据
Json的算法是NetworkX中simple_cycles函数的基础,与其他基于DFS修改的算法(source)相比,它具有更好的时间复杂度。
在NetworkX中查找循环的代码:
字符串
NetworkX函数simple_cycles显然提供了所需的功能。然而,考虑到潜在的可扩展性问题和在Spark生态系统中运行的需求,寻求以并行方式运行的解决方案是有益的。这就是PandasUDF(矢量化UDF)实用程序的亮点所在。为了制定一个可扩展和可推广的解决方案,我们的第一步是执行连接组件操作。GraphFrames方便地提供了这种功能,如下所示:
型
从connected components函数获得输出后(通常为[node,component id]格式),您可以使用此component id扩展原始的edge DataFrame。这将导致Spark DataFrame结构化为[src,dst,component]。
为了简洁起见,我将在示例的后续步骤中手动生成这样一个Spark DataFrame。为了说明循环查找函数在不同连接组件上的并行化能力,我还将一个附加子图的边合并到边列表中。
假设这是扩展的边缘列表
型
以下是扩展图的可视化,由两个不同的连接组件组成:
的
这就是使用.show()时扩展的边列表(现在与组件ID集成)的显示方式
型
接下来,我们定义一个Pandas UDF,它可以应用于每组连接的组件。除了查找循环之外,该函数还旨在返回有用的信息,例如找到的循环计数和构成每个组件的每个循环的边列表:
型
现在定义了Pandas UDF,我们继续将此函数应用于每个单独的连接组件,如下所示:
型
结果cycles dataframe看起来像这样:
型
最后,我们可以连接两个DataFrame:
型
结果是
型
注意,我们可以在这里使用
broadcast
,因为循环 Dataframe 中的行数是连接组件的数量,通常比边缘列表中的行数小得多。broadcast函数告诉Spark将较小的DataFrame广播到所有工作节点,如果一个DataFrame比另一个小得多,这可以加快join操作。