ApacheKafka—spark结构化流媒体—是否可以写入两次偏移量

gtlvzcf8  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(386)

我正在使用spark结构化流媒体来使用来自kafka主题的数据,并将数据写入另一个kafka接收器。
我想把偏移量存储两次—一次是在阅读主题时,然后重新打印偏移量。第二,在将数据写入输出接收器并写入偏移量时,可以通过给定检查点目录位置,
是否可以写入订阅主题时消耗的偏移量。

4zcjmb1e

4zcjmb1e1#

您可以使用streamingquerylistener。您可以通过以下方式将侦听器添加到流中

spark.streams.addListener(new StreamingQueryListener() {

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { 

    // insert code here to log the offsets in addition to Spark's checkpoint

  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {}

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
})

相关问题