我试图实现一个简单的消息传递系统,以了解工作原理。我的目标是将每个顶点的名称发送给父节点。所需的实现是将先前实现创建的“iterationmsg”的现有数组合并到具有这些值的新数组中。我已经使用aggregatemessages函数实现了这一点,它使用了与信念传播相同的逻辑,但是我愿意使用PregelAPI来实现这一点。如何在“iterationmsg中的现有数组”处引用“iterationmsg”中的数组?
v = sqlc.createDataFrame([
(0, "Anna", 24),
(1, "Bob", 26),
(2, "Charlie", 27),
(3, "David", 27),
(4, "Eric", 26),
], ["id", "name", "age"])
# Edge DataFrame
e = sqlc.createDataFrame([
(0, 0, 1, "friend"),
(1, 1, 2, "friend"),
(2, 2, 1, "friend"),
(3, 1, 1, "loves"),
(4, 2, 0, "pays"),
(5, 0, 3, "boss"),
(6, 3, 4, "boss"),
(7, 0, 4, "boss")
], ["id", "src", "dst", "label"])
g = GraphFrame(v.withColumn('state', f.lit(True)).withColumn('iterationMsg', f.array()), e)
NR_ITER = 2
msgpassing = g.pregel \
.setMaxIter(NR_ITER) \
.withVertexColumn("iterationMsg", f.array(),
f.array_union('existing array in iterationMsg', Pregel.msg())) \
.sendMsgToSrc(f.when(Pregel.dst('state'),
f.array_union(f.array(Pregel.dst('name')), Pregel.dst('iterationMsg')))) \
.aggMsgs(f.array(Pregel.msg())) \
.run()
暂无答案!
目前还没有任何答案,快来回答吧!