为什么接收消息不会重新激活graphx中的顶点?

tv6aics1  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(215)

我试图用graphx计算一个图中来自一个源的边标签“comment”和“likedby”的数量。在第一步中,所有顶点都处于活动状态,然后我控制计算,并使用存储在每个顶点中的附加值发送消息,并使用消息进行更新。在第一次迭代中,只有相关的顶点计算并发送消息。与消息一起传播的第二个值是一个布尔值,它指示使用的边是用“comment”还是“likedby”标记的。

val fstId = ... // Considered as given
var nbComment, nbLike = 0L

sn.mapVertices((_, v) =>  (fstId, v)).pregel((fstId, false))(vprog, sendMsg, mergeMsg)

def vprog(id: Long, value: (Long, String), merged_msg: (Long, Boolean)) = {
  println("activate " + id)
  if (merged_msg != (fstId, false))
    println("received message: " + merged_msg )

  if (merged_msg._1 == id || merged_msg._1 < 0)
    if (merged_msg._2) nbLike += 1L
    else nbComment += 1L
  (merged_msg._1, value._2)
}

def sendMsg(triplet: EdgeTriplet[(Long, String), String]) = {
  if (triplet.srcId == fstId || triplet.srcAttr._1 == -1L)
    if (triplet.attr == "comment"){
      println((-1L, false) + " has been sent to " + triplet.dstId )
      Iterator((triplet.dstId, (-1L, false)))
    }
    else if (triplet.attr == "likedBy") {
      println((-1L, true) + " has been sent to " + triplet.dstId )
      Iterator((triplet.dstId, (-1L, true)))
    }
  Iterator.empty
}

def mergeMsg(m1: (Long, Boolean), m2: (Long, Boolean)) = m1

理论上,顶点每次迭代只能接收一条消息。这个 mergeMsg 函数的定义只是为了尊重 pregel 签名。
我的问题是:邮件是正确发送的,但从来没有收到收件人和我不知道为什么。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题