我有一个spark的工作,它读取一些s3上的csv文件,处理结果并保存为Parquet文件。这些csv包含日语文本。
当我在本地运行这个作业,读取s3csv文件并将Parquet文件写入本地文件夹时,日语字母看起来很好。
但是,当我在我的spark集群上运行这个程序,读取相同的s3csv文件并将parquet写入hdfs时,所有的日语字母都是乱码。
运行在spark cluster上(数据混乱)
spark-submit --master spark://spark-master-stg:7077 \
--conf spark.sql.session.timeZone=UTC \
--conf spark.driver.extraJavaOptions="-Ddatabase=dev_mall -Dtable=table_base_TEST -DtimestampColumn=time_stamp -DpartitionColumns= -Dyear=-1 -Dmonth=-1 -DcolRenameMap= -DpartitionByYearMonth=true -DaddSpdbCols=false -DconvertTimeDateCols=true -Ds3AccessKey=xxxxx -Ds3SecretKey=yyyy -Ds3BasePath=s3a://bucket/export/e2e-test -Ds3Endpoint=http://s3.url -DhdfsBasePath=hdfs://nameservice1/tmp/encoding-test -DaddSpdbCols=false" \
--name Teradata_export_test_ash \
--class com.mycompany.data.spark.job.TeradataNormalTableJob \
--deploy-mode client \
https://artifactory.maven-it.com/spdb-mvn-release/com.mycompany.data/teradata-spark_2.11/0.1/teradata-spark_2.11-0.1-assembly.jar
本地运行(数据看起来很好)
spark-submit --master local \
--conf spark.sql.session.timeZone=UTC \
--conf spark.driver.extraJavaOptions="-Ddatabase=dev_mall -Dtable=table_base_TEST -DtimestampColumn=time_stamp -DpartitionColumns= -Dyear=-1 -Dmonth=-1 -DcolRenameMap= -DpartitionByYearMonth=true -DaddSpdbCols=false -DconvertTimeDateCols=true -Ds3AccessKey=xxxxx -Ds3SecretKey=yyyy -Ds3BasePath=s3a://bucket/export/e2e-test -Ds3Endpoint=http://s3.url -DhdfsBasePath=/tmp/encoding-test -DaddSpdbCols=false" \
--name Teradata_export_test_ash \
--class com.mycompany.data.spark.job.TeradataNormalTableJob \
--deploy-mode client \
https://artifactory.maven-it.com/spdb-mvn-release/com.mycompany.data/teradata-spark_2.11/0.1/teradata-spark_2.11-0.1-assembly.jar
如上所述,两个spark submit作业都指向同一个s3文件,唯一不同的是在spark cluster上运行时,结果被写入hdfs。
正在读取csv:
def readTeradataCSV(schema: StructType, path: String) : DataFrame = {
dataFrameReader.option("delimiter", "\u0001")
.option("header", "false")
.option("inferSchema", "false")
.option("multiLine","true")
.option("encoding", "UTF-8")
.option("charset", "UTF-8")
.schema(schema)
.csv(path)
}
我是这样写Parquet地板的:
finalDf.write
.format("parquet")
.mode(SaveMode.Append)
.option("path", hdfsTablePath)
.option("encoding", "UTF-8")
.option("charset", "UTF-8")
.partitionBy(parCols: _*)
.save()
hdfs上的数据如下所示:
关于如何解决这个问题有什么建议吗?
输入csv文件是否必须采用utf-8编码?
更新发现它与Parquet地板无关,而是csv加载。在这里问了一个单独的问题:
sparkcsv阅读器:乱码日语文本和多行处理
1条答案
按热度按时间ohtdti5x1#
Parquet格式没有用于
encoding
或者charset
查阅https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/parquetoptions.scala因此,您的代码无效:
这些选项仅适用于csv,您应该在读取源文件时设置它们(或者设置其中一个,因为它们是同义词)。
假设您正在使用sparkDataframeapi读取csv;否则你只能靠自己了。