我有一个从Kafka流创建的Dataframe。我想把它减少到一个值,然后在我的程序中使用这个值。
```scala
import sparkSession.implicits._
val df = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("subscribe", "theTopic")
.load()
val result = df
.selectExpr("CAST(value AS STRING) as json")
.map(json => getAnInt(json))
.reduce { (x, y) =>
if (x > y) x else y
}
someOtherFunction(result)
我希望将流简化为一个值,然后在我的程序的其余部分中使用。相反,它失败了:
org.apache.spark.sql.analysisexception:具有流源的查询必须使用writestream.start()执行;;kafka位于org.apache.spark.sql.catalyst.analysis.unsupportedoperationchecker$.throwerror(unsupportedoperationchecker)。scala:389)在org.apache.spark.sql.catalyst.analysis.u。。。
1条答案
按热度按时间qgelzfjb1#
你只能用
writeStream
在流Dataframe上。我不确定你是否想要这个流Dataframe。如果你移除readStream
使用read
相反,你可以解决这个问题!