如何将spark消耗的最新偏移量保存到zk或kafka,并在重新启动后可以读回

sczxawaw  于 2021-06-08  发布在  Kafka
关注(0)|答案(4)|浏览(249)

我正在使用 Kafka 0.8.2 要从adexchange接收数据,则使用 Spark Streaming 1.4.1 将数据存储到 MongoDB .
我的问题是当我重新开始我的工作 Spark Streaming 比如更新新版本,修复bug,添加新功能。它将继续阅读最新的 offsetkafka 当时我将丢失数据adx推到Kafka期间重新启动作业。
我试着像 auto.offset.reset -> smallest 但是它会从0->上一次接收到大量的数据,并且在db中重复。
我也试着设定具体的 group.id 以及 consumer.idSpark 但它是一样的。
如何保存最新的 offset Spark消耗到 zookeeper 或者 kafka 然后你就可以读到最新的 offset ?

zzwlnbp8

zzwlnbp81#

为了补充michaelkopaniov的答案,如果您真的想使用zk作为存储和加载偏移Map的位置,您可以。
但是,由于您的结果没有被输出到zk,除非您的输出操作是幂等的(听起来好像不是),否则您将得不到可靠的语义。
如果可以将结果存储在mongo的同一个文档中,并将偏移量存储在单个原子操作中,那可能对您更好。
有关详细信息,请参见https://www.youtube.com/watch?v=fxnneq1v3va

vdgimpew

vdgimpew2#

下面是一些可以用来在zk中存储偏移量的代码http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
下面是一些代码,您可以在调用kafkautils.createdirectstream时使用偏移量:http://geeks.aretotally.in/spark-streaming-direct-api-reusing-offset-from-zookeeper/

esbemjvw

esbemjvw3#

我还没有完全弄清楚这一点,但最好的办法可能是设置javastreamingcontext.checkpoint()。
看到了吗https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing 举个例子。
根据一些博客文章https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md 有一些警告,但它几乎感觉它涉及某些边缘案件,只是暗示,而不是实际解释。

am46iovg

am46iovg4#

createdirectstream函数的一个构造函数可以获得一个Map,该Map将以分区id作为键,以开始使用的偏移量作为值。
看看这里的api:http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/kafkautils.html 我所说的Map通常叫做:fromOffset
可以将数据插入Map:

startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset)

并在创建直接流时使用它:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
                streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))

每次迭代后,可以使用以下方法获得处理后的偏移:

rdd.asInstanceOf[HasOffsetRanges].offsetRanges

您将能够在下一次迭代中使用此数据来构造fromoffsetsMap。
您可以在这里看到完整的代码和用法:https://spark.apache.org/docs/latest/streaming-kafka-integration.html 在这一页的末尾

相关问题