使用spark结构化流读取kafka connect jsonconverter消息和模式

yb3bgrhw  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(423)

我在读Kafka的留言。邮件格式如下(示例格式): {"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"","city":"NYK","emp_sal":"100000","manager_name":"xyz"}} 另外,请注意主题有来自不同表的消息,而不仅仅是一个表。
我试图实现的是使用spark结构化流读取kafka主题中的上述消息,并创建一个列名称和值都来自json消息本身的Dataframe。
我不想使用case类或structtype显式定义模式。
我试过这个:

val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", "topic1").option("startingOffsets", "earliest").load()

val y=df.select(get_json_object(($"value"), "$.payload").alias("payload")

当我查看y(这是一个Dataframe)时,它作为一列出现,在该列的有效负载下的值为json。
如何获取Dataframe中的单个列?我不能做到这一点。
(再次重申,我不能对模式部分使用泛型case类或structtype,因为通过kafka消息传递的消息来自不同的表,所以我希望在运行时从json本身创建一个更动态的模式。)

kpbpu008

kpbpu0081#

选项1:将kafka connect源更改为set value.converter.schemas.enable=false . 这只会给你(展开的有效载荷开始),然后你可以跳到下面的文章。
否则,在剥离connect模式之后,需要使用 from_json() 应用架构

val y = df.select(get_json_object($"value", "$.payload").alias("payload"))
val z = df.select(from_json($"payload", schema))

所有字段都是字符串,因此

val schema: StructType = StructType(Seq(
  StructField("emp_id", StringType()),
  StructField("emp_name", StringType()),
  StructField("city", StringType()),
  StructField("emp_sal", StringType()),
  StructField("manager_name", StringType())
))

相关
如何在kafka connect 0.10和spark structured streaming中使用fromèjson?
https://github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/main.scala
如何使用结构化流从kafka读取json格式的记录?

相关问题