在rdd方法中访问hdfs输入分割路径

kr98yfug  于 2021-06-04  发布在  Hadoop
关注(0)|答案(1)|浏览(308)

我的hdfs文件路径包含我想在spark中访问的元数据,即以下内容:

sc.newAPIHadoopFile("hdfs://.../*"), ...)
  .map( rdd => /* access hdfs path here */ )

在hadoop中,我可以访问整个split-through的路径 FileSplit.getPath() . 在spark中我能做什么类似的事情吗,或者我必须将路径字符串附加到扩展表中的每个rdd元素吗 NewHadoopRDD ,我认为这可能相当昂贵?

iqih9akk

iqih9akk1#

在提供给map()方法的闭包中,没有可用的元数据/执行上下文信息。
你可能想要的是

mapPartitionsWithContext

Similar to mapPartitions, but allows accessing information about the processing state within the mapper

然后你可以做一些像

import org.apache.spark.TaskContext
def myfunc(tc: TaskContext, iter: Iterator[Int]) : Iterator[Int] = {
  tc.addOnCompleteCallback(() => println(
    "Partition: "     + tc.partitionId +
    ", AttemptID: "   + tc.attemptId   +
    ", Interrupted: " + tc.interrupted))

  iter.toList.filter(_ % 2 == 0).iterator
}
a.mapPartitionsWithContext(myfunc).collect

更新以前的解决方案不提供hdfs文件名。您可能需要执行以下操作:
创建扩展fileinputformat的自定义inputformat
创建一个定制的recordreader,为每一行输出与inputsplit关联的文件,然后输出每一行的实际值
在sparkMap器中,您将解析出现在包含hdfs文件名的第一个字段,Map器的其余部分保持不变

相关问题