尝试通过没有连接的通道发送响应

xj3cbfub  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(1221)

我将spark structured streaming(2.3.0)与kafka(1.0.0)结合使用。

val event_stream: DataStreamReader = spark
  .readStream
  .format(_source)
  .option("kafka.bootstrap.servers", _brokers)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")

我正在用100克的数据测试一个Kafka主题的管道。在kafka broker(3个引导节点,每个节点有2g堆/4g ram)上,我经常(几乎每秒)看到这个警告消息:

WARN Attempting to send response via channel for which there is no open connection, connection id 10.230.0.81:9092-10.230.0.116:39110-399 (kafka.network.Processor)

我还看到代理上的堆消耗稳步增加到gc中的%time接近100(没有太多内存释放),从而导致oom和节点崩溃。使用以下选项:

-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true

也试过替换 -XX:+DisableExplicitGC with -XX:+ExplicitGCInvokesConcurrent 基于Kafka-5470
我的消息生成速率约为7k消息/秒(1KB消息)。
我知道我们并没有把Kafka推向极限。但意外的是,oom事件和随后的节点崩溃发生了。
我将感谢那些经历过这些问题并对这一领域有深刻见解的人的注解/评论/意见。
编辑:
尝试使用合流推荐参数,我仍然观察到上述问题:

-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20
       -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
       -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80

我想知道spark kafka(结构化流媒体)集成是否存在漏洞。

c3frrgcw

c3frrgcw1#

我只想在这里发布一个更新:
oom是一个未关闭的生产商(与spark无关)的结果,我们能够跟踪并修复它。
然而,警告信息在Kafka仍然可见。但它不会引起任何生产问题(据我们所知) WARN Attempting to send response via channel for which there is no open connection, connection id 10.230.0.81:9092-10.230.0.116:39110-399 (kafka.network.Processor)

相关问题