那么首先,是否可以使用两个不同的流式查询将stream写入同一个kafka主题?如果是,那么如何在这样一个主题上阅读流?谢谢参考代码片段
val StreamingQuery1 = DataFrame1.selectExpr("to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.option("topic", Topic)
.queryName("Query1")
.option("kafka.bootstrap.servers", kafkaBootstrapServer)
.option("checkpointLocation",checkpointPath)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", saslJaasCfg)
.option("kafka.timeout.ms", 18000)
.option("kafka.request.timeout.ms", 18000)
.option("kafka.session.timeout.ms", 18000)
.option("kafka.heartbeat.interval.ms", 18000)
.option("kafka.retries", 100)
.option("failOnDataLoss", "false")
.option("truncate", false)
.start()
val StreamingQuery2 = DataFrame2.selectExpr("to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.option("topic", Topic)
.queryName("Query2")
.option("kafka.bootstrap.servers", kafkaBootstrapServer)
.option("checkpointLocation",checkpointPath)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", saslJaasCfg)
.option("kafka.timeout.ms", 18000)
.option("kafka.request.timeout.ms", 18000)
.option("kafka.session.timeout.ms", 18000)
.option("kafka.heartbeat.interval.ms", 18000)
.option("kafka.retries", 100)
.option("failOnDataLoss", "false")
.option("truncate", false)
.start()
.awaitTermination()
1条答案
按热度按时间wrrgggsh1#
是的,可以从多个流式查询写入同一Kafka主题。是的,可以读取这些数据-你只需要一个
readStream
主题(或主题列表,如果需要)。Kafka(和其他类似系统)的优点是它将生产者和消费者解耦,如果需要,你可以有1:n,n:1,n:m的组合。
收到代码后更新:
这可能是检查点位置的问题,因为两个writestream操作都指向同一个位置:
.option("checkpointLocation",checkpointPath)
另外,与其等待特定流完成,不如这样做spark.streams.awaitAnyTermination()
并检查哪些流已经完成。