我正在开发一个spark应用程序,它通过向边添加相邻顶点来扩展边。我正在使用map/reduce范例来处理这个过程,在这个过程中,我要划分边的总数,并在不同的工作节点中展开它们。
为此,我需要根据键值读取工作节点中的分区相邻列表。但是我在尝试在reducebykey()方法中加载文件时出错。它表示任务不可序列化。我的代码:
public class MyClass implements Serializable{
public static void main(String args[]) throws IOException {
SparkConf conf = new SparkConf().setAppName("startingSpark").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> file = sc.textFile("hdfs://localhost:9000/mainFile.txt");
... ... ... //Mapping done successfully
JavaPairRDD<String, String> rdd1 = pairs.reduceByKey(new Function2<String, String, String>() {
@Override
public String call(String v1, String v2) throws Exception {
... ... ...
JavaRDD <String> adj = sc.textFile("hdfs://localhost:9000/adjacencyList_"+key+"txt");
//Here I to expand the edges after reading the adjacency list.
}
}
但我得到一个错误任务不能序列化。原因:java.io.notserializableexception:org.apache.spark.api.java.javasparkcontext序列化堆栈:-对象不可序列化。我认为这是因为我在worker节点中使用的spark上下文与在driver程序中使用的spark上下文相同。如果我试图在reducebykey()方法中创建一个新的spark上下文,它也会给我一个错误,告诉我只有一个sparkcontext应该在这个jvm中运行。
有人能告诉我如何读取reducebykey()方法中的文件吗?有没有其他方法来完成我的任务?我希望扩展工作节点中的边,以便它们可以以分布式方式运行。
提前谢谢。
暂无答案!
目前还没有任何答案,快来回答吧!