我想设置配置,让我的应用程序跟踪来自kafka的消息。因此,每当它失败时,它都可以从上一次提交或消耗的偏移量开始选择。
readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.trigger(Trigger.Continuous("1 second")) // only change in query
.start();
我在网上看过 checkpointlocation
可以设置spark用来跟踪偏移的属性。
想知道在哪里可以设置此属性吗?我能在以下时间内输入上述代码吗 option
? 我可以知道怎样才能把它调好吗。
其次,我不能理解 trigger(Trigger.Continuous("1 second"))
财产。医生说 continuous processing engine will record the progress of the query every second
,它在阅读Kafka的信息时记录了什么样的进展?
1条答案
按热度按时间wqlqzqxt1#
您可以将检查点位置设置为
writeStream
:从kafka读取时跟踪进度意味着跟踪主题分区中消耗的偏移量。设置检查点位置将使您的应用程序能够将该信息作为json对象存储在给定的路径中。
这意味着应用程序已经消耗了分区中的偏移量10
0
分区偏移1001
主题的主题topic1
. 检查点是“提前”写入的(使用提前写入日志),因此应用程序将继续从kafka中读取消息,在预期或非预期(失败)重新启动之前,它将从kafka中断的位置读取消息。这个
Trigger.Continuous
从spark版本2.3开始提供。现在被标记为实验性的。与微批处理方法相比,它将在kafka中的每一条消息到达主题后立即获取,而不尝试将其与其他消息一起批处理。这可以改善延迟,但很可能会降低整体吞吐量。论点(例如。
1 seconds
)确定检查点的频率。当使用此触发模式时,重要的是至少有与主题具有分区一样多的可用核心。否则,申请将不会有任何进展。您可以在此处阅读更多信息:
例如,如果您正在读取一个有10个分区的kafka主题,则群集必须至少有10个核心才能进行查询