python—使用pyspark从关系数据集构建层次结构

wgx48brx  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(528)

我是python新手,一直致力于从关系数据集构建层次结构。
如果有人对如何进行这项工作有想法,那将大有帮助。
我有一个关系数据集

_currentnode,  childnode_  
 root,         child1  
 child1,       leaf2  
 child1,       child3  
 child1,       leaf4  
 child3,       leaf5  
 child3,       leaf6

以此类推。我正在寻找一些python或pyspark代码
构建一个层次数据框架,如下所示

_level1, level2,  level3,  level4_  
root,    child1,  leaf2,   null  
root,    child1,  child3,  leaf5  
root,    child1,  child3,  leaf6  
root,    child1,  leaf4,   null

数据是字母数字,是一个巨大的数据集[~50mil记录]。
此外,层次结构的根是已知的,可以在代码中硬连接。
因此,在上面的示例中,层次结构的根是“root”。

lb3vh1jj

lb3vh1jj1#

pyspark最短路径

输入数据可以解释为一个图形,其中 currentnode 以及 childnode . 然后问题是根节点和所有叶节点之间的最短路径是什么,称为单源最短路径。
spark有graphx来处理图形的并行计算。不幸的是,graphx没有提供pythonapi(更多细节可以在这里找到)。graphframes是一个支持python的图形库。graphframes使用graphx的一部分。
graphx和graphframes都为sssp提供了解决方案。不幸的是,两个实现都只返回最短路径的长度,而不是路径本身(graphx和graphframes)。但是这个答案提供了graphx和scala算法的实现,该算法也返回路径。所有三种解决方案都使用pregel。
将上述答案转换为graphframes/python:

1. 数据准备

为所有节点提供唯一的ID,并更改列名,使其适合此处描述的名称

df = ...

vertices = df.select("currentnode").withColumnRenamed("currentnode", "node").union(df.select("childnode")).distinct().withColumn("id", F.monotonically_increasing_id()).cache()

edges = df.join(vertices, df.currentnode == vertices.node).drop(F.col("node")).withColumnRenamed("id", "src")\
        .join(vertices, df.childnode== vertices.node).drop(F.col("node")).withColumnRenamed("id", "dst").cache()
Nodes                   Edges
+------+------------+   +-----------+---------+------------+------------+
|  node|          id|   |currentnode|childnode|         src|         dst|
+------+------------+   +-----------+---------+------------+------------+
| leaf2| 17179869184|   |     child1|    leaf4| 25769803776|249108103168|
|child1| 25769803776|   |     child1|   child3| 25769803776| 68719476736|
|child3| 68719476736|   |     child1|    leaf2| 25769803776| 17179869184|
| leaf6|103079215104|   |     child3|    leaf6| 68719476736|103079215104|
|  root|171798691840|   |     child3|    leaf5| 68719476736|214748364800|
| leaf5|214748364800|   |       root|   child1|171798691840| 25769803776|
| leaf4|249108103168|   +-----------+---------+------------+------------+
+------+------------+

2. 创建graphframe

from graphframes import GraphFrame
graph = GraphFrame(vertices, edges)

3. 创建将构成pregel算法单个部分的udf

消息类型:

from pyspark.sql.types import *
vertColSchema = StructType()\
      .add("dist", DoubleType())\
      .add("node", StringType())\
      .add("path", ArrayType(StringType(), True))

顶点程序:

def vertexProgram(vd, msg):
    if msg == None or vd.__getitem__(0) < msg.__getitem__(0):
        return (vd.__getitem__(0), vd.__getitem__(1), vd.__getitem__(2))
    else:
        return (msg.__getitem__(0), vd.__getitem__(1), msg.__getitem__(2))
vertexProgramUdf = F.udf(vertexProgram, vertColSchema)

传出消息:

def sendMsgToDst(src, dst):
    srcDist = src.__getitem__(0)
    dstDist = dst.__getitem__(0)
    if srcDist < (dstDist - 1):
        return (srcDist + 1, src.__getitem__(1), src.__getitem__(2) + [dst.__getitem__(1)])
    else:
        return None
sendMsgToDstUdf = F.udf(sendMsgToDst, vertColSchema)

消息聚合:

def aggMsgs(agg):
    shortest_dist = sorted(agg, key=lambda tup: tup[1])[0]
    return (shortest_dist.__getitem__(0), shortest_dist.__getitem__(1), shortest_dist.__getitem__(2))
aggMsgsUdf = F.udf(aggMsgs, vertColSchema)

4. 组合部件

from graphframes.lib import Pregel
result = graph.pregel.withVertexColumn(colName = "vertCol", \
    initialExpr = F.when(F.col("node")==(F.lit("root")), F.struct(F.lit(0.0), F.col("node"), F.array(F.col("node")))) \
    .otherwise(F.struct(F.lit(float("inf")), F.col("node"), F.array(F.lit("")))).cast(vertColSchema), \
    updateAfterAggMsgsExpr = vertexProgramUdf(F.col("vertCol"), Pregel.msg())) \
    .sendMsgToDst(sendMsgToDstUdf(F.col("src.vertCol"), Pregel.dst("vertCol"))) \
    .aggMsgs(aggMsgsUdf(F.collect_list(Pregel.msg()))) \
    .setMaxIter(10) \
    .setCheckpointInterval(2) \
    .run()
result.select("vertCol.path").show(truncate=False)

评论: maxIter 应设置为至少与最长路径一样大的值。如果该值较高,结果将保持不变,但计算时间会变长。如果该值太小,则结果中将缺少较长的路径。当前版本的graphframes(0.8.0)不支持在不再发送新消息时停止循环。 checkpointInterval 应设置为小于 maxIter . 实际值取决于数据和可用硬件。当发生outofmemory异常或spark会话挂起一段时间时,该值可能会减小。
最终的结果是一个包含内容的规则Dataframe

+-----------------------------+
|path                         |
+-----------------------------+
|[root, child1]               |
|[root, child1, leaf4]        |
|[root, child1, child3]       |
|[root]                       |
|[root, child1, child3, leaf6]|
|[root, child1, child3, leaf5]|
|[root, child1, leaf2]        |
+-----------------------------+

如果需要,可以在这里过滤出非叶节点。

相关问题