有没有一种方法可以在spark 3.0.1中使用名称不同于part*的scalaxx导出csv或其他文件?

liwlm1x9  于 2021-05-16  发布在  Spark
关注(0)|答案(1)|浏览(530)

我用scala在spark中创建了一个二维立方体。数据来自两个不同的Dataframe。名称是“借阅表”和“借阅表”。它们是用“createorreplacetempview”选项创建的,因此可以对它们运行sql查询。目标是创建两个维度(性别和部门)的多维数据集,总结图书馆借书的总数。用命令

val cube=spark.sql("""
    select 
    borrowersTable.department,borrowersTable.gender,count(loansTable.bibno)
    from borrowersTable,loansTable
    where borrowersTable.bid=loansTable.bid
    group by borrowersTable.gender,borrowersTable.department with cube;
""")

我创建了一个立方体,结果如下:

然后使用命令
cube.write.format(“csv”).save(“文件://……/data/cube”)
spark创建了一个名为cube的文件夹,其中包含34个名为part*.csv的文件,其中包括department、gender和sum of loans(每组按)列。
这里的目标是用前两列(属性)的名称创建文件:对于groupby(attr1,attr2),文件应命名为attr1\u attr2。
e、 对于(经济学,m)文件应命名为economics\u m。对于(数学,空)它应该是数学,空等等。任何帮助都将不胜感激。

mspsb9vt

mspsb9vt1#

当您调用'df.write.format(“…”).save(“…”)时,每个spark执行器都将其持有的分区保存到相应的part*文件中。这是存储和加载大文件的机制,您不能更改它。不过,您可以尝试以下替代方案,只要在您的情况下效果更好:
分区依据:

cube
  .write
  .partitionBy("department", "gender")
  .format("csv")
  .save("file:///....../data/cube")

这将创建名为“department=Physics/gender=M”的子文件夹,其中仍包含part*文件。此结构稍后可以加载回spark,并用于分区列的有效联接。
收集

val csvRows = cube
  .collect()
  .foreach {
    case Row(department: String, gender: String, _) => 
      // just the simple way to write CSV, you can use any CSV lib here as well
      Files.write(Paths.get(s"$department_$gender.csv"), s"$department,$gender".getBytes(StandardCharsets.UTF_8))
  }

如果调用'collect()',则在驱动程序端以'Array[Row]'的形式接收Dataframe,然后可以对其执行任何操作。这种方法的一个重要限制是,Dataframe应该适合于驱动程序的内存。

相关问题