readstream关于一个主题,其中有两个流式查询正在编写

wmvff8tz  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(337)

那么首先,是否可以使用两个不同的流式查询将stream写入同一个kafka主题?如果是,那么如何在这样一个主题上阅读流?谢谢参考代码片段

val StreamingQuery1 = DataFrame1.selectExpr("to_json(struct(*)) AS value")
        .writeStream
        .format("kafka")
        .option("topic", Topic)
        .queryName("Query1")
        .option("kafka.bootstrap.servers", kafkaBootstrapServer)
        .option("checkpointLocation",checkpointPath)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.jaas.config", saslJaasCfg)
        .option("kafka.timeout.ms", 18000)
        .option("kafka.request.timeout.ms", 18000)
        .option("kafka.session.timeout.ms", 18000)
        .option("kafka.heartbeat.interval.ms", 18000)
        .option("kafka.retries", 100)
        .option("failOnDataLoss", "false")
        .option("truncate", false)
        .start()

 val StreamingQuery2 = DataFrame2.selectExpr("to_json(struct(*)) AS value")
        .writeStream
        .format("kafka")
        .option("topic", Topic)
        .queryName("Query2")
        .option("kafka.bootstrap.servers", kafkaBootstrapServer)
        .option("checkpointLocation",checkpointPath)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.jaas.config", saslJaasCfg)
        .option("kafka.timeout.ms", 18000)
        .option("kafka.request.timeout.ms", 18000)
        .option("kafka.session.timeout.ms", 18000)
        .option("kafka.heartbeat.interval.ms", 18000)
        .option("kafka.retries", 100)
        .option("failOnDataLoss", "false")
        .option("truncate", false)
        .start()
        .awaitTermination()
wrrgggsh

wrrgggsh1#

是的,可以从多个流式查询写入同一Kafka主题。是的,可以读取这些数据-你只需要一个 readStream 主题(或主题列表,如果需要)。
Kafka(和其他类似系统)的优点是它将生产者和消费者解耦,如果需要,你可以有1:n,n:1,n:m的组合。
收到代码后更新:
这可能是检查点位置的问题,因为两个writestream操作都指向同一个位置: .option("checkpointLocation",checkpointPath) 另外,与其等待特定流完成,不如这样做 spark.streams.awaitAnyTermination() 并检查哪些流已经完成。

相关问题