Spark Structured Streaming: reading nested Kafka Connect JsonConverter messages

0ejtzxu1 · asked 2021-05-27 · in Spark

I have ingested XML files with the Kafka Connect File Pulse connector 1.5.3, and I now want to read them with Spark Structured Streaming to parse/flatten them, because the structure is very nested.
The string I read from Kafka (I read it with the console consumer), with the payload at the end, looks like this:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": true, "field": "city" },
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "array",
                  "items": {
                    "type": "struct",
                    "fields": [
                      { "type": "string", "optional": true, "field": "unit" },
                      { "type": "string", "optional": true, "field": "value" }
                    ],
                    "optional": true,
                    "name": "Value"
                  },
                  "optional": true,
                  "field": "value"
                }
              ],
              "optional": true,
              "name": "ForcedArrayType"
            },
            "optional": true,
            "field": "forcedArrayField"
          },
          { "type": "string", "optional": true, "field": "lastField" }
        ],
        "optional": true,
        "name": "Data",
        "field": "data"
      }
    ],
    "optional": true
  },
  "payload": {
    "data": {
      "city": "someCity",
      "forcedArrayField": [
        { "value": [ { "unit": "unitField1", "value": "123" }, { "unit": "unitField1", "value": "456" } ] }
      ],
      "lastField": "2020-08-02T18:02:00"
    }
  }
}

The schema (data types) I tried:

StructType schema = new StructType();
schema = schema.add("schema", StringType, false);
schema = schema.add("payload", StringType, false);

StructType Data = new StructType();
StructType ValueArray = new StructType(new StructField[]{
        new StructField("unit", StringType, true, Metadata.empty()),
        new StructField("value", StringType, true, Metadata.empty())
});
StructType ForcedArrayType = new StructType(new StructField[]{
        new StructField("valueArray", ValueArray, true, Metadata.empty())
});

Data = Data.add("city", StringType, true);
Data = Data.add("forcedArrayField", ForcedArrayType, true);
Data = Data.add("lastField", StringType, true);

StructType Record = new StructType();
Record = Record.add("data", Data, false);

The query I tried:

// Below worked for the payload:
Dataset<Row> parsePayload = lines
        .selectExpr("cast (value as string) as json")
        .select(functions.from_json(functions.col("json"), schema).as("schemaAndPayload"))
        .select("schemaAndPayload.payload").as("payload");

System.out.println(parsePayload.isStreaming());

// Below makes the output empty:
Dataset<Row> parseValue = parsePayload
        .select(functions.from_json(functions.col("payload"), Record).as("cols"))
        .select(functions.col("cols.data.city"));
        //.select(functions.col("cols.*"));

StreamingQuery query = parseValue
        .writeStream()
        .format("console")
        .outputMode(OutputMode.Append())
        .start();
query.awaitTermination();

When I output the parsePayload stream, I can see the data (still as a JSON structure), but when I select certain/all fields, such as city above, the output is empty.
Is the schema definition wrong, or is the query wrong?
Also, on the console, when I output parsePayload instead of parseValue, it does show some data, which makes me think the payload part works:

|{"data":{"city":"...|
...

vaqhlq81 #1

Your schema definition is wrong: payload and schema may not be columns/fields at all. Read the message as static JSON (spark.read.json) to obtain the schema, and then use that schema in Structured Streaming, as sketched below.
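A minimal Java sketch of that approach, assuming a sample Kafka message has been saved to a local file first (the file path and the spark/lines variables are hypothetical placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;

// Infer the schema once from a static copy of one message:
Dataset<Row> sample = spark.read().json("/tmp/sample-message.json");
StructType inferredSchema = sample.schema();
sample.printSchema(); // verify the inferred structure

// Reuse the inferred schema when parsing the streaming value column:
Dataset<Row> parsed = lines
        .selectExpr("cast (value as string) as json")
        .select(functions.from_json(functions.col("json"), inferredSchema).as("msg"));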


ffscu2ro #2

Your schema definition is not entirely correct. I reproduced your problem and was able to parse the JSON with the following schema:

val payloadSchema = new StructType()
  .add("data", new StructType()
    .add("city", StringType)
    .add("forcedArrayField", ArrayType(new StructType()
      .add("value", ArrayType(new StructType()
        .add("unit", StringType)
        .add("value", StringType)))))
    .add("lastField", StringType))

Then, to access the individual fields, I used the following selects:

val parsePayload = df
    .selectExpr("cast (value as string) as json")
    .select(functions.from_json(functions.col("json"), schema).as("schemaAndPayload"))
    .select("schemaAndPayload.payload").as("payload")
    .select(functions.from_json(functions.col("payload"), payloadSchema).as("cols"))
    .select(col("cols.data.city").as("city"), explode(col("cols.data.forcedArrayField")).as("forcedArrayField"), col("cols.data.lastField").as("lastField"))
    .select(col("city"), explode(col("forcedArrayField.value").as("middleFields")), col("lastField"))

This produces the output:

+--------+-----------------+-------------------+
|    city|              col|          lastField|
+--------+-----------------+-------------------+
|someCity|[unitField1, 123]|2020-08-02T18:02:00|
|someCity|[unitField1, 456]|2020-08-02T18:02:00|
+--------+-----------------+-------------------+
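
For reference, here is a sketch of the same payloadSchema in Java (the language used in the question). The key difference from the question's attempt is the two ArrayType wrappers around the struct element types, which the original schema was missing:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Element type of the innermost array: {unit, value}
StructType valueStruct = new StructType()
        .add("unit", DataTypes.StringType)
        .add("value", DataTypes.StringType);

// forcedArrayField is an ARRAY of structs, each holding an ARRAY of valueStruct;
// the question's schema used plain StructTypes here, which is why the result was empty.
StructType payloadSchema = new StructType()
        .add("data", new StructType()
                .add("city", DataTypes.StringType)
                .add("forcedArrayField", DataTypes.createArrayType(new StructType()
                        .add("value", DataTypes.createArrayType(valueStruct))))
                .add("lastField", DataTypes.StringType));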
