Kafka consumer does not fetch new records when using a topic pattern and large messages

Asked by 0yycz8jy on 2021-07-26 · Java

I hope one of you can help me.
I am using Spring Boot 2.3.4 and Spring Kafka 2.5.6. I recently had to reset an offset and saw some strange behaviour: we consumed the messages, but after every x (variable) messages there was a 10-second pause before consumption continued.
This is my configuration:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      heartbeat-interval: 1000
      max-poll-records: 50
      group-id: kafka-fetch-demo
      fetch-max-wait: 10000
    listener:
      type: single
      concurrency: 1
      poll-timeout: 1000
      no-poll-threshold: 2
      monitor-interval: 10
      ack-mode: manual
    producer: 
      acks: all
      batch-size: 0
      retries: 0

This is an example listener:

@KafkaListener(id = LISTENER_ID, idIsGroup = false, topicPattern = "#{demoProperties.getTopicPattern()}")
public void onEvent(Acknowledgment acknowledgment, ConsumerRecord<byte[], String> record) {
  log.info("Received record on topic {}, partition {} and offset {}",
      record.topic(),
      record.partition(),
      record.offset());

  acknowledgment.acknowledge();
}

Analysis
I figured out that the 10-second pauses come from the fetch.max.wait.ms property, but I do not understand why it applies.
As I understand it, fetch-max-wait only determines the maximum time the broker waits before returning new records to the consumer, even if fetch.min.bytes is not exceeded (in my case it is set to the default of 1, so it should always be satisfied). Furthermore, I observed that the problem only occurs when using a topic pattern together with "large" messages.
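A quick way to confirm that fetch.max.wait.ms is responsible for the pauses is to lower it in the consumer configuration and check whether the stalls shorten accordingly (a diagnostic sketch only, not a recommended fix; 500 ms is an arbitrary value):

```yaml
spring:
  kafka:
    consumer:
      # broker now returns after at most 500 ms, even if fetch.min.bytes is not reached
      fetch-max-wait: 500
```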
Reproduction
I uploaded a demo application to GitHub to reproduce the problem: https://github.com/kraennix/kafka-fetch-demo.
How I reproduced it:
I put 1,000 messages of 17.1 KB each on a Kafka topic.
I started my consuming application, which listens to this topic via a topic pattern. The stalling behaviour can then be observed.
Note: if I do the same with "small" messages (89 bytes), it works as expected.
Logs
In the logs you can see the successful commit, but afterwards the consumer reports that it is skipping fetches:

2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}}
2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}}
2021-01-16 15:04:40.773 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Sending OffsetCommit request with {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}} to coordinator localhost:9092 (id: 2147483647 rack: null)
2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Using older server API v7 to send OFFSET_COMMIT {group_id=kafka-fetch-demo,generation_id=4,member_id=consumer-kafka-fetch-demo-1-cf8e747f-531d-457a-aca8-18960c518ef9,group_instance_id=null,topics=[{name=publish.LargeTopic.2.test,partitions=[{partition_index=0,committed_offset=488,committed_leader_epoch=-1,committed_metadata=}]}]} with correlation id 62 to node 2147483647
2021-01-16 15:04:40.778 TRACE 19244 --- [_LISTENER-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Completed receive from node 2147483647 for OFFSET_COMMIT with correlation id 62, received {throttle_time_ms=0,topics=[{name=publish.LargeTopic.2.test,partitions=[{partition_index=0,error_code=0}]}]}
2021-01-16 15:04:40.779 DEBUG 19244 --- [_LISTENER-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Committed offset 488 for partition publish.LargeTopic.2.test-0
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.1.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.2.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.1.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.2.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
Answer by 33qvvth1:

When the size of the messages changes, you may need to adjust the following two properties: heartbeat-interval: 1000 and max-poll-records: 50.
Your heartbeat interval is 1 second and your maximum fetch wait is 10 seconds. If the messages are large and you process the consumed records on the same thread, the heartbeat check can fail before the next poll is triggered. Make sure to hand the records off to an executor using a Callable.
When messages are large, increase the heartbeat interval to 5 to 10 seconds and reduce max-poll-records to 15. Hope this helps.
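The hand-off described above can be sketched with a plain ExecutorService (a minimal, Kafka-free sketch; process is a hypothetical stand-in for the real per-record work, and in the actual listener the acknowledgment would follow once the futures complete):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OffloadDemo {

    // Hypothetical stand-in for heavy per-record work that would otherwise
    // block the consumer thread between polls.
    static String process(String record) {
        return "processed:" + record;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> batch = List.of("a", "b", "c"); // stand-in for a polled batch

        // Submit each record as a Callable so the polling thread stays free
        // instead of doing the work inline.
        List<Future<String>> results = new ArrayList<>();
        for (String record : batch) {
            results.add(pool.submit((Callable<String>) () -> process(record)));
        }

        // Wait for all records to finish; in the real listener this is where
        // acknowledgment.acknowledge() would be called.
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
        pool.shutdown();
    }
}
```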
