sparkcsv阅读器:乱码日语文本和多行处理

fv2wmkja  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(383)

在我的spark作业(spark 2.4.1)中,我正在读取s3上的csv文件。这些文件包含日语字符。而且它们可以有^m字符(u000d),因此我需要将它们解析为多行。
首先,我使用以下代码读取csv文件:

implicit class DataFrameReadImplicits (dataFrameReader: DataFrameReader) {
     def readTeradataCSV(schema: StructType, s3Path: String) : DataFrame = {

        dataFrameReader.option("delimiter", "\u0001")
          .option("header", "false")
          .option("inferSchema", "false")
          .option("multiLine","true")
          .option("encoding", "UTF-8")
          .option("charset", "UTF-8")
          .schema(schema)
          .csv(s3Path)
     }
  }

但是当我用这种方法读df时,所有的日语字符都是乱码。
在做了一些测试之后,我发现如果我使用“spark.sparkcontext.textfile(path)”正确编码的日语字符读取同一个s3文件。
所以我试着这样:

implicit class SparkSessionImplicits (spark : SparkSession) {
    def readTeradataCSV(schema: StructType, s3Path: String) = {
      import spark.sqlContext.implicits._
      spark.read.option("delimiter", "\u0001")
        .option("header", "false")
        .option("inferSchema", "false")
        .option("multiLine","true")
        .schema(schema)
        .csv(spark.sparkContext.textFile(s3Path).map(str => str.replaceAll("\u000D"," ")).toDS())
    }
  }

现在编码问题已经解决。但是多行无法正常工作,并且在^m字符附近断行,即使我尝试使用str.replaceall(“\u000d”,”“)替换^m
关于如何使用第一种方法阅读日语字符,或使用第二种方法处理多行的提示?
更新:当应用程序在spark集群上运行时,就会出现这种编码问题。当我在本地运行应用程序,读取相同的s3文件时,编码工作正常。

mspsb9vt

mspsb9vt1#

有些东西在代码中,但文档中还没有。您是否尝试显式设置行分隔符,从而避免“多行”解决方法,因为 ^M ?
来自spark“textsuite”分支2.4的单元测试
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/textsuite.scala

def testLineSeparator(lineSep: String): Unit = {
  test(s"SPARK-23577: Support line separator - lineSep: '$lineSep'") {
  ...
}
// scalastyle:off nonascii
Seq("|", "^", "::", "!!!@3", 0x1E.toChar.toString, "아").foreach { lineSep =>
  testLineSeparator(lineSep)
}
// scalastyle:on nonascii

从csv选项解析的源代码,分支3.0
https://github.com/apache/spark/blob/branch-3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/csvoptions.scala

val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
  require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
  require(sep.length == 1, "'lineSep' can contain only 1 character.")
  sep
}
val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
  lineSep.getBytes(charset)
}

所以,看起来csv不支持行分隔符的字符串,只支持单个字符,因为它依赖于一些hadoop库。我希望对你来说没问题。
匹配的圣战者是。。。
spark-21289基于文本的格式不支持自定义行尾分隔符。。。
spark-23577特定于文本数据源>在v2.4.0中修复

t2a7ltrp

t2a7ltrp2#

如果数据用双引号括起来,则可以使用escape属性。

df = (spark.read
 .option("header", "false")
 .csv("******",multiLine=True, escape='"')
)

相关问题