我想问一下,在使用@ Kafkafka进行长时间操作时,有什么方法可以防止消费者被撤销?
我有一个应用程序,其中有需要大量时间处理的作业(30-60分钟),目前max.poll.interval.ms被设置为90分钟。当应用程序的新示例出现时,会重新平衡,如果有实际处理作业,来自其他分区的消息会被挂起(因为分区/组被撤销)。即使主题正在重新平衡,是否有任何方法仍然可以处理消费者的新消息?
我想问一下,在使用@ Kafkafka进行长时间操作时,有什么方法可以防止消费者被撤销?
我有一个应用程序,其中有需要大量时间处理的作业(30-60分钟),目前max.poll.interval.ms被设置为90分钟。当应用程序的新示例出现时,会重新平衡,如果有实际处理作业,来自其他分区的消息会被挂起(因为分区/组被撤销)。即使主题正在重新平衡,是否有任何方法仍然可以处理消费者的新消息?
1条答案
按热度按时间nnsrf1az1#
Kafka并不是为这样的场景而设计的。
但是,您可以将
max.poll.records
设置为1
,并在接收记录时将pause()
设置为容器。将处理交给另一个线程并退出侦听器;容器将在处理时继续轮询代理,但不返回任何记录。处理完成后,确认记录(使用
AckMode.MANUAL
),以提交其偏移量,并resume()
容器。如果在处理过程中发生重新平衡,容器将重新暂停消费者。
如果你想在处理之前提交偏移量(冒着记录丢失的风险),你不需要手动确认。