也许我只是错过了smth,但我只是不知道该去哪里找。
我读了来自两个来源的消息,基于一个公共密钥建立了一个连接,然后把它全部交给Kafka。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
...
source1
.keyBy(_.searchId)
.connect(source2.keyBy(_.searchId))
.process(new SearchResultsJoinFunction)
.addSink(KafkaSink.sink)
因此,当我在本地启动它时,它可以很好地工作,而且它也可以在并行度设置为1的集群上工作,但是在并行度设置为3的集群上不再工作。
当我将其部署到1个作业管理器和3个任务管理器并使每个任务处于“运行”状态时,2分钟后(当没有任何任务提交到接收器时),其中一个任务管理器将获得以下日志:
https://gist.github.com/zavalit/1b1bf6621bed2a3848a05c1ef84c689c#file-文件1-txt-l108
整件事都结束了。
任何暗示我都会感激的。tnx,提前。
1条答案
按热度按时间pdkcd3nj1#
问题似乎是这个任务管理器——flink-taskmanager-12-2qvcd(10.81.53.209)——无法与至少一个其他任务管理器交谈,即flink-taskmanager-12-57jzd(10.81.40.124:46240)。这就是为什么作业从未真正开始运行。
我会检查另一个任务管理器的日志,看看它说了什么,我还会检查您的网络配置。也许是防火墙挡了路?