我正在使用 Flink v1.4.0
.
我在利用 Flink
的原生图形api(gelly),我正在使用它处理1200万个数据点(边)。我目前正在通过 IntelliJ
使用 Flink
在同一jvm中执行所有TaskManager和jobmanager的微型集群。
当我加载数据时,有效地生成我的边,就在我得到数据之前 Flink
要运行的作业,我总是遇到以下例外情况:
...
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#XXXXXXXXXX] with leader session id XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.
322062 [main] ERROR com.somecompany.some.domain.name.some.javaClass- Error executing pipeline
java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
...
我确保通过 IntelliJ
添加:
-Dakka.client.timeout:600s
-Dakka.ask.timeout:600s
但例外情况依然存在,至于是什么原因造成的,我没有其他线索。当我减少数据大小时,一切正常。
当我试图通过 Flink UI
在的本地版本上运行 Flink
我已经在集群上安装了。在这种情况下,作业永远不会启动,我甚至无法预览自动生成的运算符的dag。当我减少要处理的数据量时,问题又消失了。我还更新了 flink-conf.yaml
包括相同的 akka
配置属性,但这没有什么区别。
我该怎么解决这个问题?
1条答案
按热度按时间zbq4xfa01#
在intellij中运行flink作业时,依赖flink小型集群。小型集群不同于在独立、yarn或mesos上运行flink,因为它依赖于单个jvm。此外,微型集群是以多种方式预先配置的,并且改变这种配置并不总是可能的(至少在某些设置方面是如此)。
在将作业提交到集群时(而不是在通过小型集群运行作业时),我必须改变的一点是分配给作业管理器的堆内存的大小。这是必要的,因为加载要处理的数据并不是我想要运行的flink作业的一部分(这在flink中不是标准实践,这样做实际上是错误的)。通过增加作业管理器的堆,我可以让我的作业运行,但最终我必须为我的flink作业定义一个新的输入格式,以避免作业管理器不得不预加载数据以执行—无论如何,这不应该是它的责任。
现在的问题是:不能通过intellij(据我所知)将堆内存分配给作业管理器,因此作业总是会失败。