我试图创建与输入adj列表
RDD=["2\t{'3': 1}",
"3\t{'2': 2}",
"4\t{'1': 1, '2': 1}",
"5\t{'4': 3, '2': 1, '6': 1}",
"6\t{'2': 1, '5': 2}",
"7\t{'2': 1, '5': 1}",
"8\t{'2': 1, '5': 1}",
"9\t{'2': 1, '5': 1}",
"10\t{'5': 1}",
"11\t{'5': 2}"]
字符串
expectation is adj list应该为任何悬空节点创建一个新记录,并将其设置为边(neighbors)是一个空的数组,每个节点的秩为1/N,我正在编写一个spark作业来读取原始数据,但我需要初始化一个邻接列表表示,每个节点都有一个记录(包括悬空节点)。返回:RDD -一对RDD(node_id,(score,edges))我还没有完全完成,但我正在尝试做以下事情。
adj_list = ()
ad = sc.broadcast(adj_list)
# write any helper functions here
def parse(line):
node, edges = line.split('\t')
print(f'node: {node} edges: {edges}')
for key, value in ast.literal_eval(edges):
yield (node, key, value)
RDD = dataRDD.flatMap(parse) \
.reduceByKey(lambda x, y: (x[1]+y[1], x[0],y[0]))
型
1.我得到错误ValueError:没有足够的值来解包(预期2,得到1)当我做收集。我环顾四周,不知道发生了什么事。
1.我的方向正确吗?想法是将分数相加,然后在最后除以总节点数
1.顺便说一下,我需要返回rdd与预期的返回数据感谢指导我的逻辑。谢谢
1条答案
按热度按时间carvr3hs1#
这是类似的。
字符串
输出量:
型