我使用spark结构化流来从kafka流式传输数据,这给了我以下模式的Dataframe
Column Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int
这里的值列是二进制格式的,但它实际上是一个结构类型的json字符串,需要读取json结构,屏蔽其中的几个字段并写入数据。
我使用spark结构化流来从kafka流式传输数据,这给了我以下模式的Dataframe
Column Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int
这里的值列是二进制格式的,但它实际上是一个结构类型的json字符串,需要读取json结构,屏蔽其中的几个字段并写入数据。
1条答案
按热度按时间8dtrkrch1#
您可以按照structured streaming+kafka integration指南中给出的指导来理解如何将二进制值转换为字符串值。
然后可以根据实际的json结构定义模式,例如:
使用
from_json
函数将允许您处理json字符串中的数据,请参阅文档,例如:这样,您就可以通过使用结构化api(如
withColumn
以及drop
.如果不想定义整个模式,可以考虑使用
get_json_object
.