scala中用动态列将rdd数据写入csv

goqiplq2  于 2021-06-01  发布在  Hadoop
关注(0)|答案(2)|浏览(447)

我正在从hdfs目录中读取多个文件,对于每个文件,生成的数据都使用以下格式打印:

frequencies.foreach(x => println(x._1 + ": "+x._2))

打印的数据是(对于file1.txt):

'text': 45
'data': 100
'push': 150

其他文件的键可以不同,如(file2.txt):

'data': 45
'lea': 100
'jmp': 150

密钥在所有文件中不一定相同。我希望所有文件数据都以以下格式写入.csv文件:

Filename   text  data  push  lea  jmp
File1.txt  45    100   150   0    0
File2.txt  0     45    0     100  150  ....

有人能帮我找到解决这个问题的办法吗?

nwlls2ji

nwlls2ji1#

如果你的文件不够大,你可以做没有Spark。这里是我的示例代码,csv格式是旧样式,不喜欢您预期的输出,但您可以很容易地调整它。

import scala.io.Source
  import org.apache.hadoop.fs._
  val sparkSession =   ...  // I created it to retrieve hadoop configuration, you can create your own Configuration.
  val inputPath =   ...
  val outputPath =   ...

  val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
  // read all files content to Array of Map[String,String]
  val filesContent = fs.listStatus(new Path(inputPath)).filter(_.isFile).map(_.getPath).filter(_.getName.endsWith(".txt"))
    .map(s => (s.getName, Source.fromInputStream(fs.open(s)).getLines()
                    .map(_.split(":").map(_.trim))
                    .filter(_.length == 2)
                    .map(p => (p.head, p.last)).toMap))
  // create default Map with all possible keys
  val listKeys = filesContent.flatMap(_._2.keys).distinct.map(s => (s, "0")).toMap
  val csvContent = filesContent.map(s => (s._1, listKeys ++ s._2))
    .map(s => (s._1, s._2.values.mkString(",")))
    .map(s => s"${s._1},${s._2}")
    .mkString("\n")
  val csvHeader = ("Filename" +: listKeys.keys.toList).mkString(",")
  val csv = csvHeader + "\n" + csvContent

  new PrintWriter(fs.create(new Path(outputPath))){
    write(csv)
    close()
  }
f45qwnt8

f45qwnt82#

我建议为目录中的所有文件创建一个Dataframe,然后使用 pivot 要相应地重新调整数据形状:

val df1 = sc.parallelize(Array(
("text",45  ),
("data",100 ),
("push",150 ))).toDF("key", "value").withColumn("Filename", lit("File1") )

val df2 = sc.parallelize(Array(
("data",45  ),
("lea",100 ),
("jump",150 ))).toDF("key", "value").withColumn("Filename", lit("File2") )

val df = df1.unionAll(df2)

df.show
+----+-----+--------+
| key|value|Filename|
+----+-----+--------+
|text|   45|   File1|
|data|  100|   File1|
|push|  150|   File1|
|data|   45|   File2|
| lea|  100|   File2|
|jump|  150|   File2|
+----+-----+--------+

val finalDf = df.groupBy($"Filename").pivot("key").agg(first($"value") ).na.fill(0)

finalDf.show
+--------+----+----+---+----+----+
|Filename|data|jump|lea|push|text|
+--------+----+----+---+----+----+
|   File1| 100|   0|  0| 150|  45|
|   File2|  45| 150|100|   0|   0|
+--------+----+----+---+----+----+

您可以使用 DataFrameWriter ```
df.write.csv(..)

其中最困难的部分是为每个文件创建一个不同的Dataframe,并为 `Filename` 从中创建Dataframe

相关问题