kafka使用者应用程序具有严重的延迟(在高峰时间消耗kafka事件的速度不够快)。kafka主题有120个分区,使用者组总共有30个主机,每个主机有两个使用者,因此每个使用者使用2个kafka分区。我们使用的主机是32核的awsc5.9xlarge示例。每个使用者被放入一个java.lang.thread中,在每个线程中,一个线程池由250个线程创建。
我们已经验证了cpu/memory/io都不是瓶颈。然后我们把250名工人增加到500名,但他们留下来了。然后我们又改回了250个工人,但是从每台主机2个消费者增加到了4个。因此,每个消费者从一个kafka分区进行消费。现在问题解决了,延迟降到很低。
我的问题是,为什么线程池中从250个增加到500个没有帮助,但是每个主机中从2个消费者增加到4个消费者却有帮助?
private class ConsumerThread extends Thread {
public ConsumerThread(StremProcessor processor) {
this.processor = processor;
this.consumer = new KafkaConsumer()
}
@Override
public void run() {
ExecutorService executor = Executors.newFixedThreadPool(250);
while (true) {
Data data = consumer.poll()
executor.invokeAll(getTasks(data, processor)); //processor is
}
}
}
2条答案
按热度按时间c86crjj01#
线程池只是一个
reusable
人才库java.lang.Thread
. 通常,线程池具有queue of tasks
如果线程池中的任何线程是空闲的,它就可以执行任务,任务完成后,该线程将返回到线程池,并尝试查找队列中是否还有其他任务在等待。threadpool中的线程与java.lang.thread中的线程有何不同?
没有区别。只是用法不同。
是因为线程池中的所有线程都使用单处理器内核吗?
不,它可以使用任何数量的可用处理器。
我记得executorpool中的默认线程是每个处理器250个,这是否意味着executorpool不够聪明,无法将250个线程分配到16核?
从那里你可以得到像“executorpool是每个处理器250个”这样的信息?。我不完全理解你的问题。线程池的线程可以像普通线程一样在任何内核上执行线程池的线程没有限制。
a0x5cqrl2#
首先:应该在每个循环之间的while循环中包含一些延迟,以防止应用程序溢出内存。
基本上
ExecutorService.invokeAll()
方法返回Future
s。你可以用它们来“控制”你的线程。threadpool中的线程与java.lang.thread中的线程有何不同?
它们没有区别,但你得到一个 Package (
Future
)它允许您在执行时控制线程。潜在的Thread
像普通的java线程一样工作。是因为线程池中的所有线程都使用单处理器内核吗?
不