I have a question about Kafka's auto-commit mechanism. I am using Spring Kafka with auto-commit enabled. As an experiment, I disconnected the consumer from Kafka for 30 seconds while the system was idle (no new messages in the topic, no messages being processed). After reconnecting, I got messages like this:
Asynchronous auto-commit of offsets {cs-1915-2553221872080030-0=OffsetAndMetadata{offset=19, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
First, I don't understand what there is to commit? The system was idle (all previous messages had already been committed). Second, the disconnect lasted 30 seconds, far less than the 5 minutes (300000 ms) of max.poll.interval.ms. Third, during an uncontrolled Kafka failure I got at least 30k messages of this type, and the problem was resolved by restarting the process. Why does this happen?
My consumer configuration is listed below:
allow.auto.create.topics = true
auto.commit.interval.ms = 100
auto.offset.reset = latest
bootstrap.servers = [kafka1-eu.dev.com:9094, kafka2-eu.dev.com:9094, kafka3-eu.dev.com:9094]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = feature-cs-1915-2553221872080030
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 15000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = /home/me/feature-2553221872080030.keystore
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /home/me/feature-2553221872080030.truststore
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
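For context, these settings end up in a Spring Kafka consumer factory roughly like the following sketch (simplified to the values relevant to the question; it is not my exact code, and only settings already shown in the dump above are used):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class ConsumerFactoryConfig {

    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka1-eu.dev.com:9094,kafka2-eu.dev.com:9094,kafka3-eu.dev.com:9094");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "feature-cs-1915-2553221872080030");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);      // auto-commit is on
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);  // commit every 100 ms
        config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // value deserializer (ErrorHandlingDeserializer2) and SSL settings omitted here
        return new DefaultKafkaConsumerFactory<>(config);
    }
}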
1 Answer
First, I don't understand what there is to commit?
You're right that when no new data is flowing, there is nothing new to commit. However, with auto-commit enabled and your consumer still running (even if it cannot reach the broker), the poll method is still responsible for the following steps:
Fetching messages from the assigned partitions
Triggering a partition assignment (if necessary)
Committing offsets, if automatic offset commit is enabled
Combined with the 100 ms interval (see auto.commit.interval.ms), the consumer still keeps trying to asynchronously commit its (unchanged) offset position.
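To make that concrete: with auto-commit enabled, what the Spring Kafka listener container drives internally is roughly equivalent to a plain-client loop like the one below (an illustration of the poll/auto-commit mechanics, not the actual container code; broker address and topic name are placeholders):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoCommitPollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1-eu.dev.com:9094");
        props.put("group.id", "feature-cs-1915-2553221872080030");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "100");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("some-topic"));   // topic name is a placeholder
            while (true) {
                // Each poll() fetches records, takes part in partition (re)assignment and,
                // because auto-commit is enabled, asynchronously commits the current position
                // once auto.commit.interval.ms has elapsed -- even if that position has not
                // moved because nothing new was consumed.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}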
Second, the disconnect lasted 30 seconds, far less than the 5 minutes (300000 ms) of max.poll.interval.ms
What causes the rebalance here is not max.poll.interval.ms but the heartbeat.interval.ms and session.timeout.ms settings. Your consumer sends heartbeats from a background thread according to the interval setting, in your case every 3 seconds. If the broker receives no heartbeat before the session timeout expires (15 seconds in your case), it removes this client from the group and initiates a rebalance. The configurations I mentioned are described in more detail in the Kafka documentation on consumer configuration.
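To show how these timeouts relate, here is a small sketch using the values from your configuration (the comments are the point; the rule of thumb comes from the Kafka consumer documentation):

import java.util.Properties;

public class GroupTimeouts {

    static Properties groupTimeouts() {
        Properties props = new Properties();
        // A background thread sends a heartbeat to the group coordinator every 3 s.
        props.put("heartbeat.interval.ms", "3000");
        // If the coordinator sees no heartbeat for 15 s, it removes the member and rebalances.
        props.put("session.timeout.ms", "15000");
        // max.poll.interval.ms, by contrast, only limits the time between two poll() calls
        // on the processing thread; it was not the trigger in your case.
        props.put("max.poll.interval.ms", "300000");
        // Rule of thumb: keep heartbeat.interval.ms at no more than one third of
        // session.timeout.ms, so a single delayed heartbeat does not evict the member.
        return props;
    }
}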
Third, during an uncontrolled Kafka failure I got at least 30k messages of this type, and the problem was resolved by restarting the process. Why does this happen?
This looks like a combination of the first two issues: heartbeats could not be sent, while the consumer kept trying to commit through the continuously called poll method.
As @garyrussell mentioned in his comment, I would be careful with enable.auto.commit and would instead take control of offset management yourself.
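If you go that route, a minimal manual-acknowledgment setup in Spring Kafka could look like the sketch below (the topic name is a placeholder and the wiring is an assumption; enable.auto.commit must then be set to false in the consumer config):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Configuration
class ManualAckConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Offsets are committed by the listener, not by the client's auto-commit.
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

@Component
class ManualAckListener {

    // Requires enable.auto.commit = false in the consumer configuration.
    @KafkaListener(topics = "some-topic", groupId = "feature-cs-1915-2553221872080030")
    public void listen(String message, Acknowledgment ack) {
        // ... process the message ...
        ack.acknowledge();   // commit the offset only after the record was handled successfully
    }
}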