Storing multiple files in HDFS with Apache Spark

piztneat · posted 2021-06-02 · in Hadoop

I'm working on a project that uses HDFS for storage and Apache Spark for computation. I have a directory in HDFS containing several text files at the same depth. I want to process all of these files with Spark and store their results back in HDFS, with one output file per input file.
For example, suppose the directory holds 1000 text files at the same depth. I am reading all of them with a wildcard:

sc.wholeTextFiles("hdfs://localhost:9000/home/akshat/files/*.txt")

Then I process them with Spark, get the corresponding RDD, and save it with

result.saveAsTextFile("hdfs://localhost:9000/home/akshat/final")

But this writes the results of all the input files into a single output. I want to take each file, process it separately, and store each file's output separately.
How should I proceed?
Thanks in advance!

06odsfpq · 1#

You can achieve this with wholeTextFiles(). Note: the approach below processes the files one by one.

val data = sc.wholeTextFiles("hdfs://master:port/vijay/mywordcount/")

val files = data.map { case (filename, content) => filename}

def doSomething(file: String) = { 

 println (file);

 // your logic of processing a single file comes here

 val logData = sc.textFile(file);
 val numAs = logData.filter(line => line.contains("a")).count();
 println("Lines with a: %s".format(numAs));

 // save rdd of single file processed data to hdfs  comes here

}

files.collect.foreach( filename => {
    doSomething(filename)

})

Where:
hdfs://master:port/vijay/mywordcount/ - your HDFS directory
data - org.apache.spark.rdd.RDD[(String, String)]
files - org.apache.spark.rdd.RDD[String] - the file names
doSomething(filename) - your logic
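
The "// save rdd of single file processed data to hdfs comes here" placeholder above can be filled in by deriving an output path from the input file name and calling saveAsTextFile on the per-file result. A minimal sketch (the output base path and the helper name saveResult are illustrative, not part of the original answer):

// Sketch only: write one output directory per input file.
def saveResult(inputFile: String, result: org.apache.spark.rdd.RDD[String]): Unit = {
  val simpleName = inputFile.split('/').last                     // e.g. file1.txt
  val outDir = "hdfs://master:port/vijay/output/" + simpleName   // assumed base path; one directory per input file
  result.saveAsTextFile(outDir)                                  // part-* files are written under outDir
}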
Update: multiple output files

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

/* hadoop */

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

/* java */
import java.io.Serializable;

import org.apache.log4j.Logger
import org.apache.log4j.Level

/* Custom TextOutput Format */
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "-" + name   // output: hdfs://Output_dir/inputFilename-part-****
  //key.asInstanceOf[String] + "/" + name   // output: hdfs://Output_dir/inputFilename/part-**** (inputFilename as a directory for its part files)
}

/* Spark Context */
object Spark {
  val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

/* WordCount Processing */

object Process extends Serializable{
  def apply(filename: String): org.apache.spark.rdd.RDD[(String, String)]= {
    println("i am called.....")
    val simple_path = filename.split('/').last;
    val lines = Spark.sc.textFile(filename);
    val counts     = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _); //(word,count)
    val fname_word_counts = counts.map( x => (simple_path,x._1+"\t"+ x._2));   // (filename,word\tcount)
    fname_word_counts
  }
}

object SimpleApp  {

        def main(args: Array[String]) {

            //Logger.getLogger("org").setLevel(Level.OFF)
            //Logger.getLogger("akka").setLevel(Level.OFF)

            // input and output paths
            val INPUT_PATH = "hdfs://master:8020/vijay/mywordcount/"
            val OUTPUT_PATH = "hdfs://master:8020/vijay/mywordcount/output/"

            // context
            val context = Spark.sc
            val data = context.wholeTextFiles(INPUT_PATH)

            // final output RDD
            var output : org.apache.spark.rdd.RDD[(String, String)] = context.emptyRDD

            // files to process
            val files = data.map { case (filename, content) => filename}

            // Apply wordcount Processing on each File received in wholeTextFiles.
            files.collect.foreach( filename => {
                            output = output.union(Process(filename));
            })

           //output.saveAsTextFile(OUTPUT_PATH);   // this will save output as (filename,word\tcount)
           output.saveAsHadoopFile(OUTPUT_PATH, classOf[String], classOf[String],classOf[RDDMultipleTextOutputFormat])  // custom output Format.

           //close context
           context.stop();

         }
}
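
A side note on the design (not part of the original answer): the loop above collects the file names to the driver, builds one RDD per input file, and unions the results. The same per-file output can also be produced without that driver-side loop by keying the word counts with the source file name taken directly from wholeTextFiles. A sketch, reusing INPUT_PATH, OUTPUT_PATH and RDDMultipleTextOutputFormat from above:

// Sketch: per-file word count keyed by source file name.
val perFileCounts = Spark.sc.wholeTextFiles(INPUT_PATH)
  .flatMap { case (path, content) =>
    val simpleName = path.split('/').last
    content.split("\\s+").filter(_.nonEmpty).map(word => ((simpleName, word), 1))
  }
  .reduceByKey(_ + _)
  .map { case ((file, word), count) => (file, word + "\t" + count) }   // (filename, word\tcount)

perFileCounts.saveAsHadoopFile(OUTPUT_PATH, classOf[String], classOf[String],
  classOf[RDDMultipleTextOutputFormat])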

Environment:
Scala compiler 2.10.2
spark-1.2.0-bin-hadoop2.3
hadoop 2.3.0-cdh5.0.3
Sample output:

[ramisetty@node-1 stack]$ hadoop fs -ls /vijay/mywordcount/output
Found 5 items
-rw-r--r--   3 ramisetty supergroup          0 2015-06-09 03:49 /vijay/mywordcount/output/_SUCCESS
-rw-r--r--   3 ramisetty supergroup         40 2015-06-09 03:49 /vijay/mywordcount/output/file1.txt-part-00000
-rw-r--r--   3 ramisetty supergroup          8 2015-06-09 03:49 /vijay/mywordcount/output/file1.txt-part-00001
-rw-r--r--   3 ramisetty supergroup         44 2015-06-09 03:49 /vijay/mywordcount/output/file2.txt-part-00002
-rw-r--r--   3 ramisetty supergroup          8 2015-06-09 03:49 /vijay/mywordcount/output/file2.txt-part-00003
