java中读取reducebykey()方法中的文件

bvhaajcl  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(278)

我正在开发一个spark应用程序,它通过向边添加相邻顶点来扩展边。我正在使用map/reduce范例来处理这个过程,在这个过程中,我要划分边的总数,并在不同的工作节点中展开它们。
为此,我需要根据键值读取工作节点中的分区相邻列表。但是我在尝试在reducebykey()方法中加载文件时出错。它表示任务不可序列化。我的代码:

public class MyClass implements Serializable{
    public static void main(String args[]) throws IOException {     
       SparkConf conf = new SparkConf().setAppName("startingSpark").setMaster("local[*]");
       JavaSparkContext sc = new JavaSparkContext(conf);               
       JavaRDD<String> file = sc.textFile("hdfs://localhost:9000/mainFile.txt");
       ... ... ... //Mapping done successfully
       JavaPairRDD<String, String> rdd1 = pairs.reduceByKey(new Function2<String, String, String>() {
       @Override
       public String call(String v1, String v2) throws Exception {
          ... ... ...
          JavaRDD <String> adj = sc.textFile("hdfs://localhost:9000/adjacencyList_"+key+"txt");
          //Here I to expand the edges after reading the adjacency list.
        }
    }

但我得到一个错误任务不能序列化。原因:java.io.notserializableexception:org.apache.spark.api.java.javasparkcontext序列化堆栈:-对象不可序列化。我认为这是因为我在worker节点中使用的spark上下文与在driver程序中使用的spark上下文相同。如果我试图在reducebykey()方法中创建一个新的spark上下文,它也会给我一个错误,告诉我只有一个sparkcontext应该在这个jvm中运行。
有人能告诉我如何读取reducebykey()方法中的文件吗?有没有其他方法来完成我的任务?我希望扩展工作节点中的边,以便它们可以以分布式方式运行。
提前谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题