我想把kafka传入的json数据转换成Dataframe。
我使用结构化流媒体 Scala 2.12
大多数人都会添加硬编码的模式,但是如果json可以有额外的字段,那么每次都需要更改代码基,这很乏味。
一种方法是将它写入一个文件并用它进行推断,但我宁愿避免这样做。
有没有别的办法解决这个问题?
编辑:找到了一种将json字符串转换为Dataframe的方法,但无法从流源中提取它,是否可以提取它?
我想把kafka传入的json数据转换成Dataframe。
我使用结构化流媒体 Scala 2.12
大多数人都会添加硬编码的模式,但是如果json可以有额外的字段,那么每次都需要更改代码基,这很乏味。
一种方法是将它写入一个文件并用它进行推断,但我宁愿避免这样做。
有没有别的办法解决这个问题?
编辑:找到了一种将json字符串转换为Dataframe的方法,但无法从流源中提取它,是否可以提取它?
3条答案
按热度按时间o7jaxewo1#
一种方法是将模式本身存储在消息头(而不是键或值)中。
尽管这样会增加消息的大小,但解析json值将很容易,而不需要任何外部资源,如文件或模式注册表。
新消息可以有新的模式,而旧消息仍然可以使用它们的旧模式本身进行处理,因为模式在消息本身中。
或者,您可以对模式进行版本设置并包含
id
对于消息头中的每个模式(或)键或值中的一个魔术字节,并从中推断出模式。这种方法之后是合流模式注册表。它允许您基本上浏览同一模式的不同版本,并查看您的模式是如何随着时间的推移而演变的。
kqhtkvqz2#
基于javatechnical的回答,最好的方法是使用模式注册表和avro数据,而不是json,目前还没有硬编码模式的方法。
将您的架构名称和id作为头,并使用它们从架构注册表中读取架构。
使用
from_avro
把这些数据转换成数据流!j1dl9f463#
以字符串形式读取数据,然后将其转换为map[string,string],这样您就可以在不知道其模式的情况下处理任何json