Scala Spark - writing a DataFrame with a struct schema as CSV

Asked by qybjjes1 on 2022-11-09, tagged Scala

I have a Spark DataFrame created by reading an EBCDIC data file. Its schema is as follows:

scala> myDF.printSchema
root
 |-- DF_RECORD: struct (nullable = true)
 |    |-- ID: string (nullable = true)
 |    |-- BALANCE_AMT: decimal(15,4) (nullable = true)

I can even display its rows. However, when I try to write it out as CSV, I get the following error:

scala> myDF.write.format("csv").mode("overwrite").option("header", "true").save("/path")
org.apache.spark.sql.AnalysisException: CSV data source does not support struct<ID:string,BALANCE_AMT:decimal(15,4)> data type.;

Is there a column transformation I need to apply to un-nest the struct before writing?
Spark version: 2.4.0-cdh6.2.1
Scala version: 2.11.12

Answer 1 (vyswwuz2)

Add a selectExpr before writing. CSV is a flat format and Spark's CSV data source supports only atomic column types, so the struct's fields must first be promoted to top-level columns:

scala> myDF.selectExpr("DF_RECORD.*").write.format("csv").mode("overwrite").option("header", "true").save("/path")
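
If a DataFrame can carry more than one struct column, the same idea generalizes. Below is a minimal sketch of a generic flatten; the helper name flattenStructs is hypothetical, not part of the Spark API:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Hypothetical helper: expand every top-level struct column into its
// fields and keep all other columns unchanged.
def flattenStructs(df: DataFrame): DataFrame = {
  val cols = df.schema.fields.map { f =>
    f.dataType match {
      case _: StructType => s"${f.name}.*"  // expand the struct's fields
      case _             => f.name          // pass the column through as-is
    }
  }
  df.selectExpr(cols: _*)
}

// Usage:
// flattenStructs(myDF).write.format("csv").mode("overwrite").option("header", "true").save("/path")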
Answer 2 (dw1jzc5e)

Alternatively (shown here with the PySpark API), convert the struct into separate columns by accessing its fields with withColumn:

from pyspark.sql.types import StructType, StructField, StringType, Row, DoubleType
import pyspark.sql.functions as F

# Reproduce the question's shape: one struct column wrapping two fields
schema = StructType([
    StructField("DF_RECORD",
      StructType([
          StructField("ID", StringType()),
          StructField("BALANCE_AMT", DoubleType())
      ])
      )
    ])

# The inner Row supplies the struct's field values
df = spark.createDataFrame([Row(Row("1", 1000.0))], schema=schema)
df.printSchema()

[Out]:
root
 |-- DF_RECORD: struct (nullable = true)
 |    |-- ID: string (nullable = true)
 |    |-- BALANCE_AMT: double (nullable = true)

# Pull each field out of the struct, then drop the struct column itself
df_csv = df \
          .withColumn("ID", F.col("DF_RECORD")["ID"]) \
          .withColumn("BALANCE_AMT", F.col("DF_RECORD")["BALANCE_AMT"]) \
          .drop("DF_RECORD")

df_csv.show()

[Out]:
+---+-----------+
| ID|BALANCE_AMT|
+---+-----------+
|  1|     1000.0|
+---+-----------+

Now write it out as CSV:

df_csv.write.format("csv").mode("overwrite").option("header", "true").save("/content/sample_data/test1.csv")

!cat "/content/sample_data/test1.csv/part-00000-b342e07a-6d41-40b5-afa2-39eeef3b70a2-c000.csv"

[Out]:
ID,BALANCE_AMT
1,1000.0
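
Since the question itself uses the Scala API, the same column-by-column flattening can be sketched in Scala (a minimal sketch assuming the myDF and /path names from the question):

import org.apache.spark.sql.functions.col

// Pull each field out of the struct, then drop the struct column itself
val dfCsv = myDF
  .withColumn("ID", col("DF_RECORD.ID"))
  .withColumn("BALANCE_AMT", col("DF_RECORD.BALANCE_AMT"))
  .drop("DF_RECORD")

dfCsv.write.format("csv").mode("overwrite").option("header", "true").save("/path")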
