我有一个问题,我正试图解决使用Spark。我是相当新的Spark,所以我不知道什么是最好的方式来设计它。
输入:
group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5
我想找出每对用户之间的相互组数。所以对于上面的输入,我期望的输出是:
输出:
1st user || 2nd user || mutual/intersection count || union count
------------------------------------------------------------
user1 user2 2 7
user1 user3 1 6
user1 user4 1 9
user2 user4 3 8
我认为有几种方法可以解决这个问题,其中一种方法可以是:
创建一个key,value对,其中key是user,value是group
按键分组,然后我们将有一个用户所属组的列表
然后找到两组之间的交集/并集
例子:
(1st stage): Map
group1=user1,user2 ==>
user1, group1
user2, group1
group2=user1,user2,user3 ==>
user1, group2
user2, group2
user3, group2
....
....
....
(2nd stage): Reduce by key
user1 -> group1, group2, group4, group8
user2 -> group1, group2, group3, group7, group9
但我的问题是,在按键减少计数后,用我想要的方式表示计数的最佳方式是什么?
有没有更好的办法来处理这个问题?用户的最大数量是恒定的,不会超过5000,所以这是它将创建的最大密钥数。但是输入可能包含接近1b行的几行。我不认为那会是个问题,如果我错了,请纠正我。
更新:
这是我用我对spark的一点了解来解决这个问题的一段代码(上个月刚开始学习spark):
def createPair(line: String): Array[(String, String)] = {
val splits = line.split("=")
val kuid = splits(0)
splits(1).split(",").map { segment => (segment, kuid) }
}
val input = sc.textFile("input/test.log")
val pair = input.flatMap { line => createPair(line) }
val pairListDF = pair
.aggregateByKey(scala.collection.mutable.ListBuffer.empty[String])(
(kuidList, kuid) => { kuidList += kuid; kuidList },
(kuidList1, kuidList2) => { kuidList1.appendAll(kuidList2); kuidList1 })
.mapValues(_.toList).toDF().select($"_1".alias("user"), $"_2".alias("groups"))
pairListDF.registerTempTable("table")
sqlContext.udf.register("intersectCount", (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.intersect(list2).size)
sqlContext.udf.register("unionCount", (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.union(list2).distinct.size)
val populationDF = sqlContext.sql("SELECT t1.user AS user_first,"
+ "t2.user AS user_second,"
+ "intersectCount(t1.groups, t2.groups) AS intersect_count,"
+ "unionCount(t1.groups, t2.groups) AS union_count"
+ " FROM table t1 INNER JOIN table t2"
+ " ON t1.user < t2.user"
+ " ORDER BY user_first,user_second")
输出:
+----------+-----------+---------------+-----------+
|user_first|user_second|intersect_count|union_count|
+----------+-----------+---------------+-----------+
| user1| user2| 2| 7|
| user1| user3| 1| 6|
| user1| user4| 1| 9|
| user1| user5| 1| 8|
| user2| user3| 1| 7|
| user2| user4| 3| 8|
| user2| user5| 1| 9|
| user3| user4| 1| 8|
| user3| user5| 2| 6|
| user4| user5| 3| 8|
+----------+-----------+---------------+-----------+
希望得到一些关于我的代码和我丢失的东西的反馈。请随意批评我的代码,因为我刚刚开始学习Spark。再次感谢@axiom的回答,比我预期的要小得多,更好的解决方案。
1条答案
按热度按时间bfhwhh0e1#
总结:
获取配对计数,然后使用
并集(a,b)=计数(a)+计数(b)-交点(a,b)
细节:
总共有5000个用户,2500万个密钥(每对1个)应该不会太多。我们可以用
reduceByKey
计算交叉口计数。单个计数很容易
Broadcasted
在Map上。现在大家都知道:
Union(user1, user2) = count(user1) + count(user2) - Intersection(user1, user2)
.前两个计数从广播Map中读取,而我们Map成对计数的rdd。
代码:
最后:
注:
在生成对时,我对元组进行排序,因为我们不希望列表中用户的顺序有什么影响。
将用户名字符串编码为整数,可能会显著提高性能。