使用发布/订阅如何确认消息?当消息发送给唯一组中的某些消费者时,如果所有消费者都确认,则表示消息已被确认,还是消息已被所有消费者确认?
k5ifujac1#
在Producer端,您可以选择等待代理确认消息已成功存储在主题中。您可以使用名为acks的Producer配置并将其设置为值1或all:
1
all
acks=1:这意味着leader会将记录写入其本地日志,但不会等待所有followers的完全确认。在这种情况下,如果leader在确认记录后立即失败,但在followers复制之前,则记录将丢失。acks=all:这意味着leader将等待同步副本的全部集合来确认记录。这保证了只要至少一个同步副本保持活动,记录就不会丢失。这是最强的可用保证。这相当于acks=-1设置。
在Consumer端,有一个Consumer Group的概念,它向Broker提交偏移量,确认Consumer已经处理了哪些消息。每个Consumer Group将提交自己的偏移量,确认它已经消费了这些消息。这独立于其他Consumer Group。如果您有两个消费者组消费同一主题,每个消费者组将根据其各自的配置独立提交其偏移量。默认情况下,Consumer会通过Consumer配置enable.auto.commit和auto.commit.interval.ms(默认为5秒)自动将偏移量提交回Kafka。或者您可以根据您在Consumer中的处理逻辑手动提交偏移量。
enable.auto.commit
auto.commit.interval.ms
1条答案
按热度按时间k5ifujac1#
制作人
在Producer端,您可以选择等待代理确认消息已成功存储在主题中。您可以使用名为acks的Producer配置并将其设置为值
1
或all
:acks=1:这意味着leader会将记录写入其本地日志,但不会等待所有followers的完全确认。在这种情况下,如果leader在确认记录后立即失败,但在followers复制之前,则记录将丢失。
acks=all:这意味着leader将等待同步副本的全部集合来确认记录。这保证了只要至少一个同步副本保持活动,记录就不会丢失。这是最强的可用保证。这相当于acks=-1设置。
消费者
在Consumer端,有一个Consumer Group的概念,它向Broker提交偏移量,确认Consumer已经处理了哪些消息。每个Consumer Group将提交自己的偏移量,确认它已经消费了这些消息。这独立于其他Consumer Group。
如果您有两个消费者组消费同一主题,每个消费者组将根据其各自的配置独立提交其偏移量。
默认情况下,Consumer会通过Consumer配置
enable.auto.commit
和auto.commit.interval.ms
(默认为5秒)自动将偏移量提交回Kafka。或者您可以根据您在Consumer中的处理逻辑手动提交偏移量。