Scala: how do I stop Spark from dropping the last line of a CSV file when loading from a Dataset[String]?

c9qzyr3d · posted 2023-03-23 in Scala

We have the following CSV file:

RWA
Country of exposure
Credit risk asset class
Projection period
Scenario
RWA

The RWA on the first line is the header. The last line is identical to the header, but it is not a header. When the CSV content is loaded from a Dataset[String], as in:

import org.apache.spark.sql.Dataset
import spark.implicits._

val source: Array[String] = (
      "RWA\n" +
      "Country of exposure\n" +
      "Credit risk asset class\n" +
      "Projection period\n" +
      "Scenario\n" +
      "RWA"
      ).split("\n")
val csvData: Dataset[String] = spark.sparkContext.parallelize(source).toDS()
val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header", value = true)
  .csv(csvData)
// df.count() == 4 unexpectedly

df.count() should be 5, but it is actually 4.
The problem does not occur when the same CSV is loaded from a file:

import java.nio.file.Files

val tempFile = Files.createTempFile("tmp", ".csv")
val res = "RWA\n" +
      "Country of exposure\n" +
      "Credit risk asset class\n" +
      "Projection period\n" +
      "Scenario\n" +
      "RWA"
Files.writeString(tempFile, res)  // requires Java 11+
val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header", value = true)
  .csv(tempFile.toString)
// df.count() == 5 as expected

Is there a way to tell Spark to behave the same way when loading from a Dataset?

Scala version: 2.12.14
Spark version: 3.0.3

laximzn5 · 1#

This is a very interesting behavior that I can't explain.
Now, I don't know your real use case, but if you have an Array[String] whose first line should act as the header, I don't think you should go through an RDD[String]/Dataset[String] and spark.read.csv(...) at all. Build the schema and the rows yourself instead.
Something like:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// Build the schema from the first line of the source array
val header = StructType(source(0).split(",").map(f => StructField(f, StringType, nullable = false)))
val encoder = RowEncoder.apply(header)
// Turn every remaining line into a Row; no header detection or filtering is involved
val data = source.tail.map(line => Row(line.split(","): _*)).toSeq

spark.createDataset(data)(encoder).show()
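
With the sample data, show() prints a single RWA column with all five data rows, including the trailing RWA: the header line never enters the Dataset, so nothing can be filtered against it.

A plausible explanation for the original behavior: when spark.read.csv is given a Dataset[String] with header enabled, Spark cannot rely on line order across partitions, so it appears to drop every line that equals the first (header) line rather than just the first occurrence; a file, by contrast, has a well-defined first line. Under that assumption, another workaround is to strip the header yourself and pass an explicit schema so that header handling never runs. A minimal sketch (dataOnly and df2 are illustrative names, and it assumes source(0) is always the header line):

import org.apache.spark.sql.types._
import spark.implicits._

// Assumption: the first element of `source` is the header line
val schema = StructType(source.head.split(",").map(name => StructField(name, StringType, nullable = true)))
// Feed Spark only the data lines and declare header = false,
// so no line can be mistaken for (and filtered out as) a header
val dataOnly = spark.sparkContext.parallelize(source.tail).toDS()
val df2 = spark.read
  .schema(schema)
  .option("header", value = false)
  .csv(dataOnly)
// df2.count() == 5, and the trailing RWA row is kept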
