我有一个kafka streams应用程序,它从几个主题中获取数据,并将数据合并到另一个主题中。
Kafka配置:
5 kafka brokers
Kafka Topics - 15 partitions and 3 replication factor.
注意:我在运行kafka代理的同一台机器上运行kafka流应用程序。
每小时消耗/产生几百万张唱片。每当我把任何Kafka经纪人,它进入重新平衡,它需要大约30分钟,有时甚至更多的重新平衡。
有人知道如何解决Kafka消费者的再平衡问题吗?而且,很多时候它在重新平衡时抛出异常。
这将阻止我们使用此设置在生产环境中运行。任何帮助都将不胜感激。
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: ?
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)
Kafka流配置:
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
max.poll.records = 100
request.timeout.ms=40000
它内部创建的consumerconfig是:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
check.crcs = true
client.id = conversion-live-StreamThread-1-restore-consumer
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 100
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2条答案
按热度按时间ncgqoxb01#
根据我的经验,首先,考虑到您的工作负载,max.poll.records太小了:每小时消耗/生成数百万条记录。
因此,如果max.poll.records太小,比如说1,那么重新平衡需要很长时间。我不知道原因。
其次,确保流应用程序的输入主题的分区数是一致的。e、 如果app-1有两个输入主题a和b。如果a有4个分区,b有2个分区,那么重新平衡需要很长时间。但是,如果a和b都有4个分区,而有些分区是空闲的,那么重新平衡时间就很好了。希望有帮助
zdwk9cvp2#
我建议您配置
StandbyTasks
via参数num.standby.replicas=1
(默认为0
). 这将有助于显著减少再平衡时间。此外,我建议您将应用程序升级到kafka 0.11。注意,streams api 0.11向后兼容0.10.1和0.10.2代理,因此,不需要为此升级代理。再平衡行为在0.11中得到了很大的改进,并且在即将发布的1.0版本中将得到进一步的改进(参见。https://cwiki.apache.org/confluence/display/kafka/kip-167%3a+add+interface+for+the+state+store+restoration+process)因此,将应用程序升级到最新版本始终是重新平衡的改进。