我正在努力改变 RDD(key,value)
至 RDD(key,iterable[value])
,与 groupByKey
方法。但作为 groupByKey
效率不高,我正在尝试使用 combineByKey
但是,在rdd上,它不起作用。使用的代码如下:
val data= List("abc,2017-10-04,15.2",
"abc,2017-10-03,19.67",
"abc,2017-10-02,19.8",
"xyz,2017-10-09,46.9",
"xyz,2017-10-08,48.4",
"xyz,2017-10-07,87.5",
"xyz,2017-10-04,83.03",
"xyz,2017-10-03,83.41",
"pqr,2017-09-30,18.18",
"pqr,2017-09-27,18.2",
"pqr,2017-09-26,19.2",
"pqr,2017-09-25,19.47",
"abc,2017-07-19,96.60",
"abc,2017-07-18,91.68",
"abc,2017-07-17,91.55")
val rdd = sc.parallelize(templines)
val rows = rdd.map(line => {
val row = line.split(",")
((row(0), row(1)), row(2))
})
// re partition and sort based key
val op = rows.repartitionAndSortWithinPartitions(new CustomPartitioner(4))
val temp = op.map(f => (f._1._1, (f._1._2, f._2)))
val mergeCombiners = (t1: (String, List[String]), t2: (String, List[String])) =>
(t1._1 + t2._1, t1._2.++(t2._2))
val mergeValue = (x: (String, List[String]), y: (String, String)) => {
val a = x._2.+:(y._2)
(x._1, a)
}
// createCombiner, mergeValue, mergeCombiners
val x = temp.combineByKey(
(t1: String, t2: String) => (t1, List(t2)),
mergeValue,
mergeCombiners)
``` `temp.combineByKey` 是给编译时错误,我不能得到它。
1条答案
按热度按时间xt0899hw1#
如果你想要一个类似于什么的输出
groupByKey
会给你的,那你绝对应该用groupByKey
而不是其他方法。这个reduceByKey
,combineByKey
等只比使用groupByKey
然后是一个聚合(给出与另一个相同的结果groupBy
方法可以给出)。因为想要的结果是
RDD[key,iterable[value]]
、自己建立列表或groupByKey
这样做会产生同样的工作量。没有必要重新实施groupByKey
你自己。问题在于groupByKey
不是它的实现,而是在分布式体系结构中。更多关于
groupByKey
对于这些类型的优化,我建议在这里多读一些。