场景/用例:我有一个spring引导应用程序,使用spring for kafka向kafka主题发送消息。在完成特定事件(由http请求触发)后,将创建一个新线程(通过spring@async),该线程调用kafkatemplate.send(),并对其返回的listenablefuture进行回调。处理http请求的原始线程将向调用客户端返回一个响应并释放。
正常行为:在正常的应用程序加载下,我已经验证了各个消息是否都按需要发布到主题(回调成功或失败时的应用程序日志条目以及在kafka集群上查看主题中的消息)。如果我关闭所有kafka代理3-5分钟,然后使集群重新联机,应用程序的发布者将立即重新建立与kafka的连接并继续发布消息。
问题行为:但是,在执行负载测试时,如果我关闭所有kafka代理3-5分钟,然后使集群重新联机,那么spring应用程序的发布服务器将继续显示所有发布尝试的失败。这种情况持续了大约7个小时,此时发布者能够再次成功地与Kafka重新建立通信(通常会出现管道中断异常,但并不总是如此)。
当前发现:在执行负载测试的过程中,我使用jconsole连接到应用程序,并通过kafka.producer监视producer度量。在重载的前约30秒内,缓冲区可用字节继续减少,直到达到0并保持在0。等待线程保持在6到10之间(每次点击刷新时交替),缓冲区可用字节保持在0大约6.5小时。在此之后,缓冲区可用字节将显示恢复的所有最初分配的内存,但kafka发布尝试继续失败约30分钟,然后最终恢复所需的行为。
当前生产者配置
request.timeout.ms=3000
max.retry.count=2
max.inflight.requests=1
max.block.ms=10000
retry.backoff.ms=3000
所有其他属性都使用其默认值
问题:
考虑到我的用例,改变batch.size或linger.ms是否会对消除重载时遇到的问题产生积极影响?
假设我有单独的线程,所有调用kafkatemplate.send()的线程都有单独的消息和回调,并且我的max.in.flight.requests.per.connection设置为1,那么batch.size和linger.ms是否会被忽略,而不是限制每条消息的大小?我的理解是,在这个场景中实际上没有批处理,每个消息都作为单独的请求发送。
既然我把max.block.ms设置为10秒,为什么缓冲区内存会被占用这么长时间,为什么所有消息都会在这么长时间内无法发布。我的理解是,在10秒之后,每次新的发布尝试都应该失败,并返回失败回调,从而释放关联的线程
更新:尝试澄清线程用法。我使用的是javadocs中推荐的单生产者示例。有诸如https-jsse-nio-22443-exec-*之类的线程正在处理传入的https请求。当一个请求进入某个处理时,一旦所有与kafka无关的逻辑完成,就会调用另一个用@async修饰的类中的方法。此方法调用kafkatemplate.send()。在执行发布到kafka之前,返回到客户机的响应会显示在日志中(这就是im如何通过单独的线程验证它的执行,因为服务在返回响应之前不会等待发布)。有任务调度器-*线程似乎正在处理来自kafkatemplate.send()的回调。我的猜测是,Kafka生产者网络线程处理所有的出版。
1条答案
按热度按时间mbskvtky1#
我的应用程序发出一个http请求,并在每次kafka发布失败时将每条消息发送到数据库平台上的死信表。为执行对kafka的发布而启动的线程也被重新用于对数据库的调用。我将数据库调用逻辑移到另一个类中,并用它自己的@async和自定义taskexecutor修饰它。完成此操作后,我监视了jconsole,可以看到对kafka的调用似乎在重新使用相同的10个线程
(TaskExecutor: core Pool size - 10, QueueCapacity - 0, and MaxPoolSize - 80)
对数据库服务的调用现在使用一个单独的线程池(TaskExecutor: core Pool size - 10, QueueCapacity - 0, and MaxPoolSize - 80)
始终关闭和打开新线程,但保持相对恒定的线程数。有了这个新的行为缓冲区,可用字节保持在一个健康的常量级别,一旦代理恢复在线,应用程序的kafka发布服务器就成功地重新建立了连接。