我正在做一个研究项目,在那里我在google云平台上安装了一个完整的数据分析管道。我们使用spark上的hyperloglog实时估计每个url的唯一访问者。我用dataproc建立了spark集群。这项工作的一个目标是根据集群大小来度量体系结构的吞吐量。spark群集有三个节点(最小配置)
数据流是用自己的数据生成器模拟的,这些生成器是用java编写的,我在java中使用了kafka producerapi。架构如下所示:
数据生成器->Kafka->spark流->elasticsearch。
问题是:当我在我的数据生成器上增加每秒产生的事件数并且超过1000个事件/秒时,spark作业中的输入速率突然崩溃,并且开始有很大的变化。
正如您在SparkWebUI的屏幕截图上看到的,处理时间和调度延迟保持不变,而输入速率下降。
spark web ui截图
我用一个完全简单的spark作业进行了测试,它只做了一个简单的Map,以排除诸如elasticsearch写得慢或作业本身存在问题等原因。Kafka似乎也能正确地接收和发送所有事件。
此外,我还试验了spark配置参数: spark.streaming.kafka.maxRatePerPartition
以及 spark.streaming.receiver.maxRate
同样的结果。
有人知道这里出了什么问题吗?这似乎真的取决于spark作业或dataproc。。。但我不确定。所有的cpu和内存利用率似乎都没问题。
编辑:目前我有两个Kafka分区的主题(放在一台机器上)。但我认为Kafka即使只有一个分区,也应该以1500个事件/秒的速度完成。在我的实验开始的时候,问题还出在一个分区上。我使用没有接收器的直接方法,因此spark使用两个工作节点同时读取主题。
编辑2:我找到了导致这种不良吞吐量的原因。我忘了提到我的架构中的一个组件。我使用一个中央flume代理通过netcat通过log4j记录来自模拟器示例的所有事件。此flume代理是性能问题的原因!我将log4j配置更改为使用异步记录器(https://logging.apache.org/log4j/2.x/manual/async.html)通过干扰器。我将flume代理扩展到更多cpu内核和ram,并将通道更改为文件通道。但它的表现还是很差。没有效果。。。还有其他的方法来调节Flume的性能吗?
2条答案
按热度按时间mfpqipee1#
正如我之前提到的,cpu和ram的利用率是非常好的。我发现了一个“魔法极限”,似乎正好是每秒1500个事件。当我超过这个极限时,输入速率立即开始摆动。
错误的是处理时间和调度延迟保持不变。所以可以排除背压效应,对吗?
我唯一能猜到的是gcp/dataproc的技术限制。。。我在谷歌文档上没有找到任何提示。
还有别的主意吗?
5vf7fwbs2#
鉴于信息量稀少,很难说。我怀疑是内存问题-在某个时候,服务器甚至可能开始交换。因此,检查所有服务器上的jvm内存利用率和交换活动。elasticsearch应该能够以每秒约15000条记录的速度进行处理,而无需进行任何调整。检查服务器上可用的和已提交的ram。