sc.textFile(path, minPartitions)
> @param path path to the text file on a supported file system
> @param minPartitions suggested minimum number of partitions for the resulting RDD
> @return RDD of lines of the text file
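For orientation, a minimal usage sketch of the call being discussed (the path and partition count are illustrative, not from the post):

// Hypothetical input path and partition hint, purely for illustration.
val lines = sc.textFile("hdfs:///data/input.txt", 8)  // RDD[String], one element per line
println(lines.count())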
HadoopRDD(
  sc,                       // SparkContext
  confBroadcast,            // A general Hadoop Configuration, or a subclass of it
  Some(setInputPathsFunc),  // Optional closure used to initialize any JobConf that HadoopRDD creates
  inputFormatClass,
  keyClass,
  valueClass,
  minPartitions)
In the textFile method, a HadoopRDD is created with some hard-coded values:
HadoopRDD(
  sc,                       // SparkContext
  confBroadcast,            // A general Hadoop Configuration, or a subclass of it
  Some(setInputPathsFunc),  // Optional closure used to initialize any JobConf that HadoopRDD creates
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  minPartitions)
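For reference, this is roughly how SparkContext.textFile wires those hard-coded classes together; the snippet below is a paraphrase of the Spark source (details such as withScope and setName omitted), not an exact copy of any one version:

// textFile delegates to hadoopFile with TextInputFormat / LongWritable / Text,
// then keeps only the value of each record, i.e. the line itself.
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions)
    .map(pair => pair._2.toString)
}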
// Excerpt from HadoopRDD.compute(): set up a RecordReader for this partition's InputSplit
var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
// Stamp task/stage/attempt information into the per-task JobConf
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
  context.stageId, theSplit.index, context.attemptNumber, jobConf)
// Ask the InputFormat for a RecordReader over this task's split
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener{ context => closeIfNeeded() }
// Reusable key/value holders that reader.next(key, value) will fill in
val key: K = reader.createKey()
val value: V = reader.createValue()
3 Answers
rwqw0loc1#
textFile is a method of the org.apache.spark.SparkContext class. It reads a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI and returns it as an RDD of Strings. Internally it uses HadoopRDD, an RDD that provides the core functionality for reading data stored in Hadoop.
The HadoopRDD call looks like the first code block shown above.
In the textFile method it is created with some hard-coded values (the second code block above).
Because of these hard-coded values, textFile can only read text files, so if we want to read any other kind of file we use HadoopRDD (or one of SparkContext's hadoopFile variants) directly, as sketched below.
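A minimal sketch of that idea, assuming a key/value text file at a hypothetical path; the InputFormat, key, and value classes are passed explicitly instead of relying on textFile's hard-coded ones:

// Sketch: reading tab-separated key/value pairs with an explicitly chosen InputFormat.
// The path "hdfs:///data/pairs.txt" and the partition hint 4 are illustrative.
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.KeyValueTextInputFormat

val pairs = sc.hadoopFile[Text, Text, KeyValueTextInputFormat]("hdfs:///data/pairs.txt", 4)
val asStrings = pairs.map { case (k, v) => (k.toString, v.toString) }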
nue99wik2#
Apache Spark uses the Hadoop client libraries to read files, so you would have to look into the hadoop-client source code for more information: https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/textinputformat.java
csga3l583#
The actual reading happens in the compute function in core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala; some of the code from that function is shown in the third code block above.
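For completeness, a rough, abridged sketch of how that reader is then consumed by compute's iterator, paraphrased from the Spark source (metrics updates and some error handling omitted):

// Abridged: reader.next fills the reusable key/value holders created above;
// finished flips to true once the split is exhausted.
override def getNext(): (K, V) = {
  try {
    finished = !reader.next(key, value)
  } catch {
    case _: EOFException => finished = true
  }
  (key, value)
}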