我将spark structured streaming(2.3.0)与kafka(1.0.0)结合使用。
val event_stream: DataStreamReader = spark
.readStream
.format(_source)
.option("kafka.bootstrap.servers", _brokers)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
我正在用100克的数据测试一个Kafka主题的管道。在kafka broker(3个引导节点,每个节点有2g堆/4g ram)上,我经常(几乎每秒)看到这个警告消息:
WARN Attempting to send response via channel for which there is no open connection, connection id 10.230.0.81:9092-10.230.0.116:39110-399 (kafka.network.Processor)
我还看到代理上的堆消耗稳步增加到gc中的%time接近100(没有太多内存释放),从而导致oom和节点崩溃。使用以下选项:
-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true
也试过替换 -XX:+DisableExplicitGC with -XX:+ExplicitGCInvokesConcurrent
基于Kafka-5470
我的消息生成速率约为7k消息/秒(1KB消息)。
我知道我们并没有把Kafka推向极限。但意外的是,oom事件和随后的节点崩溃发生了。
我将感谢那些经历过这些问题并对这一领域有深刻见解的人的注解/评论/意见。
编辑:
尝试使用合流推荐参数,我仍然观察到上述问题:
-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
我想知道spark kafka(结构化流媒体)集成是否存在漏洞。
1条答案
按热度按时间c3frrgcw1#
我只想在这里发布一个更新:
oom是一个未关闭的生产商(与spark无关)的结果,我们能够跟踪并修复它。
然而,警告信息在Kafka仍然可见。但它不会引起任何生产问题(据我们所知)
WARN Attempting to send response via channel for which there is no open connection, connection id 10.230.0.81:9092-10.230.0.116:39110-399 (kafka.network.Processor)