我在读Kafka的留言。邮件格式如下(示例格式): {"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}
另外,请注意主题有来自不同表的消息,而不仅仅是一个表。
我试图实现的是使用spark结构化流读取kafka主题中的上述消息,并创建一个列名称和值都来自json消息本身的Dataframe。
我不想使用case类或structtype显式定义模式。
我试过这个:
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", "topic1").option("startingOffsets", "earliest").load()
val y=df.select(get_json_object(($"value"), "$.payload").alias("payload")
当我查看y(这是一个Dataframe)时,它作为一列出现,在该列的有效负载下的值为json。
如何获取Dataframe中的单个列?我不能做到这一点。
(再次重申,我不能对模式部分使用泛型case类或structtype,因为通过kafka消息传递的消息来自不同的表,所以我希望在运行时从json本身创建一个更动态的模式。)
1条答案
按热度按时间kpbpu0081#
选项1:将kafka connect源更改为set
value.converter.schemas.enable=false
. 这只会给你(展开的有效载荷开始),然后你可以跳到下面的文章。否则,在剥离connect模式之后,需要使用
from_json()
应用架构所有字段都是字符串,因此
相关
如何在kafka connect 0.10和spark structured streaming中使用fromèjson?
https://github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/main.scala
如何使用结构化流从kafka读取json格式的记录?