我读到spark structured streaming不支持将kafka消息作为json读取的模式推断。有没有一种方法可以像spark streaming一样检索模式:
val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printschema
我读到spark structured streaming不支持将kafka消息作为json读取的模式推断。有没有一种方法可以像spark streaming一样检索模式:
val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printschema
5条答案
按热度按时间ql3eal8s1#
这是不可能的。spark流在开发中支持有限的模式推理
spark.sql.streaming.schemaInference
设置为true
:默认情况下,来自基于文件的源的结构化流需要您指定模式,而不是依赖spark自动推断模式。此限制确保流式查询使用一致的模式,即使在失败的情况下也是如此。对于特殊用例,可以通过将spark.sql.streaming.schemaReference设置为true来重新启用模式推断。
但它不能用于从kafka消息和
DataFrameReader.json
不支持流媒体Datasets
作为论据。您必须手动提供模式如何使用结构化流从kafka读取json格式的记录?
egmofgnx2#
如果您想问这个问题的话,可以将json转换为Dataframe,而不必手动键入模式。
最近我遇到了这样一种情况:我通过kafka接收大量嵌套的json数据包,手动输入模式既麻烦又容易出错。
通过一个小的数据样本和一些技巧,您可以为spark2+提供如下模式:
现在你可以用这个val做任何你想做的事情,就像直接流媒体一样。创建临时视图,运行sql查询等等。。
jq6vz3qz3#
将arnon的解决方案带到下一步(因为它在spark的较新版本中已被弃用,并且只需要为类型转换迭代整个Dataframe)
不管怎样,到目前为止,它仍然是实验性的。
nkhmeac64#
以下是一种可行的方法:
在开始流媒体之前,先从Kafka那个里获取一小批数据
从小批量推断模式
开始使用提取的模式流式传输数据。
下面的伪代码说明了这种方法。
第一步:
从Kafka那里提取一小批(两张唱片),
步骤2:将小批量写入文件:
此命令将小批写入hdfs directory/batch。它创建的文件的名称是part xyz*。因此,首先需要使用hadoop文件系统命令重命名文件(请参见org.apache.hadoop.fs.\和org.apache.hadoop.conf.configuration,下面是一个示例https://stackoverflow.com/a/41990859)然后以json格式读取文件:
在这里,batchname.txt是文件的新名称,smallbatchschema包含从小批推断出的模式。
最后,您可以按如下方式流式传输数据(步骤3):
希望这有帮助!
ht4b089n5#
可以使用此结构:
怎么会这样?好吧,由于spark.read.json(“..”).schema正好返回所需的推断模式,因此您可以将此返回的模式用作spark.readstream的强制模式参数的参数
我所做的是指定一个一行示例json作为推断模式内容的输入,这样就不会占用不必要的内存。如果数据发生更改,只需更新示例json即可。
我花了一段时间才弄明白(手工构造structtypes和structfields是痛苦的…),因此我很乐意为所有人投票:-)