Kafka consumer rebalancing takes too long

qybjjes1  posted in Kafka on 2021-06-07
Follow (0) | Answers (2) | Views (990)

I have a Kafka Streams application that consumes data from several topics and merges it into another topic.

Kafka configuration:

  1. 5 Kafka brokers
  2. Kafka topics - 15 partitions and a replication factor of 3.

Note: I am running the Kafka Streams application on the same machines as the Kafka brokers.

A few million records are consumed/produced per hour. Whenever I take down any Kafka broker, the group goes into a rebalance, and the rebalance takes around 30 minutes, sometimes even more.

Does anyone know how to fix this consumer rebalancing issue? Also, it frequently throws exceptions while rebalancing.

This is blocking us from running this setup in production. Any help would be appreciated.

  Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
  Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
  at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
  at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
  at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
  at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
  at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
  at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
  at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
  at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
  at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
  at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
  at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)

Kafka Streams configuration:

  bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
  max.poll.records=100
  request.timeout.ms=40000
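As a reference, a minimal sketch of how these three settings might be assembled into the Properties object handed to a Kafka Streams application. The application.id value "conversion-live" is an assumption inferred from the client.id in the config dump below, and plain string keys are used instead of the StreamsConfig constants so the snippet needs only the JDK.

```java
import java.util.Properties;

public class StreamsProps {
    // Builds the configuration shown above.
    // NOTE: "conversion-live" is a hypothetical application.id, guessed
    // from the client.id "conversion-live-StreamThread-1-restore-consumer".
    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "conversion-live"); // assumption
        props.put("bootstrap.servers",
                "kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092");
        // Smaller poll batches keep each poll() loop iteration short...
        props.put("max.poll.records", "100");
        // ...and a 40s client-side request timeout, as in the question.
        props.put("request.timeout.ms", "40000");
        return props;
    }

    public static void main(String[] args) {
        Properties p = build();
        System.out.println(p.getProperty("max.poll.records")); // prints 100
    }
}
```

In a real application these properties would be passed to `new KafkaStreams(topology, props)`.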

The ConsumerConfig it creates internally is:

  auto.commit.interval.ms = 5000
  auto.offset.reset = earliest
  bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
  check.crcs = true
  client.id = conversion-live-StreamThread-1-restore-consumer
  connections.max.idle.ms = 540000
  enable.auto.commit = false
  exclude.internal.topics = true
  fetch.max.bytes = 52428800
  fetch.max.wait.ms = 500
  fetch.min.bytes = 1
  group.id =
  heartbeat.interval.ms = 3000
  interceptor.classes = null
  internal.leave.group.on.close = false
  isolation.level = read_uncommitted
  key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
  max.partition.fetch.bytes = 1048576
  max.poll.interval.ms = 2147483647
  max.poll.records = 100
  metadata.max.age.ms = 300000
  metric.reporters = []
  metrics.num.samples = 2
  metrics.recording.level = INFO
  metrics.sample.window.ms = 30000
  partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
  receive.buffer.bytes = 65536
  reconnect.backoff.max.ms = 1000
  reconnect.backoff.ms = 50
  request.timeout.ms = 40000
  retry.backoff.ms = 100
  sasl.jaas.config = null
  sasl.kerberos.kinit.cmd = /usr/bin/kinit
  sasl.kerberos.min.time.before.relogin = 60000
  sasl.kerberos.service.name = null
  sasl.kerberos.ticket.renew.jitter = 0.05
  sasl.kerberos.ticket.renew.window.factor = 0.8
  sasl.mechanism = GSSAPI
  security.protocol = PLAINTEXT
  send.buffer.bytes = 131072
  session.timeout.ms = 10000
  ssl.cipher.suites = null
  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
  ssl.endpoint.identification.algorithm = null
  ssl.key.password = null
  ssl.keymanager.algorithm = SunX509
  ssl.keystore.location = null
  ssl.keystore.password = null
  ssl.keystore.type = JKS
  ssl.protocol = TLS
  ssl.provider = null
  ssl.secure.random.implementation = null
  ssl.trustmanager.algorithm = PKIX
  ssl.truststore.location = null
  ssl.truststore.password = null
  ssl.truststore.type = JKS
  value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

ncgqoxb01#

In my experience, first of all, given your workload (a few million records consumed/produced per hour), your max.poll.records is too small.

Moreover, if max.poll.records is very small, say 1, the rebalance takes a very long time. I don't know the reason.

Second, make sure the partition counts of your Streams application's input topics are consistent. E.g., suppose app-1 has two input topics A and B. If A has 4 partitions and B has 2, the rebalance takes a very long time. But if A and B both have 4 partitions, even with some partitions sitting idle, the rebalance time is fine. Hope this helps.


zdwk9cvp2#

I recommend configuring StandbyTasks via the parameter num.standby.replicas=1 (the default is 0). This should help to reduce the rebalance time significantly.

Furthermore, I recommend upgrading your application to Kafka 0.11. Note that the Streams API in 0.11 is backward compatible with 0.10.1 and 0.10.2 brokers, so you don't need to upgrade the brokers for this. Rebalance behavior was heavily improved in 0.11 and will be improved even further in the upcoming 1.0 release (cf. https://cwiki.apache.org/confluence/display/kafka/kip-167%3a+add+interface+for+the+state+store+restoration+process). Thus, upgrading your application to the latest version is always an improvement with regard to rebalancing.
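As a sketch, the standby-replica setting suggested above would simply be added alongside the Streams configuration the question already shows:

```properties
# Keep one warm copy of each task's state store on another instance, so that
# after a failure or restart the tasks can move without a full state restore.
num.standby.replicas=1
```

Note that each standby replica costs extra disk space and extra changelog-topic read traffic on the instance that hosts it.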
