我使用spark来计算用户评论的pagerank,但是当我在一个大数据集(40k条目)上运行代码时,我总是得到Spark java.lang.StackOverflowError
。当在少量条目上运行代码时,它工作得很好。
条目示例:
product/productId: B00004CK40 review/userId: A39IIHQF18YGZA review/profileName: C. A. M. Salas review/helpfulness: 0/0 review/score: 4.0 review/time: 1175817600 review/summary: Reliable comedy review/text: Nice script, well acted comedy, and a young Nicolette Sheridan. Cusak is in top form.
字符串
代码:
public void calculatePageRank() {
sc.clearCallSite();
sc.clearJobGroup();
JavaRDD < String > rddFileData = sc.textFile(inputFileName).cache();
sc.setCheckpointDir("pagerankCheckpoint/");
JavaRDD < String > rddMovieData = rddFileData.map(new Function < String, String > () {
@Override
public String call(String arg0) throws Exception {
String[] data = arg0.split("\t");
String movieId = data[0].split(":")[1].trim();
String userId = data[1].split(":")[1].trim();
return movieId + "\t" + userId;
}
});
JavaPairRDD<String, Iterable<String>> rddPairReviewData = rddMovieData.mapToPair(new PairFunction < String, String, String > () {
@Override
public Tuple2 < String, String > call(String arg0) throws Exception {
String[] data = arg0.split("\t");
return new Tuple2 < String, String > (data[0], data[1]);
}
}).groupByKey().cache();
JavaRDD<Iterable<String>> cartUsers = rddPairReviewData.map(f -> f._2());
List<Iterable<String>> cartUsersList = cartUsers.collect();
JavaPairRDD<String,String> finalCartesian = null;
int iterCounter = 0;
for(Iterable<String> out : cartUsersList){
JavaRDD<String> currentUsersRDD = sc.parallelize(Lists.newArrayList(out));
if(finalCartesian==null){
finalCartesian = currentUsersRDD.cartesian(currentUsersRDD);
}
else{
finalCartesian = currentUsersRDD.cartesian(currentUsersRDD).union(finalCartesian);
if(iterCounter % 20 == 0) {
finalCartesian.checkpoint();
}
}
}
JavaRDD<Tuple2<String,String>> finalCartesianToTuple = finalCartesian.map(m -> new Tuple2<String,String>(m._1(),m._2()));
finalCartesianToTuple = finalCartesianToTuple.filter(x -> x._1().compareTo(x._2())!=0);
JavaPairRDD<String, String> userIdPairs = finalCartesianToTuple.mapToPair(m -> new Tuple2<String,String>(m._1(),m._2()));
JavaRDD<String> userIdPairsString = userIdPairs.map(new Function < Tuple2<String, String>, String > () {
//Tuple2<Tuple2<MovieId, userId>, Tuple2<movieId, userId>>
@Override
public String call (Tuple2<String, String> t) throws Exception {
return t._1 + " " + t._2;
}
});
try {
//calculate pagerank using this https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
JavaPageRank.calculatePageRank(userIdPairsString, 100);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
sc.close();
}
型
5条答案
按热度按时间rur96b6h1#
我有多个建议,这将有助于你大大提高你的问题中的代码的性能。
1.**Caching:**Caching应该用于那些需要反复引用以进行相同/不同操作(迭代算法)的数据集。
一个例子是RDD。
count
-告诉你文件的行数,文件需要被读取。所以如果你写RDD。count
,此时将读取文件,对行数进行计数,并返回计数。如果你打电话给RDD。又是
count
?同样的事情:则文件将被再次读取和计数。RDD是什么cache
怎么办?现在,如果你运行RDD。count
,文件将被加载,缓存和计数。如果你叫RDD。count
第二次,操作将使用该高速缓存。它只会从该高速缓存中获取数据并对行进行计数,而不会重新计算。阅读有关缓存here的更多信息。
在您的代码示例中,您没有重用已缓存的任何内容。所以你可以从那里删除
.cache
。1.**并行化:**在代码示例中,您已经并行化了RDD中的每个元素,RDD已经是一个分布式集合。我建议你合并
rddFileData
,rddMovieData
和rddPairReviewData
步骤,以便一次性完成。删除
.collect
,因为这会将结果带回驱动程序,可能是错误的实际原因。cld4siwp2#
当你的DAG变得很大,并且代码中发生了太多级别的转换时,就会出现这个问题。当一个动作最终执行时,JVM将无法保存操作以执行延迟执行。
检查点是一种选择。我建议为这种聚合实现spark-sql。如果你的数据是结构化的,试着将其加载到dataframes中,并执行分组和其他mysql函数来实现这一点。
inn6fuwd3#
当你的for循环变得非常大时,Spark就不能再跟踪继承了。在for循环中启用检查点,每10次迭代左右检查一次rdd。检查点将解决此问题。之后不要忘记清理检查点目录。
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
bwntbbo34#
下面修复了stackoverflow错误,正如其他人指出的那样,这是因为spark不断构建的血统,特别是当你在代码中有循环/迭代时。
设置检查点目录
字符串
在迭代中修改/操作的检查点 Dataframe /Rdd
型
缓存在每次迭代中重用的数据框架
型
zpjtge225#
添加此配置:
字符串