如何将from\u json与kafka connect 0.10和spark structured streaming结合使用?

xqkwcwgp  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(681)

我试图从[databricks][1]复制这个示例,并将其应用到kafka和spark structured streaming的新连接器中,但是我无法使用spark中现成的方法正确解析json。。。
注:本主题以json格式写入kafka。

val ds1 = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", IP + ":9092")
          .option("zookeeper.connect", IP + ":2181")
          .option("subscribe", TOPIC)
          .option("startingOffsets", "earliest")
          .option("max.poll.records", 10)
          .option("failOnDataLoss", false)
          .load()

下面的代码将无法工作,我相信这是因为列json是一个字符串,与来自\u json签名的方法不匹配。。。

val df = ds1.select($"value" cast "string" as "json")
                .select(from_json("json") as "data")
                .select("data.*")

有什么建议吗?
[更新]工作示例:https://github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/main.scala

omvjsjqw

omvjsjqw1#

首先需要为json消息定义模式。例如

val schema = new StructType()
  .add($"id".string)
  .add($"name".string)

现在您可以在中使用此模式 from_json 方法如下。

val df = ds1.select($"value" cast "string" as "json")
            .select(from_json($"json", schema) as "data")
            .select("data.*")

相关问题