scala—使用结构化流处理来自kafka的json数据

14ifxucb  于 2021-06-04  发布在  Kafka
关注(0)|答案(3)|浏览(677)

我想把kafka传入的json数据转换成Dataframe。
我使用结构化流媒体 Scala 2.12 大多数人都会添加硬编码的模式,但是如果json可以有额外的字段,那么每次都需要更改代码基,这很乏味。
一种方法是将它写入一个文件并用它进行推断,但我宁愿避免这样做。
有没有别的办法解决这个问题?
编辑:找到了一种将json字符串转换为Dataframe的方法,但无法从流源中提取它,是否可以提取它?

o7jaxewo

o7jaxewo1#

一种方法是将模式本身存储在消息头(而不是键或值)中。
尽管这样会增加消息的大小,但解析json值将很容易,而不需要任何外部资源,如文件或模式注册表。
新消息可以有新的模式,而旧消息仍然可以使用它们的旧模式本身进行处理,因为模式在消息本身中。
或者,您可以对模式进行版本设置并包含 id 对于消息头中的每个模式(或)键或值中的一个魔术字节,并从中推断出模式。
这种方法之后是合流模式注册表。它允许您基本上浏览同一模式的不同版本,并查看您的模式是如何随着时间的推移而演变的。

kqhtkvqz

kqhtkvqz2#

基于javatechnical的回答,最好的方法是使用模式注册表和avro数据,而不是json,目前还没有硬编码模式的方法。
将您的架构名称和id作为头,并使用它们从架构注册表中读取架构。
使用 from_avro 把这些数据转换成数据流!

j1dl9f46

j1dl9f463#

以字符串形式读取数据,然后将其转换为map[string,string],这样您就可以在不知道其模式的情况下处理任何json

相关问题