更改sparkcontext textfile方法

sczxawaw  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(273)

我在努力改变 textFile 方法返回多行字符串的rdd,而不是行字符串的rdd。我想在spark源代码中找到从磁盘读取文件内容的实现。

SparkConf sparkConf = new SparkConf().setAppName("MyJavaApp");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile(args[0], 1);

但当我跟着 textFile 呼叫链,我刚够到 HadoopRDD 以及 RDD 班级。呼叫链如下:
在javasparkcontext.scala中

def textFile(path: String, minPartitions: Int): JavaRDD[String] =
sc.textFile(path, minPartitions)

在sparkcontext.scala中

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
  minPartitions).map(pair => pair._2.toString).setName(path)}

以及

def hadoopFile[K, V](path: String, ...): RDD[(K, V)] = {    
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(this,...).setName(path)  }

在hadooprdd.scala中

class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int) extends RDD[(K, V)](sc, Nil) with Logging {...

我不想使用map函数(作为开销)从行的rdd生成我的定制rdd。
有什么帮助吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题