在apache spark streamingquerylisteners中何时触发onqueryterminated?

kpbwa7wx  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(362)

我正在开发一个自定义streamingquerylistener,并希望在测试中触发它的onqueryterminated方法。
这就是我尝试实现的:

import org.apache.spark.sql.{ SQLContext, SparkSession }
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{ col, to_date }
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.scalatest.flatspec.AnyFlatSpec

class MyListener extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit       = {}
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit     = {}
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = println(event.exception)
}

class ListenerSpec extends AnyFlatSpec {

  it should "trigger onQueryTerminated" in {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    spark.streams.addListener(new MyListener())
    implicit val sqlContext: SQLContext = spark.sqlContext

    import spark.implicits._

    val stream = MemoryStream[Int]
    stream.addData(Seq(1, 3, 4))

    val query = stream
      .toDF()
      .withColumn("columnDoesntExist", to_date(col("names")))
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}

但是,这不起作用,因为它会引发analysisexception,但onqueryterminated方法不会由流查询的终止触发。
在什么情况下该方法被触发,而event.exception是一些(异常)?

更新

以下代码成功触发 onQueryTerminated :

val exceptionUdf = udf(() => throw new Exception())

val query = stream
      .toDF()
      .withColumn("exception", exceptionUdf())
      .writeStream
      .format("console")
      .start()

请参阅公认的答案来解释原因。

1rhkuytd

1rhkuytd1#

根据o'reilly出版的《ApacheSpark流处理》一书 onQueryTerminated 方法获取
“流式查询停止时调用。这个 event 包含 id 以及 runId 与开始事件相关的字段。它还提供了一个 exception 包含 exception 如果查询由于错误而失败。“
当你得到一个 AnalysisException ,您的查询尚未开始。它只进入catalyst optimizer的四个阶段中的第一个阶段,即“分析”,尚未转换为可运行代码:

有关catalyst optimizer的更多详细信息。
analysisexception只是意味着与目录相关的代码中存在问题,这正是您想要做的:引用(目录中)不存在的列。
如果你想执行 onQueryTermination 方法您需要实现一个工作代码,但它在运行时失败(例如,提供错误的数据输入类型)。

相关问题