Spark2.2引入了Kafka的结构化流媒体源。据我所知,它依赖于hdfs checkpoint目录来存储偏移量并保证“一次准确”的消息传递。
但是老码头(比如https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/)表示spark流检查点不能跨应用程序或spark升级恢复,因此不太可靠。作为一种解决方案,有一种做法支持在支持mysql或redshiftdb之类事务的外部存储中存储偏移量。
如果我想将kafka源的偏移量存储到事务数据库中,如何从结构化流批处理中获取偏移量?
以前,可以通过将rdd转换为 HasOffsetRanges
:
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
但是有了新的流api,我有了一个 Dataset
的 InternalRow
我找不到一个简单的方法来获取偏移量。接收器api只有 addBatch(batchId: Long, data: DataFrame)
方法以及如何为给定的批id获取偏移量?
3条答案
按热度按时间2mbi3lxu1#
相关的spark-dev邮件列表讨论线程在这里。
总结:
spark streaming将支持在未来版本(>2.2.0)中获得偏移量。jira门票https://issues-test.apache.org/jira/browse/spark-18258
对于spark<=2.2.0,可以通过从checkpoint目录中读取json来获取给定批的偏移量(api不稳定,所以要小心):
这个
endOffset
将包含每个主题/分区的until偏移量。获取起始偏移量是有问题的,因为您必须读取'commit'检查点目录。但通常情况下,您并不关心开始偏移,因为存储结束偏移对于可靠的spark作业重新启动来说已经足够了。请注意,您还必须将处理后的批处理id存储在存储器中。在某些情况下,spark可以使用相同的批处理id重新运行失败的批处理,因此请确保使用最新处理的批处理id(您应该从外部存储中读取)初始化自定义接收器,并忽略id<latestprocessedbatchid的任何批处理。顺便说一句,批id在查询中不是唯一的,因此必须分别存储每个查询的批id。
ubby3x7f2#
Spark2.2引入了Kafka的结构化流媒体源。据我所知,它依赖于hdfs checkpoint dir来存储偏移量,并保证“恰好一次”的消息传递。
对的。
每一次触发都会将偏移量保存到
offset
检查点位置中的目录(使用checkpointLocation
选项或spark.sql.streaming.checkpointLocation
spark属性或随机分配的),该属性应保证最多只处理一次偏移。该功能称为写前日志。检查点位置中的另一个目录是
commits
已完成流式处理批的目录,每个批有一个文件(文件名为批id)。引用容错语义的官方文档:
为了实现这一点,我们设计了结构化流源、接收器和执行引擎,以可靠地跟踪处理的确切进度,从而使其能够通过重新启动和/或重新处理来处理任何类型的故障。假设每个流源都有偏移量(类似于kafka偏移量或kinesis序列号)来跟踪流中的读取位置。引擎使用检查点和预写日志来记录每个触发器中正在处理的数据的偏移范围。流汇被设计成处理后处理的幂等元。同时,使用可重放源和幂等接收器,结构化流可以确保在任何失败情况下端到端只使用一次语义。
每次执行触发器时
StreamExecution
检查目录并“计算”已处理的偏移量。这给了你至少一次语义,总共一次。但是老文档(…)说,spark流检查点不能跨应用程序或spark升级恢复,因此不太可靠。
你叫他们“老”是有原因的,不是吗?
它们指的是旧的(在我看来)死Spark流,它不仅保留了偏移量,还保留了整个查询代码,导致检查点几乎不可用的情况,例如,当您更改代码时。
时代已经过去了,结构化流媒体更加谨慎什么时候检查点。
如果我想将kafka源的偏移量存储到事务数据库中,如何从结构化流批处理中获取偏移量?
解决方案可以是实现或以某种方式使用用于处理偏移检查点的metadatalog接口。那可能有用。
我该如何获得给定批次id的偏移量?
目前还不可能。
我的理解是,你将无法做到这一点,因为流的语义是对你隐藏的。您根本不应该处理这种称为偏移量的低级“东西”,spark结构化流媒体使用它来提供精确的一次性保证。
引用michael armbrust在spark峰会上的讲话,使用apachespark中的结构化流进行简单、可扩展、容错的流处理:
你不应该对流媒体进行推理
在演讲中(下一张幻灯片上):
你应该编写简单的查询,spark应该不断更新答案
有一种方法可以使用
StreamingQueryProgress
可以使用streamingquerylistener和onQueryProgress
回拨。onqueryprogress(event:queryprogressent):当有一些状态更新(摄取率更新等)时调用的单元
与
StreamingQueryProgress
您可以访问sources
提供所需内容的sourceprogress属性。iyzzxitl3#
具有kafka源的流式数据集
offset
作为这个领域的一员。您可以简单地查询查询中的所有偏移量,并将它们保存到jdbc sink中