Spark: reading a JSON array in a column

e1xvtsh3 · posted 2021-07-13 in Spark

Using Spark 2.11, I have the following Dataset (read from a Cassandra table):

+-----------+------------------------------------------------------+
|id         |attributes                                            |
+-----------+------------------------------------------------------+
|YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}]|
+-----------+------------------------------------------------------+

Here is the printSchema() output:

root
 |-- id: string (nullable = true)
 |-- attributes: string (nullable = true)

The attributes column is an array of JSON objects. I have tried to explode it into a Dataset, but it keeps failing. I tried to define the schema as:

StructType type = new StructType()
                .add("id", new IntegerType(), false)
                .add("name", new StringType(), false)
                .add("score", new FloatType(), false)
                .add("snippets", new IntegerType(), false );

ArrayType schema = new ArrayType(type, false);

and passed it to from_json as follows:

df = df.withColumn("val", functions.from_json(df.col("attributes"), schema));

This fails with a match error:

Exception in thread "main" scala.MatchError: org.apache.spark.sql.types.IntegerType@43756cb (of class org.apache.spark.sql.types.IntegerType)

What is the correct way to do this?

4dc9hkyq · answer #1

You can specify the schema as follows:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val schema = ArrayType(
  StructType(Array(
    StructField("id", IntegerType, false),
    StructField("name", StringType, false),
    StructField("score", FloatType, false),
    StructField("snippets", IntegerType, false)
  )),
  false
)

val df1 = df.withColumn("val", from_json(col("attributes"), schema))

df1.show(false)

//+-----------+------------------------------------------------------+------------------------+
//|id         |attributes                                            |val                     |
//+-----------+------------------------------------------------------+------------------------+
//|YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}]|[[1, function, 10.0, 1]]|
//+-----------+------------------------------------------------------+------------------------+
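
If the end goal, as the question says, is to explode the parsed array into one row per JSON object, here is a minimal follow-up sketch (in Java, to match the question's code; it assumes a Dataset named df1 carrying the parsed val column as above):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// One row per array element; "attr.*" then promotes the struct
// fields to top-level columns. The row id is aliased to avoid a
// name clash with the "id" field inside the struct.
Dataset<Row> flattened = df1
    .withColumn("attr", explode(col("val")))
    .select(col("id").alias("row_id"), col("attr.*"));

flattened.show(false);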

Or, for Java:

import static org.apache.spark.sql.types.DataTypes.*;

import java.util.Arrays;

import org.apache.spark.sql.types.ArrayType;

// createArrayType returns an ArrayType, and snippets is an integer
// in the JSON, so it gets IntegerType here as in the Scala version
ArrayType schema = createArrayType(createStructType(Arrays.asList(
    createStructField("id", IntegerType, false),
    createStructField("name", StringType, false),
    createStructField("score", FloatType, false),
    createStructField("snippets", IntegerType, false)
)), false);
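
Applying it is then the same call as in the question (a sketch reusing the df and schema above):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

// from_json accepts the ArrayType schema object directly
Dataset<Row> df1 = df.withColumn("val",
        functions.from_json(df.col("attributes"), schema));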

d8tt03nd · answer #2

You can define the schema as a literal string:

import org.apache.spark.sql.functions.{from_json, lit}

val df2 = df.withColumn(
    "val",
    from_json(
        df.col("attributes"),
        lit("array<struct<id: int, name: string, score: float, snippets: int>>")
    )
)

df2.show(false)
+-----------+------------------------------------------------------+------------------------+
|id         |attributes                                            |val                     |
+-----------+------------------------------------------------------+------------------------+
|YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}]|[[1, function, 10.0, 1]]|
+-----------+------------------------------------------------------+------------------------+
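
For Java, the same idea works without lit, because from_json has an overload that takes the schema as a DDL string plus an options map (a sketch; it assumes Spark 2.3+, where the string overload accepts DDL):

import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

// Overload: from_json(Column, String, Map<String, String>)
Dataset<Row> df2 = df.withColumn("val",
        functions.from_json(
            df.col("attributes"),
            "array<struct<id: int, name: string, score: float, snippets: int>>",
            Collections.emptyMap()));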

Or, if you prefer to use a schema object:

import org.apache.spark.sql.types._

val spark_struct = new StructType()
    .add("id", IntegerType, false)
    .add("name", StringType, false)
    .add("score", FloatType, false)
    .add("snippets", IntegerType, false)

val schema = new ArrayType(spark_struct, false)

val df2 = df.withColumn(
    "val",
    from_json(
        df.col("attributes"),
        schema
    )
)

Your original code has two problems: (1) type is a reserved keyword (in Scala) and should not be used as a variable name, and (2) the data types must not be instantiated with new inside the add calls: pass the singletons (IntegerType, StringType, FloatType; in Java, available via DataTypes) rather than instances such as new IntegerType(). Spark matches schemas against those singleton objects, which is why the fresh instances raise the scala.MatchError.
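
Concretely, a corrected Java version of the schema from the question would look like this (a sketch using the DataTypes singletons):

import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Use the shared singletons from DataTypes; fresh instances such as
// new IntegerType() are not recognized by Spark's pattern matching.
StructType elementType = new StructType()
        .add("id", DataTypes.IntegerType, false)
        .add("name", DataTypes.StringType, false)
        .add("score", DataTypes.FloatType, false)
        .add("snippets", DataTypes.IntegerType, false);

ArrayType schema = DataTypes.createArrayType(elementType, false);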
