spark sc.textfile是如何详细工作的?

von4xj4u  于 2021-06-02  发布在  Hadoop
关注(0)|答案(3)|浏览(835)

我想知道怎么做 sc.textfile 工作细节。
我在sparkcontext.scala中找到了textfile源代码,但是它们包含了太多关于调度程序、阶段和任务提交的信息。我想要的是sc.textfile如何从hdfs读取文件,以及sc.textfile如何使用通配符匹配多个文件。
在哪里可以找到源代码?

rwqw0loc

rwqw0loc1#

textFile 是一种 org.apache.spark.SparkContext 类,该类从hdfs、本地文件系统(在所有节点上都可用)或任何支持hadoop的文件系统uri读取文本文件,并将其作为字符串的rdd返回。

sc.textFile(path,minpartions)

> @param path path to the text file on a supported file system  
> @param minPartitions suggested minimum number of partitions for the resulting RDD
> @return RDD of lines of the text file

它在内部使用hadooprdd(提供读取hadoop中存储的数据的核心功能的rdd)
hadoop rdd看起来像这样

HadoopRDD(
      sc, //Sparkcontext
      confBroadcast, //A general Hadoop Configuration, or a subclass of it
      Some(setInputPathsFunc),//Optional closure used to initialize any JobConf that HadoopRDD creates.       inputFormatClass,
      keyClass,
      valueClass,
      minPartitions)

在textfile方法中,我们调用create a hadooprdd,其中包含一些硬编码值:

HadoopRDD(
      sc, //Sparkcontext
      confBroadcast, //A general Hadoop Configuration, or a subclass of it
      Some(setInputPathsFunc),//Optional closure used to initialize any JobConf that HadoopRDD creates. 
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      minPartitions)

由于这些硬编码的值,我们只能读取文本文件,所以如果我们想读取任何其他类型的文件,我们使用hadooprdd。

nue99wik

nue99wik2#

apachespark使用hadoop客户机库读取文件。所以你得看报纸 hadoop-client 了解更多信息的源代码:
https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/textinputformat.java

csga3l58

csga3l583#

core\src\main\scala\org\apache\spark\rdd\hadooprdd.scala中的compute函数
下面是函数中的一些代码

var reader: RecordReader[K, V] = null
  val inputFormat = getInputFormat(jobConf)
  HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
   context.stageId, theSplit.index, context.attemptNumber, jobConf)
  reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

  // Register an on-task-completion callback to close the input stream.
  context.addTaskCompletionListener{ context => closeIfNeeded() }
  val key: K = reader.createKey()
  val value: V = reader.createValue()

相关问题