How do I specify a schema when loading a CSV from S3 in Spark using Scala?

ozxc1zmp, posted 2021-05-27 in Spark

I've googled my way through several syntax variations from Stack Overflow, but none of them work for me. My code is below:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType};

val schema1 = (new StructType)
    .add("PASSENGERID", IntegerType, true)
    .add("PCLASS", IntegerType, true)
    .add("NAME", IntegerType, true)
    .add("SEX", StringType, true)
    .add("AGE", DoubleType, true)
    .add("SIBSP", IntegerType, true)
    .add("PARCH", IntegerType, true)
    .add("TICKET", StringType, true)
    .add("FARE", DoubleType, true)
    .add("CABIN", StringType, true)
    .add("EMBARKED", StringType, true)

val schema2 = StructType(
    StructField("PASSENGERID", IntegerType, true) ::
    StructField("PCLASS", IntegerType, true) ::
    StructField("NAME", IntegerType, true) ::
    StructField("SEX", StringType, true) ::
    StructField("AGE", DoubleType, true) ::
    StructField("SIBSP", IntegerType, true) ::
    StructField("PARCH", IntegerType, true) ::
    StructField("TICKET", StringType, true) ::
    StructField("FARE", DoubleType, true) ::
    StructField("CABIN", StringType, true) ::
    StructField("EMBARKED", StringType, true) :: Nil)

val schema3 = StructType(Array(
    StructField("PASSENGERID", IntegerType, true),
    StructField("PCLASS", IntegerType, true),
    StructField("NAME", IntegerType, true),
    StructField("SEX", StringType, true),
    StructField("AGE", DoubleType, true),
    StructField("SIBSP", IntegerType, true),
    StructField("PARCH", IntegerType, true),
    StructField("TICKET", StringType, true),
    StructField("FARE", DoubleType, true),
    StructField("CABIN", StringType, true),
    StructField("EMBARKED", StringType, true)))

val schema4 = StructType(Seq(
    StructField("PASSENGERID", IntegerType, true),
    StructField("PCLASS", IntegerType, true),
    StructField("NAME", IntegerType, true),
    StructField("SEX", StringType, true),
    StructField("AGE", DoubleType, true),
    StructField("SIBSP", IntegerType, true),
    StructField("PARCH", IntegerType, true),
    StructField("TICKET", StringType, true),
    StructField("FARE", DoubleType, true),
    StructField("CABIN", StringType, true),
    StructField("EMBARKED", StringType, true)
))

val schema5 = StructType(
  List(
    StructField("PASSENGERID", IntegerType, true),
    StructField("PCLASS", IntegerType, true),
    StructField("NAME", IntegerType, true),
    StructField("SEX", StringType, true),
    StructField("AGE", DoubleType, true),
    StructField("SIBSP", IntegerType, true),
    StructField("PARCH", IntegerType, true),
    StructField("TICKET", StringType, true),
    StructField("FARE", DoubleType, true),
    StructField("CABIN", StringType, true),
    StructField("EMBARKED", StringType, true)
  )
)
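
(Aside, not in the original post: all five of these definitions build the same StructType, since StructType compares its fields structurally; a quick sanity check in the shell would be something like the line below.)

// not from the original post: StructType overrides equals, so structurally
// identical schemas compare equal regardless of how they were constructed
assert(schema1 == schema2 && schema2 == schema3 && schema3 == schema4 && schema4 == schema5)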

/*
val df = spark.read
    .option("header", true)
    .csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
    .schema(schema)

*/

//this works
val df = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")

df.show(false)
df.printSchema()

//fun errors
val df1 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema1)
val df2 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema2)
val df3 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema3)
val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)
val df5 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema5)

The data is the Kaggle Titanic survival dataset, with the header fields in uppercase. I've tried submitting this as a script with spark-shell -i and also running the commands manually inside spark-shell. spark-shell -i throws syntax errors on the dfX reads; if I load any of the schemas manually they all look fine, yet every read fails with the same error.

scala> val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)
<console>:26: error: overloaded method value apply with alternatives:
  (fieldIndex: Int)org.apache.spark.sql.types.StructField <and>
  (names: Set[String])org.apache.spark.sql.types.StructType <and>
  (name: String)org.apache.spark.sql.types.StructField
 cannot be applied to (org.apache.spark.sql.types.StructType)
       val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)

I don't understand what I'm doing wrong. I'm on Spark 2.4.4 on AWS EMR.

Answer 1 (btxsgosb)

Set the inferSchema option to false so that Spark does not try to infer the schema while loading the data, and move your .schema call before .csv: .schema is a setter on DataFrameReader, not on DataFrame. Once .csv has run you already have a DataFrame, and df.schema is simply that DataFrame's StructType, so .schema(schema4) ends up invoking StructType.apply, which only accepts an Int, a String, or a Set[String]; that is exactly the "overloaded method value apply" error you are seeing.
Please check the code below.

scala> val df1 = spark.read.option("header", true).option("inferSchema", false).schema(schema1).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df1: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]

scala> val df2 = spark.read.option("header", true).option("inferSchema", false).schema(schema2).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df2: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]

scala> val df3 = spark.read.option("header", true).option("inferSchema", false).schema(schema3).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df3: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]

scala> val df4 = spark.read.option("header", true).option("inferSchema", false).schema(schema4).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df4: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]

scala> val df5 = spark.read.option("header", true).option("inferSchema", false).schema(schema5).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df5: org.apache.spark.sql.DataFrame = [PASSENGERID: int, PCLASS: int ... 9 more fields]
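
As a side note, not from the original answer: since Spark 2.3, DataFrameReader.schema also accepts a DDL-formatted string, so a rough equivalent of the reads above could look like the sketch below. The bucket path is the one from the question; NAME is typed as STRING here because the Titanic NAME column is text, unlike the IntegerType used in the question's schemas.

// hedged sketch: DDL-string schema instead of a hand-built StructType
val ddlSchema = "PASSENGERID INT, PCLASS INT, NAME STRING, SEX STRING, AGE DOUBLE, " +
  "SIBSP INT, PARCH INT, TICKET STRING, FARE DOUBLE, CABIN STRING, EMBARKED STRING"

// .schema must still be called on the reader, before .csv
val dfDdl = spark.read
  .option("header", true)
  .option("inferSchema", false)
  .schema(ddlSchema)
  .csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")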
