pyspark RDD创建邻接表

gab6jxml  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(114)

我试图创建与输入adj列表

RDD=["2\t{'3': 1}",
     "3\t{'2': 2}",
     "4\t{'1': 1, '2': 1}",
     "5\t{'4': 3, '2': 1, '6': 1}",
     "6\t{'2': 1, '5': 2}",
     "7\t{'2': 1, '5': 1}",
     "8\t{'2': 1, '5': 1}",
     "9\t{'2': 1, '5': 1}",
     "10\t{'5': 1}",
     "11\t{'5': 2}"]

字符串
expectation is adj list应该为任何悬空节点创建一个新记录,并将其设置为边(neighbors)是一个空的数组,每个节点的秩为1/N,我正在编写一个spark作业来读取原始数据,但我需要初始化一个邻接列表表示,每个节点都有一个记录(包括悬空节点)。返回:RDD -一对RDD(node_id,(score,edges))我还没有完全完成,但我正在尝试做以下事情。

adj_list = ()
ad = sc.broadcast(adj_list)
# write any helper functions here
def parse(line):
    node, edges = line.split('\t')
    print(f'node: {node} edges: {edges}')
    for key, value in ast.literal_eval(edges):
        yield (node, key, value)
    
RDD = dataRDD.flatMap(parse) \
             .reduceByKey(lambda x, y: (x[1]+y[1], x[0],y[0]))


1.我得到错误ValueError:没有足够的值来解包(预期2,得到1)当我做收集。我环顾四周,不知道发生了什么事。
1.我的方向正确吗?想法是将分数相加,然后在最后除以总节点数
1.顺便说一下,我需要返回rdd与预期的返回数据感谢指导我的逻辑。谢谢

carvr3hs

carvr3hs1#

这是类似的。

from pyspark.sql.types import *

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

from pyspark import SQLContext
from pyspark.sql.functions import *
import pyspark.sql.functions as F
import ast

sc = SparkContext('local')
sqlContext = SQLContext(sc)

RDD=["2\t{'3': 1}",
     "3\t{'2': 2}",
     "4\t{'1': 1, '2': 1}",
     "5\t{'4': 3, '2': 1, '6': 1}",
     "6\t{'2': 1, '5': 2}",
     "7\t{'2': 1, '5': 1}",
     "8\t{'2': 1, '5': 1}",
     "9\t{'2': 1, '5': 1}",
     "10\t{'5': 1}",
     "11\t{'5': 2}"]


columns =["raw_string"]

RDD = [Row(ele) for ele in RDD]
rdd1 = sc.parallelize(RDD)
df1 = sqlContext.createDataFrame(data=rdd1, schema=columns)

print("Given dataframe")
df1.show(n=20, truncate=False)

df2 = df1.withColumn('id', split(df1['raw_string'], '\t').getItem(0)) \
       .withColumn('mapping_string', split(df1['raw_string'], '\t').getItem(1)) \

print("df2 dataframe")
df2.show(n=20, truncate=False)

schema = MapType(StringType(),LongType())

def rowudf(mapping_str):
    print("mapping_str", mapping_str)
    print("type of mapping_str", type(mapping_str))
    given_dict = ast.literal_eval(mapping_str)
    print("converted dictionary", given_dict)
    print("keys here", given_dict.keys())
    print("values here", given_dict.values())
    print("type of dictionary", type(given_dict))
    return given_dict

udfValue = udf(rowudf, schema)
intermediate_df = df2.withColumn("mapping_dict", udfValue(F.col("mapping_string")))

print("intermediate_df dataframe")
intermediate_df.show(n=20, truncate=False)

字符串
输出量:

Given dataframe
+---------------------------+
|raw_string                 |
+---------------------------+
|2\t{'3': 1}                |
|3\t{'2': 2}                |
|4\t{'1': 1, '2': 1}        |
|5\t{'4': 3, '2': 1, '6': 1}|
|6\t{'2': 1, '5': 2}        |
|7\t{'2': 1, '5': 1}        |
|8\t{'2': 1, '5': 1}        |
|9\t{'2': 1, '5': 1}        |
|10\t{'5': 1}               |
|11\t{'5': 2}               |
+---------------------------+

df2 dataframe
+---------------------------+---+------------------------+
|raw_string                 |id |mapping_string          |
+---------------------------+---+------------------------+
|2\t{'3': 1}                |2  |{'3': 1}                |
|3\t{'2': 2}                |3  |{'2': 2}                |
|4\t{'1': 1, '2': 1}        |4  |{'1': 1, '2': 1}        |
|5\t{'4': 3, '2': 1, '6': 1}|5  |{'4': 3, '2': 1, '6': 1}|
|6\t{'2': 1, '5': 2}        |6  |{'2': 1, '5': 2}        |
|7\t{'2': 1, '5': 1}        |7  |{'2': 1, '5': 1}        |
|8\t{'2': 1, '5': 1}        |8  |{'2': 1, '5': 1}        |
|9\t{'2': 1, '5': 1}        |9  |{'2': 1, '5': 1}        |
|10\t{'5': 1}               |10 |{'5': 1}                |
|11\t{'5': 2}               |11 |{'5': 2}                |
+---------------------------+---+------------------------+

intermediate_df dataframe
+---------------------------+---+------------------------+------------------------+
|raw_string                 |id |mapping_string          |mapping_dict            |
+---------------------------+---+------------------------+------------------------+
|2\t{'3': 1}                |2  |{'3': 1}                |{3 -> 1}                |
|3\t{'2': 2}                |3  |{'2': 2}                |{2 -> 2}                |
|4\t{'1': 1, '2': 1}        |4  |{'1': 1, '2': 1}        |{1 -> 1, 2 -> 1}        |
|5\t{'4': 3, '2': 1, '6': 1}|5  |{'4': 3, '2': 1, '6': 1}|{2 -> 1, 4 -> 3, 6 -> 1}|
|6\t{'2': 1, '5': 2}        |6  |{'2': 1, '5': 2}        |{5 -> 2, 2 -> 1}        |
|7\t{'2': 1, '5': 1}        |7  |{'2': 1, '5': 1}        |{5 -> 1, 2 -> 1}        |
|8\t{'2': 1, '5': 1}        |8  |{'2': 1, '5': 1}        |{5 -> 1, 2 -> 1}        |
|9\t{'2': 1, '5': 1}        |9  |{'2': 1, '5': 1}        |{5 -> 1, 2 -> 1}        |
|10\t{'5': 1}               |10 |{'5': 1}                |{5 -> 1}                |
|11\t{'5': 2}               |11 |{'5': 2}                |{5 -> 2}                |
+---------------------------+---+------------------------+------------------------+

相关问题