flink作业不分布在机器上

mbyulnm0  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(340)

我在apache flink中有一个小用例,它是一个批处理系统。我需要处理一堆文件。每个文件的处理必须由一台机器处理。我有下面的代码。一直以来,只有一个任务槽被占用,文件被一个接一个地处理。我有6个节点(所以有6个任务管理器),每个节点配置了4个任务槽。所以,我希望一次处理24个文件。

  1. class MyMapPartitionFunction extends RichMapPartitionFunction[java.io.File, Int] {
  2. override def mapPartition(
  3. myfiles: java.lang.Iterable[java.io.File],
  4. out:org.apache.flink.util.Collector[Int])
  5. : Unit = {
  6. var temp = myfiles.iterator()
  7. while(temp.hasNext()){
  8. val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh")
  9. val file = new File(temp.next().toURI)
  10. Process(
  11. "/bin/bash ./run.sh " + argumentsList(3)+ "/" + file.getName + " " + argumentsList(7) + "/" + file.getName + ".csv",
  12. new File(fp1.getAbsoluteFile.getParent))
  13. .lines
  14. .foreach{println}
  15. out.collect(1)
  16. }
  17. }
  18. }

我启动了flinkas./bin/start-cluster.sh命令,web用户界面显示它有6个任务管理器,24个任务槽。
这些文件夹包含大约49个文件。当我在这个集合上创建mappartition时,我希望跨越49个并行进程。但是,在我的基础设施中,它们都被一个接一个地处理。这意味着只有一台机器(一个任务管理器)处理所有49个文件名。我想要的是,由于每个插槽配置了2个任务,我希望同时处理24个文件。
任何提示在这里都会有帮助。我在flink-conf.yaml文件中有这些参数

  1. jobmanager.heap.mb: 2048
  2. taskmanager.heap.mb: 1024
  3. taskmanager.numberOfTaskSlots: 4
  4. taskmanager.memory.preallocate: false
  5. parallelism.default: 24

提前谢谢。有人能告诉我哪里出了问题吗?

qyyhg6bp

qyyhg6bp1#

正如大卫所描述的,问题是 env.fromCollection(Iterable[T]) 创建 DataSource 与非平行 InputFormat . 因此 DataSource 以并行方式执行 1 . 后续操作员( mapPartition )从源代码继承这个并行性,这样它们就可以被链接起来了(这为我们节省了一次网络洗牌)。
解决这个问题的方法是显式地重新平衡源代码 DataSet 通过

  1. env.fromCollection(folders).rebalance()

或在后续运算符处显式设置所需的并行性( mapPartition ):

  1. env.fromCollection(folders).mapPartition(...).setParallelism(49)

相关问题