我在一次采访中被问到这个问题。假设一个数据包由于故障而丢失(不确定其使用者故障或代理)。在使用偏移量恢复丢失的消息时应该做些什么(代码实现)?我很抱歉,我的问题可能不清楚,因为它被问到类似的方式。谢谢
1rhkuytd1#
如果您知道要恢复的消息的偏移量,以及它属于哪个分区,则可以使用 KafkaConsumer 方法 seek :
KafkaConsumer
seek
consumer.seek(new TopicPartition("topic-name", partNumber), offsetNumber);
如本文所述下一个呼叫 poll() 会给你在名单上第一个错过的信息。这只适用于您首先自己管理补偿的场景。如果你让Kafka管理偏移量,你可能不知道偏移量的数字,最好的结果可能是消息消耗了两次(一个调用 poll() 将从上次提交的偏移量开始消耗)。
poll()
t5zmwmid2#
在阅读了大量的文章和文档之后,我觉得最好的答案可能是:使用没有接收器的新spark kafka消费者(spark-streaming-kafka-0-10_.11)。在这种方法中,我们可以从我们想要阅读的地方给出startoffset。val offsetranges=array(//主题,分区,包含起始偏移量,独占结束偏移量offsetrange(“test”,0,0,100),偏移范围(“测试”,1,0,100))val rdd=kafkautils.createdirectstream[string,string](sparkcontext,kafkaparams,offsetranges,preferconsistent)一旦您的消息被读取和处理,获取您读取的偏移量并将其存储在kafka或zk或外部事务数据库中。offsetranges=rdd.asinstanceof[hasoffsetranges].offsetranges的示例每次启动作业时,从数据库中获取偏移量并将其传递给createdirectstream,以使其具有exacly once机制。更多阅读http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/httpshttp://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
cwtwac6a3#
Kafka遵循 at-least once 消息传递语义,这意味着您可能在代理失败时得到重复的消息,您不会丢失数据。但是当你创造 Kafka Producer 如果此属性为0,则它将只尝试发送一次,即使在代理失败的情况下也不会尝试重新发送。因此,如果代理失败,您可能会丢失数据。
at-least once
Kafka Producer
props.put("retries", 0);
因此,您可以将此属性值更改为1,这样它也将尝试再次发送 offsets 在zookeeper中自动管理,如果消息仅成功传递,它将更新中的偏移量 Zookeeper .另外,由于您提到了spark streaming来消费,spark streaming支持两种不同的方法。1基于接收器:偏移在zookeeper中处理。2直接方法:偏移量在存储消息的地方进行本地处理,而且这种方法只支持一次消息传递。有关更多信息,请查看此链接
offsets
Zookeeper
3条答案
按热度按时间1rhkuytd1#
如果您知道要恢复的消息的偏移量,以及它属于哪个分区,则可以使用
KafkaConsumer
方法seek
:如本文所述
下一个呼叫
poll()
会给你在名单上第一个错过的信息。这只适用于您首先自己管理补偿的场景。如果你让Kafka管理偏移量,你可能不知道偏移量的数字,最好的结果可能是消息消耗了两次(一个调用
poll()
将从上次提交的偏移量开始消耗)。t5zmwmid2#
在阅读了大量的文章和文档之后,我觉得最好的答案可能是:
使用没有接收器的新spark kafka消费者(spark-streaming-kafka-0-10_.11)。在这种方法中,我们可以从我们想要阅读的地方给出startoffset。
val offsetranges=array(//主题,分区,包含起始偏移量,独占结束偏移量offsetrange(“test”,0,0,100),
偏移范围(“测试”,1,0,100))
val rdd=kafkautils.createdirectstream[string,string](sparkcontext,kafkaparams,offsetranges,preferconsistent)
一旦您的消息被读取和处理,获取您读取的偏移量并将其存储在kafka或zk或外部事务数据库中。
offsetranges=rdd.asinstanceof[hasoffsetranges].offsetranges的示例
每次启动作业时,从数据库中获取偏移量并将其传递给createdirectstream,以使其具有exacly once机制。
更多阅读http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/httpshttp://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
cwtwac6a3#
Kafka遵循
at-least once
消息传递语义,这意味着您可能在代理失败时得到重复的消息,您不会丢失数据。但是当你创造
Kafka Producer
如果此属性为0,则它将只尝试发送一次,即使在代理失败的情况下也不会尝试重新发送。因此,如果代理失败,您可能会丢失数据。因此,您可以将此属性值更改为1,这样它也将尝试再次发送
offsets
在zookeeper中自动管理,如果消息仅成功传递,它将更新中的偏移量Zookeeper
.另外,由于您提到了spark streaming来消费,spark streaming支持两种不同的方法。
1基于接收器:偏移在zookeeper中处理。
2直接方法:偏移量在存储消息的地方进行本地处理,而且这种方法只支持一次消息传递。
有关更多信息,请查看此链接