我正在使用 Kafka 0.8.2
要从adexchange接收数据,则使用 Spark Streaming 1.4.1
将数据存储到 MongoDB
.
我的问题是当我重新开始我的工作 Spark Streaming
比如更新新版本,修复bug,添加新功能。它将继续阅读最新的 offset
的 kafka
当时我将丢失数据adx推到Kafka期间重新启动作业。
我试着像 auto.offset.reset -> smallest
但是它会从0->上一次接收到大量的数据,并且在db中重复。
我也试着设定具体的 group.id
以及 consumer.id
至 Spark
但它是一样的。
如何保存最新的 offset
Spark消耗到 zookeeper
或者 kafka
然后你就可以读到最新的 offset
?
4条答案
按热度按时间zzwlnbp81#
为了补充michaelkopaniov的答案,如果您真的想使用zk作为存储和加载偏移Map的位置,您可以。
但是,由于您的结果没有被输出到zk,除非您的输出操作是幂等的(听起来好像不是),否则您将得不到可靠的语义。
如果可以将结果存储在mongo的同一个文档中,并将偏移量存储在单个原子操作中,那可能对您更好。
有关详细信息,请参见https://www.youtube.com/watch?v=fxnneq1v3va
vdgimpew2#
下面是一些可以用来在zk中存储偏移量的代码http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
下面是一些代码,您可以在调用kafkautils.createdirectstream时使用偏移量:http://geeks.aretotally.in/spark-streaming-direct-api-reusing-offset-from-zookeeper/
esbemjvw3#
我还没有完全弄清楚这一点,但最好的办法可能是设置javastreamingcontext.checkpoint()。
看到了吗https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing 举个例子。
根据一些博客文章https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md 有一些警告,但它几乎感觉它涉及某些边缘案件,只是暗示,而不是实际解释。
am46iovg4#
createdirectstream函数的一个构造函数可以获得一个Map,该Map将以分区id作为键,以开始使用的偏移量作为值。
看看这里的api:http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/kafkautils.html 我所说的Map通常叫做:fromOffset
可以将数据插入Map:
并在创建直接流时使用它:
每次迭代后,可以使用以下方法获得处理后的偏移:
您将能够在下一次迭代中使用此数据来构造fromoffsetsMap。
您可以在这里看到完整的代码和用法:https://spark.apache.org/docs/latest/streaming-kafka-integration.html 在这一页的末尾