使用spark 2.11,我得到了以下数据集(从cassandra表中读取):
+------------+----------------------------------------------------------+
|id |attributes |
+------------+----------------------------------------------------------+
|YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}] |
+------------+----------------------------------------------------------+
这是printschema():
root
|-- id: string (nullable = true)
|-- attributes: string (nullable = true)
这个 attributes
列是json对象的数组。我试着把它分解成数据集,但一直失败。我试图将模式定义为:
StructType type = new StructType()
.add("id", new IntegerType(), false)
.add("name", new StringType(), false)
.add("score", new FloatType(), false)
.add("snippets", new IntegerType(), false );
ArrayType schema = new ArrayType(type, false);
并提供给 from_json
具体如下:
df = df.withColumn("val", functions.from_json(df.col("attributes"), schema));
此操作失败,出现匹配错误:
Exception in thread "main" scala.MatchError: org.apache.spark.sql.types.IntegerType@43756cb (of class org.apache.spark.sql.types.IntegerType)
正确的方法是什么?
2条答案
按热度按时间4dc9hkyq1#
可以通过以下方式指定架构:
或者对于java:
d8tt03nd2#
您可以将架构定义为文本字符串:
如果您喜欢使用模式:
您的原始代码有两个问题:(1)您使用了保留关键字
type
作为变量名,并且(2)不需要使用new
在add
.