我在努力改变 textFile
方法返回多行字符串的rdd,而不是行字符串的rdd。我想在spark源代码中找到从磁盘读取文件内容的实现。
SparkConf sparkConf = new SparkConf().setAppName("MyJavaApp");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile(args[0], 1);
但当我跟着 textFile
呼叫链,我刚够到 HadoopRDD
以及 RDD
班级。呼叫链如下:
在javasparkcontext.scala中
def textFile(path: String, minPartitions: Int): JavaRDD[String] =
sc.textFile(path, minPartitions)
在sparkcontext.scala中
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)}
以及
def hadoopFile[K, V](path: String, ...): RDD[(K, V)] = {
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(this,...).setName(path) }
在hadooprdd.scala中
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int) extends RDD[(K, V)](sc, Nil) with Logging {...
我不想使用map函数(作为开销)从行的rdd生成我的定制rdd。
有什么帮助吗?
暂无答案!
目前还没有任何答案,快来回答吧!