Asynchronous auto-commit of offsets failed

ffscu2ro posted on 2021-06-04 in Kafka

I have a question about Kafka's auto-commit mechanism. I am using Spring Kafka with auto-commit enabled. As an experiment, I disconnected the consumer from Kafka for 30 seconds while the system was idle (no new messages in the topic, no messages being processed). After reconnecting, I received messages like the following:

Asynchronous auto-commit of offsets {cs-1915-2553221872080030-0=OffsetAndMetadata{offset=19, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

First, I don't understand: what is there to commit? The system was idle (all previous messages had already been committed). Second, the disconnection lasted 30 seconds, far less than the 5 minutes (300000 ms) of max.poll.interval.ms. Third, during one uncontrolled Kafka outage I received at least 30k messages of this type, and the problem was only resolved by restarting the process. Why does this happen?
Here is my consumer configuration:

        allow.auto.create.topics = true
        auto.commit.interval.ms = 100
        auto.offset.reset = latest
        bootstrap.servers = [kafka1-eu.dev.com:9094, kafka2-eu.dev.com:9094, kafka3-eu.dev.com:9094]
        check.crcs = true
        client.dns.lookup = default
        client.id =
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = feature-cs-1915-2553221872080030
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = SSL
        send.buffer.bytes = 131072
        session.timeout.ms = 15000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = [hidden]
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = /home/me/feature-2553221872080030.keystore
        ssl.keystore.password = [hidden]
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = /home/me/feature-2553221872080030.truststore
        ssl.truststore.password = [hidden]
        ssl.truststore.type = JKS
        value.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
Answer 1, by kb5ga3dv:

"First, I don't understand: what is there to commit?"
You're right: if no new data is flowing, there is nothing new to commit. However, with auto-commit enabled and your consumer still running (even though it cannot connect to the broker), the poll method remains responsible for the following steps:
fetching messages from the assigned partitions
triggering partition assignment (if needed)
committing offsets, if automatic offset commit is enabled
Combined with the 100 ms interval (see auto.commit.interval.ms), the consumer therefore keeps trying to asynchronously commit its (unchanged) offset position.
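
For illustration, below is a minimal sketch of the plain-client poll loop that Spring Kafka drives internally; the bootstrap server, group id, and topic name are placeholders, not values from the question.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class AutoCommitLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder
                while (true) {
                    // Each poll() fetches records, takes part in rebalances and,
                    // with enable.auto.commit=true, asynchronously commits the
                    // current position once auto.commit.interval.ms (100 ms here)
                    // has elapsed -- even if that position has not changed.
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    records.forEach(r -> System.out.println(r.value()));
                }
            }
        }
    }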
"Second, the disconnection lasted 30 seconds, far less than the 5 minutes (300000 ms) of max.poll.interval.ms."
It is not max.poll.interval.ms that causes the rebalance here, but the heartbeat.interval.ms and session.timeout.ms settings. Your consumer sends heartbeats from a background thread at the configured interval, 3 seconds in your case. If the broker receives no heartbeat before the session timeout expires (15 seconds in your case), it removes the client from the group and initiates a rebalance.
A more detailed description of the configurations mentioned here can be found in the Kafka documentation on consumer configs.
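
As a quick sketch, the three timeouts involved can be set side by side; the values below simply mirror the configuration shown in the question:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    Properties props = new Properties();
    // Heartbeats are sent from a background thread every 3 s; a common rule of
    // thumb is to keep this at about one third of session.timeout.ms.
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
    // If the broker sees no heartbeat for 15 s, it evicts the consumer from the
    // group and triggers a rebalance -- which a 30 s disconnect exceeds.
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    // A separate limit: the maximum allowed gap between calls to poll().
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");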
"Third, during one uncontrolled Kafka outage I received at least 30k messages of this type, and the problem was only resolved by restarting the process. Why does this happen?"
This appears to be a combination of the first two issues: heartbeats could not be sent, while the consumer kept trying to commit through its continuous calls to poll.
As @garyrussell mentioned in his comment, I would be careful with enable.auto.commit and would rather control offset management yourself.
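
A minimal sketch of manual offset management with Spring Kafka, assuming enable.auto.commit=false and a container factory set to ContainerProperties.AckMode.MANUAL; the topic and group names are placeholders:

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;

    public class ManualCommitListener {

        // Prerequisites (shown as comments to keep the sketch short):
        //   consumer property: enable.auto.commit=false
        //   container factory: factory.getContainerProperties()
        //                             .setAckMode(ContainerProperties.AckMode.MANUAL);
        @KafkaListener(topics = "demo-topic", groupId = "demo-group") // placeholders
        public void listen(String message, Acknowledgment ack) {
            process(message);
            // Commit only after the record was processed successfully; a crash
            // before this line means redelivery instead of a silently advanced offset.
            ack.acknowledge();
        }

        private void process(String message) {
            System.out.println("processed: " + message);
        }
    }

This way the commit happens exactly when your code decides the record is done, instead of on a timer inside poll().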
