我正在开发一个自定义streamingquerylistener,并希望在测试中触发它的onqueryterminated方法。
这就是我尝试实现的:
import org.apache.spark.sql.{ SQLContext, SparkSession }
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{ col, to_date }
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.scalatest.flatspec.AnyFlatSpec
class MyListener extends StreamingQueryListener {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = println(event.exception)
}
class ListenerSpec extends AnyFlatSpec {
it should "trigger onQueryTerminated" in {
val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.streams.addListener(new MyListener())
implicit val sqlContext: SQLContext = spark.sqlContext
import spark.implicits._
val stream = MemoryStream[Int]
stream.addData(Seq(1, 3, 4))
val query = stream
.toDF()
.withColumn("columnDoesntExist", to_date(col("names")))
.writeStream
.format("console")
.start()
query.awaitTermination()
}
}
但是,这不起作用,因为它会引发analysisexception,但onqueryterminated方法不会由流查询的终止触发。
在什么情况下该方法被触发,而event.exception是一些(异常)?
更新
以下代码成功触发 onQueryTerminated
:
val exceptionUdf = udf(() => throw new Exception())
val query = stream
.toDF()
.withColumn("exception", exceptionUdf())
.writeStream
.format("console")
.start()
请参阅公认的答案来解释原因。
1条答案
按热度按时间1rhkuytd1#
根据o'reilly出版的《ApacheSpark流处理》一书
onQueryTerminated
方法获取“流式查询停止时调用。这个
event
包含id
以及runId
与开始事件相关的字段。它还提供了一个exception
包含exception
如果查询由于错误而失败。“当你得到一个
AnalysisException
,您的查询尚未开始。它只进入catalyst optimizer的四个阶段中的第一个阶段,即“分析”,尚未转换为可运行代码:有关catalyst optimizer的更多详细信息。
analysisexception只是意味着与目录相关的代码中存在问题,这正是您想要做的:引用(目录中)不存在的列。
如果你想执行
onQueryTermination
方法您需要实现一个工作代码,但它在运行时失败(例如,提供错误的数据输入类型)。