如何将数据从spark sql导出到csv

5lhxktic  于 2021-06-02  发布在  Hadoop
关注(0)|答案(7)|浏览(1312)

此命令适用于hiveql:

insert overwrite directory '/data/home.csv' select * from testtable;

但是对于sparksql,我在 org.apache.spark.sql.hive.HiveQl 堆栈跟踪:

java.lang.RuntimeException: Unsupported language features in query:
    insert overwrite directory '/data/home.csv' select * from testtable

请指导我在sparksql中编写导出到csv功能。

mbjcgjjk

mbjcgjjk1#

您可以使用下面的语句以csv格式写入dataframe的内容 df.write.csv("/data/home/csv") 如果需要将整个Dataframe写入单个csv文件,则使用 df.coalesce(1).write.csv("/data/home/sample.csv") 对于spark 1.x,可以使用spark csv将结果写入csv文件
下面是一段代码片段

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")

将内容写入单个文件

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")
yrdbyhpb

yrdbyhpb2#

在spark csv的帮助下,我们可以写入csv文件。

val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`
w3nuxt5m

w3nuxt5m3#

错误消息表明查询语言中不支持此功能。但是您可以像往常一样通过rdd接口以任何格式保存Dataframe( df.rdd.saveAsTextFile ). 或者你可以退房https://github.com/databricks/spark-csv.

qkf9rpyu

qkf9rpyu4#

上面使用spark csv的回答是正确的,但是有一个问题-库基于Dataframe分区创建多个文件。这不是我们通常需要的。因此,您可以将所有分区合并为一个分区:

df.coalesce(1).
    write.
    format("com.databricks.spark.csv").
    option("header", "true").
    save("myfile.csv")

并将lib的输出(名称“part-00000”)重命名为所需的文件名。
此博客文章提供了更多详细信息:https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/

u3r8eeie

u3r8eeie5#

最简单的方法是MapDataframe的rdd并使用mkstring:

df.rdd.map(x=>x.mkString(","))

从spark 1.5开始(甚至更早) df.map(r=>r.mkString(",")) 如果您想要csv转义,也可以使用apachecommons-lang。e、 这是我们使用的代码

def DfToTextFile(path: String,
                   df: DataFrame,
                   delimiter: String = ",",
                   csvEscape: Boolean = true,
                   partitions: Int = 1,
                   compress: Boolean = true,
                   header: Option[String] = None,
                   maxColumnLength: Option[Int] = None) = {

    def trimColumnLength(c: String) = {
      val col = maxColumnLength match {
        case None => c
        case Some(len: Int) => c.take(len)
      }
      if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
    }
    def rowToString(r: Row) = {
      val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters
      st.split("~-~").map(trimColumnLength).mkString(delimiter)
    }

    def addHeader(r: RDD[String]) = {
      val rdd = for (h <- header;
                     if partitions == 1; //headers only supported for single partitions
                     tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
      rdd.getOrElse(r)
    }

    val rdd = df.map(rowToString).repartition(partitions)
    val headerRdd = addHeader(rdd)

    if (compress)
      headerRdd.saveAsTextFile(path, classOf[GzipCodec])
    else
      headerRdd.saveAsTextFile(path)
  }
8aqjt8rx

8aqjt8rx6#

自Spark
2.X spark-csv 作为本机数据源集成。因此,必要的语句简化为(windows)

df.write
  .option("header", "true")
  .csv("file:///C:/out.csv")

或unix

df.write
  .option("header", "true")
  .csv("/var/out.csv")

注意:正如注解所说,它是用该名称创建目录,其中包含分区,而不是标准的csv文件。然而,这很可能是您想要的,因为否则您可能会导致驱动程序崩溃(内存不足),或者您可能正在使用非分布式环境。

yacmzcpb

yacmzcpb7#

在数据框中输入代码:

val p=spark.read.format("csv").options(Map("header"->"true","delimiter"->"^")).load("filename.csv")

相关问题