使用批处理异步生产者时,kafka如何发送ack?ack是否为每消息/每批/每子批(即每分区一批)?是否建议在异步批处理prdocuer中使用ack?或者最好只是使用回调机制?
7qhs6swi1#
“当使用批处理异步生产者时,kafka如何发送ack?ack是每个消息/每个批/每个子批(即每个分区的批)?“ack是按批发送的(也就是说:按分区)。如果一个批中的一个或多个单独消息失败,则整个批被视为失败。根据您的重试配置,此批将被重新发送。“是否建议在异步批处理prdocuer中使用ack?或者最好只是使用回调机制?”我不认为这是一个非此即彼的问题。你可以有 acks>0 同时使用回调。这个 acks 设置通常允许您改进生产者的稳定性,而不受任何回调的影响。如果需要了解发送失败的全部细节,应该使用callback。我已经在我的另一篇文章中提供了关于生产者回调异常的更多细节。此外,您可以使用异步生产者的回调来处理手动提交,同时避免提交由于失效而导致的较低偏移量,如kafka consumer offset commit check中所述,以避免提交较小的偏移量
acks>0
acks
mrwjdhj32#
当使用批处理异步生产者时,kafka如何发送ack两种选择是更新se future,还是执行回调代码。java.util.concurrent.future发送(生产记录(记录)java.util.concurrent.future发送(生产记录(记录、回拨)是每个消息/每个批/每个子批的确认(即每个分区的批)问得好。我认为ack是针对整个请求的,所以请求的所有消息(但不确定,我没有找到信息)是否建议在异步批处理prdocuer中使用ack?或者最好只是使用回调机制?这是两个不同的概念,send始终与kafka异步,如果您不执行future.get(),如果您想批量发送许多记录,则必须在不阻塞的情况下发送(每次都不执行future.get())
2条答案
按热度按时间7qhs6swi1#
“当使用批处理异步生产者时,kafka如何发送ack?ack是每个消息/每个批/每个子批(即每个分区的批)?“
ack是按批发送的(也就是说:按分区)。如果一个批中的一个或多个单独消息失败,则整个批被视为失败。根据您的重试配置,此批将被重新发送。
“是否建议在异步批处理prdocuer中使用ack?或者最好只是使用回调机制?”
我不认为这是一个非此即彼的问题。你可以有
acks>0
同时使用回调。这个acks
设置通常允许您改进生产者的稳定性,而不受任何回调的影响。如果需要了解发送失败的全部细节,应该使用callback。我已经在我的另一篇文章中提供了关于生产者回调异常的更多细节。
此外,您可以使用异步生产者的回调来处理手动提交,同时避免提交由于失效而导致的较低偏移量,如kafka consumer offset commit check中所述,以避免提交较小的偏移量
mrwjdhj32#
当使用批处理异步生产者时,kafka如何发送ack
两种选择是更新se future,还是执行回调代码。
java.util.concurrent.future发送(生产记录(记录)
java.util.concurrent.future发送(生产记录(记录、回拨)
是每个消息/每个批/每个子批的确认(即每个分区的批)
问得好。我认为ack是针对整个请求的,所以请求的所有消息(但不确定,我没有找到信息)
是否建议在异步批处理prdocuer中使用ack?或者最好只是使用回调机制?
这是两个不同的概念,send始终与kafka异步,如果您不执行future.get(),如果您想批量发送许多记录,则必须在不阻塞的情况下发送(每次都不执行future.get())