I have googled multiple syntax variations from Stack Overflow, but none of them work for me. My code is below:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType};
val schema1 = (new StructType)
  .add("PASSENGERID", IntegerType, true)
  .add("PCLASS", IntegerType, true)
  .add("NAME", IntegerType, true)
  .add("SEX", StringType, true)
  .add("AGE", DoubleType, true)
  .add("SIBSP", IntegerType, true)
  .add("PARCH", IntegerType, true)
  .add("TICKET", StringType, true)
  .add("FARE", DoubleType, true)
  .add("CABIN", StringType, true)
  .add("EMBARKED", StringType, true)

val schema2 = StructType(
  StructField("PASSENGERID", IntegerType, true) ::
  StructField("PCLASS", IntegerType, true) ::
  StructField("NAME", IntegerType, true) ::
  StructField("SEX", StringType, true) ::
  StructField("AGE", DoubleType, true) ::
  StructField("SIBSP", IntegerType, true) ::
  StructField("PARCH", IntegerType, true) ::
  StructField("TICKET", StringType, true) ::
  StructField("FARE", DoubleType, true) ::
  StructField("CABIN", StringType, true) ::
  StructField("EMBARKED", StringType, true) :: Nil)

val schema3 = StructType(Array(
  StructField("PASSENGERID", IntegerType, true),
  StructField("PCLASS", IntegerType, true),
  StructField("NAME", IntegerType, true),
  StructField("SEX", StringType, true),
  StructField("AGE", DoubleType, true),
  StructField("SIBSP", IntegerType, true),
  StructField("PARCH", IntegerType, true),
  StructField("TICKET", StringType, true),
  StructField("FARE", DoubleType, true),
  StructField("CABIN", StringType, true),
  StructField("EMBARKED", StringType, true)))

val schema4 = StructType(Seq(
  StructField("PASSENGERID", IntegerType, true),
  StructField("PCLASS", IntegerType, true),
  StructField("NAME", IntegerType, true),
  StructField("SEX", StringType, true),
  StructField("AGE", DoubleType, true),
  StructField("SIBSP", IntegerType, true),
  StructField("PARCH", IntegerType, true),
  StructField("TICKET", StringType, true),
  StructField("FARE", DoubleType, true),
  StructField("CABIN", StringType, true),
  StructField("EMBARKED", StringType, true)
))

val schema5 = StructType(
  List(
    StructField("PASSENGERID", IntegerType, true),
    StructField("PCLASS", IntegerType, true),
    StructField("NAME", IntegerType, true),
    StructField("SEX", StringType, true),
    StructField("AGE", DoubleType, true),
    StructField("SIBSP", IntegerType, true),
    StructField("PARCH", IntegerType, true),
    StructField("TICKET", StringType, true),
    StructField("FARE", DoubleType, true),
    StructField("CABIN", StringType, true),
    StructField("EMBARKED", StringType, true)
  )
)
/*
val df = spark.read
  .option("header", true)
  .csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
  .schema(schema)
*/
//this works
val df = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")
df.show(false)
df.printSchema()
//fun errors
val df1 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema1)
val df2 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema2)
val df3 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema3)
val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)
val df5 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema5)
The data is the Kaggle Titanic survival dataset, with the header fields in uppercase. I have tried submitting this as a script to spark-shell and also running the commands manually in spark-shell. spark-shell -i throws syntax errors on the dfX reads; if I load any of the schemas manually they all look fine, but the reads all fail with the same error.
scala> val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)
<console>:26: error: overloaded method value apply with alternatives:
(fieldIndex: Int)org.apache.spark.sql.types.StructField <and>
(names: Set[String])org.apache.spark.sql.types.StructType <and>
(name: String)org.apache.spark.sql.types.StructField
cannot be applied to (org.apache.spark.sql.types.StructType)
val df4 = spark.read.option("header", true).csv("s3a://mybucket/ybspark/input/PASSENGERS.csv").schema(schema4)
I don't understand what I'm doing wrong. I'm on Spark version 2.4.4 on AWS EMR.
1 Answer
Set the inferSchema option to false so that Spark does not infer the schema while loading the data, and move your .schema call so it comes before .csv: schema is a method of DataFrameReader, while the DataFrame returned by csv() has no schema method that takes a StructType (the call resolves to StructType.apply, which is what produces the overload error above). Please check the code below.
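The answer's original snippet is not reproduced here, so the following is a minimal sketch of what it describes, reusing schema1 and the S3 path from the question (spark is the spark-shell session; dfFixed is just an illustrative name):

// Corrected read: the schema is given to the DataFrameReader *before* .csv(),
// and schema inference is disabled explicitly.
val dfFixed = spark.read
  .option("header", true)        // the first CSV line holds the column names
  .option("inferSchema", false)  // do not infer types; use the supplied schema
  .schema(schema1)               // schema() belongs to DataFrameReader, not DataFrame
  .csv("s3a://mybucket/ybspark/input/PASSENGERS.csv")

dfFixed.printSchema()            // prints the types from schema1
dfFixed.show(false)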