使用offset恢复kafka中丢失的消息

xuo3flqw  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(483)

我在一次采访中被问到这个问题。
假设一个数据包由于故障而丢失(不确定其使用者故障或代理)。在使用偏移量恢复丢失的消息时应该做些什么(代码实现)?
我很抱歉,我的问题可能不清楚,因为它被问到类似的方式。
谢谢

1rhkuytd

1rhkuytd1#

如果您知道要恢复的消息的偏移量,以及它属于哪个分区,则可以使用 KafkaConsumer 方法 seek :

consumer.seek(new TopicPartition("topic-name", partNumber), offsetNumber);

如本文所述
下一个呼叫 poll() 会给你在名单上第一个错过的信息。
这只适用于您首先自己管理补偿的场景。如果你让Kafka管理偏移量,你可能不知道偏移量的数字,最好的结果可能是消息消耗了两次(一个调用 poll() 将从上次提交的偏移量开始消耗)。

t5zmwmid

t5zmwmid2#

在阅读了大量的文章和文档之后,我觉得最好的答案可能是:
使用没有接收器的新spark kafka消费者(spark-streaming-kafka-0-10_.11)。在这种方法中,我们可以从我们想要阅读的地方给出startoffset。
val offsetranges=array(//主题,分区,包含起始偏移量,独占结束偏移量offsetrange(“test”,0,0,100),
偏移范围(“测试”,1,0,100))
val rdd=kafkautils.createdirectstream[string,string](sparkcontext,kafkaparams,offsetranges,preferconsistent)
一旦您的消息被读取和处理,获取您读取的偏移量并将其存储在kafka或zk或外部事务数据库中。
offsetranges=rdd.asinstanceof[hasoffsetranges].offsetranges的示例
每次启动作业时,从数据库中获取偏移量并将其传递给createdirectstream,以使其具有exacly once机制。
更多阅读http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/httpshttp://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

cwtwac6a

cwtwac6a3#

Kafka遵循 at-least once 消息传递语义,这意味着您可能在代理失败时得到重复的消息,您不会丢失数据。
但是当你创造 Kafka Producer 如果此属性为0,则它将只尝试发送一次,即使在代理失败的情况下也不会尝试重新发送。因此,如果代理失败,您可能会丢失数据。

props.put("retries", 0);

因此,您可以将此属性值更改为1,这样它也将尝试再次发送 offsets 在zookeeper中自动管理,如果消息仅成功传递,它将更新中的偏移量 Zookeeper .
另外,由于您提到了spark streaming来消费,spark streaming支持两种不同的方法。
1基于接收器:偏移在zookeeper中处理。
2直接方法:偏移量在存储消息的地方进行本地处理,而且这种方法只支持一次消息传递。
有关更多信息,请查看此链接

相关问题