目前我正在使用spark从bigqiery表中读取数据,并将其作为csv写入存储桶。我面临的一个问题是spark没有从bq正确读取空字符串值。它读取空字符串值,但在csv中,它将该值写为带双引号的空字符串(即,像这样的“”)。
# Load data from BigQuery.
bqdf = spark.read.format('bigquery') \
.option('table', <bq_dataset> + <bq_table>) \
.load()
bqdf.createOrReplaceTempView('bqdf')
# Select required data into another df
bqdf2 = spark.sql(
'SELECT * FROM bqdf')
# write to GCS
bqdf2.write.csv(<gcs_data_path> + <bq_table> + '/' , mode='overwrite', sep= '|')
在写入csv时,我尝试了df.write.csv()的emptyvalue=''和nullvalue选项,但都不起作用。
我需要一个解决这个问题的办法,如果有人面临这个问题,可以帮助。谢谢!
1条答案
按热度按时间zynd9foi1#
我能够重现您的案例,并找到了一个解决方案,该解决方案与我在bigquery中创建的示例表一起工作。数据如下:
根据pyspark文档,在pyspark.sql.dataframewriter(df)类中,有一个名为nullvalue的选项:
nullvalue–设置空值的字符串表示形式。如果未设置,则使用默认值空字符串。
这就是你要找的。然后,我实现了下面的nullvalue选项。
注意我用了
data_view2.show()
打印视图以检查是否正确读取。输出为:因此,对空值进行了精确的解释。此外,我还检查了.csv文件:
正如您所看到的,空值是正确的,不表示为双引号之间的空字符串。最后,作为最后的检查,我创建了一个从这个.csv文件到bigquery的加载作业。创建了表,并准确地解释了空值。
注意:我在先前创建的dataproc集群中从dataproc作业的控制台运行了pyspark作业。此外,集群与bigquery中的数据集位于同一位置。