java—如何在spark rdd(javardd)中获取记录的文件名

nbewdwxp  于 2021-06-03  发布在  Hadoop
关注(0)|答案(3)|浏览(671)

我正在使用

JavaRDD<String> allLines = sc.textFile(hdfs://path/*.csv);

加载文件后,我修改每个记录并希望保存它们。不过,我还需要保存原始文件名(id)与记录,以备将来参考。我是否可以从rdd中的单个记录中获取原始文件名?谢谢

rnmwe5a2

rnmwe5a21#

你应该能够使用todebugstring。使用wholetextfile会将文件的整个内容作为一个元素读入,而sc.textfile会创建一个rdd,每一行都作为一个单独的元素-如这里所述。
例如:

val file= sc.textFile("/user/user01/whatever.txt").cache()

val wordcount = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

wordcount.toDebugString

// res0: String =

// (2) ShuffledRDD[4] at reduceByKey at <console>:23 []

// +-(2) MapPartitionsRDD[3] at map at <console>:23 []

//    |  MapPartitionsRDD[2] at flatMap at <console>:23 []

//    |  /user/user01/whatever.txt MapPartitionsRDD[1] at textFile at <console>:21 []

//    |  /user/user01/whatever.txt HadoopRDD[0] at textFile at <console>:21 []
3zwtqj6y

3zwtqj6y2#

您可以尝试执行以下代码段中的操作:

JavaPairRDD<LongWritable, Text> javaPairRDD = sc.newAPIHadoopFile(
    "hdfs://path/*.csv", 
    TextInputFormat.class, 
    LongWritable.class, 
    Text.class, 
    new Configuration()
);
JavaNewHadoopRDD<LongWritable, Text> hadoopRDD = (JavaNewHadoopRDD) javaPairRDD;

JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit((inputSplit, lines) -> {
    FileSplit fileSplit = (FileSplit) inputSplit;
    String fileName = fileSplit.getPath().getName();

    Stream<Tuple2<String, String>> stream =
        StreamSupport.stream(Spliterators.spliteratorUnknownSize(lines, Spliterator.ORDERED), false)
            .map(line -> {
                String lineText = line._2().toString();
                // emit file name as key and line as a value
                return new Tuple2(fileName, lineText);
            });
    return stream.iterator();
}, true);

更新(适用于java7)

JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit(
    new Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<String, String>>>() {
        @Override
        public Iterator<Tuple2<String, String>> call(InputSplit inputSplit, final Iterator<Tuple2<LongWritable, Text>> lines) throws Exception {
            FileSplit fileSplit = (FileSplit) inputSplit;
            final String fileName = fileSplit.getPath().getName();
            return new Iterator<Tuple2<String, String>>() {
                @Override
                public boolean hasNext() {
                    return lines.hasNext();
                }
                @Override
                public Tuple2<String, String> next() {
                    Tuple2<LongWritable, Text> entry = lines.next();
                    return new Tuple2<String, String>(fileName, entry._2().toString());
                }
            };
        }
    }, 
    true
);
kmynzznz

kmynzznz3#

你想要spark的wholetextfiles函数。根据文件:

For example, if you have the following files:

   hdfs://a-hdfs-path/part-00000
   hdfs://a-hdfs-path/part-00001
   ...
   hdfs://a-hdfs-path/part-nnnnn

Do val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path"),

then rdd contains

   (a-hdfs-path/part-00000, its content)
   (a-hdfs-path/part-00001, its content)
   ...
   (a-hdfs-path/part-nnnnn, its content)

它返回一个rdd元组,左边是文件名,右边是内容。

相关问题