spark structured streaming kafka convert json without schema(推断schema)

pcww981p  于 2021-06-07  发布在  Kafka
关注(0)|答案(5)|浏览(500)

我读到spark structured streaming不支持将kafka消息作为json读取的模式推断。有没有一种方法可以像spark streaming一样检索模式:

val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printschema
ql3eal8s

ql3eal8s1#

这是不可能的。spark流在开发中支持有限的模式推理 spark.sql.streaming.schemaInference 设置为 true :
默认情况下,来自基于文件的源的结构化流需要您指定模式,而不是依赖spark自动推断模式。此限制确保流式查询使用一致的模式,即使在失败的情况下也是如此。对于特殊用例,可以通过将spark.sql.streaming.schemaReference设置为true来重新启用模式推断。
但它不能用于从kafka消息和 DataFrameReader.json 不支持流媒体 Datasets 作为论据。
您必须手动提供模式如何使用结构化流从kafka读取json格式的记录?

egmofgnx

egmofgnx2#

如果您想问这个问题的话,可以将json转换为Dataframe,而不必手动键入模式。
最近我遇到了这样一种情况:我通过kafka接收大量嵌套的json数据包,手动输入模式既麻烦又容易出错。
通过一个小的数据样本和一些技巧,您可以为spark2+提供如下模式:

val jsonstr = """ copy paste a representative sample of data here"""
val jsondf = spark.read.json(Seq(jsonstr).toDS) //jsondf.schema has the nested json structure we need

val event = spark.readStream.format..option...load() //configure your source

val eventWithSchema = event.select($"value" cast "string" as "json").select(from_json($"json", jsondf.schema) as "data").select("data.*")

现在你可以用这个val做任何你想做的事情,就像直接流媒体一样。创建临时视图,运行sql查询等等。。

jq6vz3qz

jq6vz3qz3#

将arnon的解决方案带到下一步(因为它在spark的较新版本中已被弃用,并且只需要为类型转换迭代整个Dataframe)

spark.read.json(df.as[String])

不管怎样,到目前为止,它仍然是实验性的。

nkhmeac6

nkhmeac64#

以下是一种可行的方法:
在开始流媒体之前,先从Kafka那个里获取一小批数据
从小批量推断模式
开始使用提取的模式流式传输数据。
下面的伪代码说明了这种方法。
第一步:
从Kafka那里提取一小批(两张唱片),

val smallBatch = spark.read.format("kafka")
                           .option("kafka.bootstrap.servers", "node:9092")
                           .option("subscribe", "topicName")
                           .option("startingOffsets", "earliest")
                           .option("endingOffsets", """{"topicName":{"0":2}}""")
                           .load()
                           .selectExpr("CAST(value AS STRING) as STRING").as[String].toDF()

步骤2:将小批量写入文件:

smallBatch.write.mode("overwrite").format("text").save("/batch")

此命令将小批写入hdfs directory/batch。它创建的文件的名称是part xyz*。因此,首先需要使用hadoop文件系统命令重命名文件(请参见org.apache.hadoop.fs.\和org.apache.hadoop.conf.configuration,下面是一个示例https://stackoverflow.com/a/41990859)然后以json格式读取文件:

val smallBatchSchema = spark.read.json("/batch/batchName.txt").schema

在这里,batchname.txt是文件的新名称,smallbatchschema包含从小批推断出的模式。
最后,您可以按如下方式流式传输数据(步骤3):

val inputDf = spark.readStream.format("kafka")
                             .option("kafka.bootstrap.servers", "node:9092")
                             .option("subscribe", "topicName")
                             .option("startingOffsets", "earliest")
                             .load()

val dataDf = inputDf.selectExpr("CAST(value AS STRING) as json")
                    .select( from_json($"json", schema=smallBatchSchema).as("data"))
                    .select("data.*")

希望这有帮助!

ht4b089n

ht4b089n5#

可以使用此结构:

myStream = spark.readStream.schema(spark.read.json("my_sample_json_file_as_schema.json").schema).json("my_json_file")..

怎么会这样?好吧,由于spark.read.json(“..”).schema正好返回所需的推断模式,因此您可以将此返回的模式用作spark.readstream的强制模式参数的参数
我所做的是指定一个一行示例json作为推断模式内容的输入,这样就不会占用不必要的内存。如果数据发生更改,只需更新示例json即可。
我花了一段时间才弄明白(手工构造structtypes和structfields是痛苦的…),因此我很乐意为所有人投票:-)

相关问题