我正在使用kafka0.8和spring集成kafka1.2.0.release
我有两个主题,分别是初级和次级。我需要从主主题消费,经过一些处理后,需要产生到第二个主题的下一套处理要做以后。
虽然从主要主题开始的消费工作正常,但从生成到次要主题的过程在几分钟后就开始失败。问题始于发送请求到Kafka超时后500毫秒,我已经设置。以线程池耗尽结束。
如果我试图将事件生成到另一个kafka集群的次要主题,那么它可以毫无问题地工作。
我有4个消费者同时运行这两个主题,每个主题有200个分区。
我对Kafka不太熟悉,请原谅我对Kafka缺乏了解。请评论我应该提供的任何遗漏信息。
2条答案
按热度按时间qcuzuvrc1#
在尝试了所有可能的配置之后,终于发现了问题。
错误地忘记删除下面的依赖项,这是先前为消费者集成添加的。
它在生成过程中导致了一些冲突,而该过程正在添加处于等待状态的线程。如果有人能对它可能增加的冲突提供指导,那将是一个很好的学习。
谢谢。
wfsdck302#
使用提供的信息有点难以了解,但我怀疑问题在于,您可以使用第一个主题,然后计算第一个主题的结果,比生成第二个主题的速度更快。这可能有很多原因。例如,对第二主题的写入可能在分区之间分布不均匀。类似地,生产到不同的集群可能会因为各种原因而成功,包括更快的机器、更多的机器、更好的网络等等。
基本问题并不是Kafka特有的:如果您从一个源消费数据并将数据发送到第二个接收器,您通常不能假设第二个接收器总是比第一个源快。每当第二个Flume变慢的时候,哪怕是一点点,你最终都会遇到这样的问题。例如,假设您可以每秒从主接收器读取100个事件,但次接收器每秒只能消耗99个事件。这意味着每秒钟内存中就会有一个事件等待发送到接收器。如果您不采取任何措施来降低从主源读取数据的速度,您将耗尽ram、线程或其他资源。
一般的解决方案是某种节流。例如,可以使用
Semaphore
这从500个许可证开始:这意味着你永远不能从主源读取超过500个你还没有成功发送到接收器的项目。在从主要源读取项目之前,您应该减小Semaphore
因此,如果你已经“领先”第二个500项,你的读者将阻止。每次您成功地将一个项目发送到您的第二个主题时,您都会释放一个允许进行另一个读取的许可证。我要提醒大家不要使用第二个kafka集群或者其他可以工作但不能真正解决核心问题的方法。例如,如果生成到另一个集群现在可以工作,那么当该集群由于节点丢失、大的重新平衡等原因而变慢时,它就不会工作了。这只是暂时隐藏了问题。