I ingested XML files with the Kafka Connect FilePulse connector 1.5.3, and now I want to read them with Spark Structured Streaming to parse/flatten them, because the payload is very deeply nested.
The string I read from Kafka (using the console consumer) looks like this, with the data under payload:
{
"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"city"},{"type":"array","items":{"type":"struct","fields":[{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":true,"field":"unit"},{"type":"string","optional":true,"field":"value"}],"optional":true,"name":"Value"},"optional":true,"field":"value"}],"optional":true,"name":"ForcedArrayType"},"optional":true,"field":"forcedArrayField"},{"type":"string","optional":true,"field":"lastField"}],"optional":true,"name":"Data","field":"data"}],"optional":true}
,"payload":{"data":{"city":"someCity","forcedArrayField":[{"value":[{"unit":"unitField1","value":"123"},{"unit":"unitField1","value":"456"}]}],"lastField":"2020-08-02T18:02:00"}}
}
The schema I tried:
StructType schema = new StructType();
schema = schema.add("schema", StringType, false);
schema = schema.add("payload", StringType, false);

StructType Data = new StructType();
StructType ValueArray = new StructType(new StructField[]{
        new StructField("unit", StringType, true, Metadata.empty()),
        new StructField("value", StringType, true, Metadata.empty())
});
StructType ForcedArrayType = new StructType(new StructField[]{
        new StructField("valueArray", ValueArray, true, Metadata.empty())
});
Data = Data.add("city", StringType, true);
Data = Data.add("forcedArrayField", ForcedArrayType, true);
Data = Data.add("lastField", StringType, true);

StructType Record = new StructType();
Record = Record.add("data", Data, false);
The query I tried:
//below worked for payload
Dataset<Row> parsePayload = lines
.selectExpr("cast (value as string) as json")
.select(functions.from_json(functions.col("json"), schema=schema).as("schemaAndPayload"))
.select("schemaAndPayload.payload").as("payload");
System.out.println(parsePayload.isStreaming());
//below makes the output empty:
Dataset<Row> parseValue = parsePayload.select(functions.from_json(functions.col("payload"), Record).as("cols"))
.select(functions.col("cols.data.city"));
//.select(functions.col("cols.*"));
StreamingQuery query = parseValue
.writeStream()
.format("console")
.outputMode(OutputMode.Append())
.start();
query.awaitTermination();
When I output the parsePayload stream I can see the data (still as a JSON string), but when I try to select some or all of its fields, e.g. city above, the output is empty.
Can someone help me figure out whether the schema definition is wrong, or the query?
Also, on the console, when I output parsePayload instead of parseValue, it shows some data, which makes me think the payload part works:
|{"data":{"city":"...|
...
2 Answers

#1 (vaqhlq811)
Your schema definition is wrong: payload and schema are probably not correct as columns/fields. Read one captured message as static JSON (spark.read.json) to get the inferred schema, then use that schema in Structured Streaming.
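That suggestion can be sketched like this (a minimal sketch, assuming a local Spark session; instead of a file, it infers the schema from an in-memory sample string, which here is a trimmed copy of the message shown in the question):

```java
import java.util.Collections;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

public class InferSchema {
    // Infer the nested schema from one captured message instead of hand-writing it.
    public static StructType infer(String sampleJson) {
        SparkSession spark = SparkSession.builder()
                .appName("infer-schema").master("local[1]").getOrCreate();
        Dataset<Row> df = spark.read().json(
                spark.createDataset(Collections.singletonList(sampleJson), Encoders.STRING()));
        return df.schema();
    }

    public static void main(String[] args) {
        // Trimmed copy of the message from the question.
        String sample = "{\"payload\":{\"data\":{\"city\":\"someCity\","
                + "\"forcedArrayField\":[{\"value\":[{\"unit\":\"unitField1\",\"value\":\"123\"}]}],"
                + "\"lastField\":\"2020-08-02T18:02:00\"}}}";
        StructType inferred = infer(sample);
        inferred.printTreeString();
        // Reuse `inferred` with from_json() in the streaming query,
        // instead of the hand-written schema from the question.
    }
}
```

The inferred schema can then be passed straight to functions.from_json in the streaming pipeline.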
#2 (ffscu2ro2)

Your schema definition doesn't seem entirely correct. I reproduced your problem and was able to parse the JSON with the following schema.
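A schema matching the payload in the question could look like this (a sketch derived from the JSON above; the key difference from the attempt in the question is that both forcedArrayField and its nested value must be declared as ArrayType, not plain structs):

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class PayloadSchema {
    public static StructType record() {
        // Leaf objects: {"unit": "...", "value": "..."}
        StructType valueStruct = new StructType()
                .add("unit", DataTypes.StringType, true)
                .add("value", DataTypes.StringType, true);
        // "value" holds an ARRAY of those leaf structs
        StructType forcedArrayStruct = new StructType()
                .add("value", DataTypes.createArrayType(valueStruct, true), true);
        StructType data = new StructType()
                .add("city", DataTypes.StringType, true)
                // "forcedArrayField" is itself an ARRAY of structs
                .add("forcedArrayField", DataTypes.createArrayType(forcedArrayStruct, true), true)
                .add("lastField", DataTypes.StringType, true);
        return new StructType().add("data", data, true);
    }

    public static void main(String[] args) {
        System.out.println(record().treeString());
    }
}
```

With the question's original schema, from_json silently returns null for fields whose declared type does not match the JSON (a struct where an array actually appears), which is why selecting city produced empty output.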
Then, to access the individual fields, I used the following selection.
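One way to select and flatten the fields can be sketched like this (a sketch using a static Dataset built from the question's payload string and a DDL schema string; in the streaming job, the same flatten step would be applied to the parsePayload dataset):

```java
import java.util.Collections;
import java.util.HashMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class FlattenPayload {
    // DDL form of the corrected schema: both nesting levels are ARRAYs.
    static final String DDL = "data STRUCT<city: STRING, "
            + "forcedArrayField: ARRAY<STRUCT<value: ARRAY<STRUCT<unit: STRING, value: STRING>>>>, "
            + "lastField: STRING>";

    public static Dataset<Row> flatten(Dataset<Row> payloads) {
        return payloads
                .select(functions.from_json(functions.col("payload"), DDL, new HashMap<>()).as("cols"))
                // First explode: one row per element of forcedArrayField.
                .select(functions.col("cols.data.city").as("city"),
                        functions.explode(functions.col("cols.data.forcedArrayField")).as("fa"),
                        functions.col("cols.data.lastField").as("lastField"))
                // Second explode: one row per {unit, value} pair.
                .select(functions.col("city"),
                        functions.explode(functions.col("fa.value")).as("v"),
                        functions.col("lastField"))
                .select("city", "v.unit", "v.value", "lastField");
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("flatten").master("local[1]").getOrCreate();
        // The payload string from the question.
        String payload = "{\"data\":{\"city\":\"someCity\","
                + "\"forcedArrayField\":[{\"value\":[{\"unit\":\"unitField1\",\"value\":\"123\"},"
                + "{\"unit\":\"unitField1\",\"value\":\"456\"}]}],"
                + "\"lastField\":\"2020-08-02T18:02:00\"}}";
        Dataset<Row> df = spark
                .createDataset(Collections.singletonList(payload), Encoders.STRING())
                .toDF("payload");
        flatten(df).show(false);
    }
}
```

With the sample payload this should yield two flattened rows, one per {unit, value} pair.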
This produced the output.