spark with scala:通过对每个可能的表对执行函数来计算表

oalqel3c  于 2021-05-18  发布在  Spark
关注(0)|答案(1)|浏览(564)

我对scala/spark完全陌生,我正在尝试从头开始创建一个spark应用程序,该应用程序计算 n 整数集(回答这个问题不需要知道是什么)。
我有一个Dataframe,其中每一行都是一组整数,例如:

var sets = List(Set(1, 5, 7, 4), Set(3, 5, 0), Set(10, 1, 5)).toDF

和一个函数 jacsim(s1, s2) 返回两个集合之间的相似性。我想定义一个函数 sets dataframe返回另一个dataframe,该dataframe在位置(i,j)处包含 jacsim(sets(i), sets(j)) . 我该怎么做?
另外:将结果Dataframe用作表的想法是否愚蠢?我读到spark并不“喜欢”索引访问的行,因为这阻碍了并行性。我是否应该返回一个Dataframe,其中包含一行和每个可能的对作为一个新列?

lstz6jyr

lstz6jyr1#

正如您提到的,不允许使用索引访问sparkDataframe。下面是一个使用scala sparkDataframe的解决方案:

var sets = List(Set(1, 5, 7, 4), Set(3, 5, 0), Set(10, 1, 5)).toDF("sets")
    .withColumn("i",monotonically_increasing_id()) // to create indexes

val jaccardSimUDF = udf((set1: Seq[Int], set2: Seq[Int]) => set1.sum +  set2.sum) // dummy function, replace it with your implementation of Jaccard similarity

val resDF = sets.crossJoin(sets.withColumnRenamed("sets", "sets2").withColumnRenamed("i", "j"))
                .withColumn("jaccardSim", jaccardSimUDF($"sets", $"sets2"))

基本上,我们需要对Dataframe本身进行交叉连接,以获得所有的组合。然后我们可以应用“用户定义函数”(udf)来计算jaccard相似性。注意,为了方便起见,我创建了索引。
现在,如果你真的想有一个矩阵,你将需要重塑这个Dataframe,但这不是Spark的本质。
正如注解中所指出的,jaccard相似性函数是对称的,因此可以过滤不必要的索引,如下所示:

val resDF = sets.crossJoin(sets.withColumnRenamed("sets", "sets2").withColumnRenamed("i", "j"))
            .filter($"i" < $"j")
            .withColumn("jaccardSim", jaccardSimUDF($"sets", $"sets2"))

它可能看起来很难看,因为它仍然涉及一个完整的交叉连接,但由于spark依赖于懒惰的计算和CatalystOptimizer,它实际上并不是一个完整的交叉连接。所以我不认为有更好的解决办法。

相关问题