我在应用程序中使用kafka 1.0.1,并且我已经开始使用0.11中引入的幂等生产者特性,并且在使用幂等特性时我很难理解排序保证。
我的生产者的配置是:
enable.idempotence = true max.in.flight.requests.per.connection = 5
retries = 50 acks = all
根据文件:
重试
设置一个大于零的值将导致客户机重新发送其发送失败的任何记录,并可能出现暂时性错误。请注意,此重试与客户端在收到错误时重新发送记录没有什么不同。允许重试而不将max.in.flight.requests.per.connection设置为1可能会更改记录的顺序,因为如果将两个批发送到单个分区,并且第一个失败并重试,但第二个成功,则第二个批中的记录可能会首先出现。
使能.幂等
当设置为“true”时,生产者将确保流中只写入每条消息的一个副本。如果“false”,则由于代理失败等原因导致的生产者重试可能会在流中写入重试消息的副本。请注意,启用幂等性要求max.in.flight.requests.per.connection小于或等于5,retries大于0,acks必须为“all”。如果用户未明确设置这些值,则将选择合适的值。如果设置了不兼容的值,将引发configexception。
我的配置似乎符合要求,但它们似乎不一致。
另一个问题与outofordersequenceexception有关:根据文档,如果我得到这个异常,就意味着生产商有可能变得无序。但如果我的制作人配置了 max.in.flight.requests.per.connection = 5
假设第二个请求得到了无序异常,那么下面所有已经在运行的请求会发生什么情况?这是否意味着我肯定出了问题?
1条答案
按热度按时间idv4meu81#
当在kafkaproducer中启用幂等时,序是有保证的。
即使你有
max.in.flight.requests.per.connection
大于或等于5的幂等kafkaproducer仍能保证主题分划中的有序性。关于“重试”的描述与max.in.flight
仅当幂等性被禁用时才适用。幂等的kafkaproducer使用一个内部递增的序列号来保证排序。
kafka-5494和有关启用幂等的max.in.flight>1设计的相应文档中给出了详细信息。步骤5和6将澄清您的问题:
根据上述假设,解决方案如下:
我们跟踪发送到分区的最后一个确认序列。每次成功确认时都会更新,因此应该一直增加。
我们跟踪给定分区的批处理绑定的下一个序列。
我们分配了下一个序列,当该批次被排空时。我们还将nextsequence增加一个批的记录计数。
如果生产请求成功,我们将最后一个确认序列设置为批处理的最后一个序列。
如果生产请求失败,航班批次中的成功也将失败,并出现outofordersequenceexception。
同样地,如果一个批的序列号不是最后一个确认序列的后续序列,并且如果它以outofordersequenceexception失败,我们认为这是可重试的。
当一个批被重新排队时,我们在将其插入队列之前删除生产者id和序列号。
当第一批飞行中失败时(无论什么原因),我们将下一个序列重置为lastackdsequence+1。
因此,如果一个批次致命地失败,那么在重试时,后续批次的序列号将不同。这很好,因为之前的失败是
OutOfSequence
异常,这绝对意味着请求被拒绝。“[…]如果我的producer配置为max.in.flight.requests.per.connection=5,并且假设第二个请求出现了无序异常,那么下面已经在运行的所有请求会发生什么情况?这是否意味着我肯定出了问题?”
如第5步和第6步所述,所有成功的飞行中请求也将失败,并进行重试
OutOfOrderSequenceExcpetion
. 作为retries
大于0
幂等的kafkaproducer将能够保持秩序。