我希望你们中有人能帮助我。
我正在使用SpringBoot2.3.4和SpringKafka2.5.6。我最近不得不重置一个偏移量,并看到一些奇怪的行为。我们消耗了消息,但是在每x个(可变)消息之后,在消耗继续之前,我们有10秒的超时。
这是我的配置:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
heartbeat-interval: 1000
max-poll-records: 50
group-id: kafka-fetch-demo
fetch-max-wait: 10000
listener:
type: single
concurrency: 1
poll-timeout: 1000
no-poll-threshold: 2
monitor-interval: 10
ack-mode: manual
producer:
acks: all
batch-size: 0
retries: 0
这是一个examle侦听器代码:
@KafkaListener(id = LISTENER_ID, idIsGroup = false, topicPattern = "#{demoProperties.getTopicPattern()}")
public void onEvent(Acknowledgment acknowledgment, ConsumerRecord<byte[], String> record) {
log.info("Received record on topic {}, partition {} and offset {}",
record.topic(),
record.partition(),
record.offset());
acknowledgment.acknowledge();
}
分析
我发现10秒的暂停时间来自 fetch.max.wait.ms
财产。但是我不明白为什么这个属性适用。
据我所知 fetch-max-wait
属性仅确定代理在向使用者提供新记录之前等待的最长时间,即使 fetch.min.bytes
不超过(在我的例子中,它被设置为默认值1,并且应该总是满的)此外,我分析了这个问题只适用于使用主题模式和“较大”消息的情况。
繁殖
我在github上上传了一个演示应用程序来重现这个问题:https://github.com/kraennix/kafka-fetch-demo.
我是如何复制的:
我在一个Kafka主题上放了1000条消息,每条消息17.1KB。
我启动我的消费应用程序,它按主题模式监听这个主题。然后你可以看到这种停止行为。
注意:如果我对“小”消息(89字节)执行相同的操作,它将按预期工作。
日志
在日志中,您可以看到成功的提交,但随后它会显示跳过获取
2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}}
2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}}
2021-01-16 15:04:40.773 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Sending OffsetCommit request with {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}} to coordinator localhost:9092 (id: 2147483647 rack: null)
2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Using older server API v7 to send OFFSET_COMMIT {group_id=kafka-fetch-demo,generation_id=4,member_id=consumer-kafka-fetch-demo-1-cf8e747f-531d-457a-aca8-18960c518ef9,group_instance_id=null,topics=[{name=publish.LargeTopic.2.test,partitions=[{partition_index=0,committed_offset=488,committed_leader_epoch=-1,committed_metadata=}]}]} with correlation id 62 to node 2147483647
2021-01-16 15:04:40.778 TRACE 19244 --- [_LISTENER-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Completed receive from node 2147483647 for OFFSET_COMMIT with correlation id 62, received {throttle_time_ms=0,topics=[{name=publish.LargeTopic.2.test,partitions=[{partition_index=0,error_code=0}]}]}
2021-01-16 15:04:40.779 DEBUG 19244 --- [_LISTENER-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Committed offset 488 for partition publish.LargeTopic.2.test-0
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.1.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.2.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.1.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.2.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
1条答案
按热度按时间33qvvth11#
当消息的大小发生变化时,您可能需要更改为低于2个道具心跳间隔:1000最大轮询记录数:50
你的心跳间隔是1秒,最大等待时间是10秒。如果消息的大小很大,并且您正在同一线程中处理消耗的消息,那么在触发下一次拉取时心跳检查将失败。确保执行器使用callable处理消息。
当消息大小较大时,将心跳间隔增加到5到10秒,并将最大轮询记录减少到15。希望这能帮上忙