spark结构化流媒体中消耗消息的跟踪

roqulrg3  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(358)

我想设置配置,让我的应用程序跟踪来自kafka的消息。因此,每当它失败时,它都可以从上一次提交或消耗的偏移量开始选择。

readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start();

我在网上看过 checkpointlocation 可以设置spark用来跟踪偏移的属性。
想知道在哪里可以设置此属性吗?我能在以下时间内输入上述代码吗 option ? 我可以知道怎样才能把它调好吗。
其次,我不能理解 trigger(Trigger.Continuous("1 second")) 财产。医生说 continuous processing engine will record the progress of the query every second ,它在阅读Kafka的信息时记录了什么样的进展?

wqlqzqxt

wqlqzqxt1#

您可以将检查点位置设置为 writeStream :

[...]
.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/path/to/dir")
  .trigger(Trigger.Continuous("1 second"))
  .start();

从kafka读取时跟踪进度意味着跟踪主题分区中消耗的偏移量。设置检查点位置将使您的应用程序能够将该信息作为json对象存储在给定的路径中。

{
  "topic1":{
    "0":11, 
    "1":101
  }
}

这意味着应用程序已经消耗了分区中的偏移量10 0 分区偏移100 1 主题的主题 topic1 . 检查点是“提前”写入的(使用提前写入日志),因此应用程序将继续从kafka中读取消息,在预期或非预期(失败)重新启动之前,它将从kafka中断的位置读取消息。
这个 Trigger.Continuous 从spark版本2.3开始提供。现在被标记为实验性的。与微批处理方法相比,它将在kafka中的每一条消息到达主题后立即获取,而不尝试将其与其他消息一起批处理。这可以改善延迟,但很可能会降低整体吞吐量。
论点(例如。 1 seconds )确定检查点的频率。
当使用此触发模式时,重要的是至少有与主题具有分区一样多的可用核心。否则,申请将不会有任何进展。您可以在此处阅读更多信息:
例如,如果您正在读取一个有10个分区的kafka主题,则群集必须至少有10个核心才能进行查询

相关问题