scala—apacheflink中数据集的并集

anhgbhbe  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(373)

我在试着和一个 Seq[DataSet(Long,Long,Double)] 一个单身汉 DataSet[(Long,Long,Double)] 在Flink:

  1. val neighbors= graph.map(el => zKnn.neighbors(results,
  2. el.vector, 150, metric)).reduce(
  3. (a, b) => a.union(b)
  4. ).collect()

其中graph是一个常规的scala集合,但可以转换为数据集;结果是 DataSet[Vector] 不应该被收集并且是邻居方法中需要的
我总是得到一个flinkruntime例外:
当前无法处理输出超过64个的节点。org.apache.flink.optimizer.compilereException:当前无法处理输出超过64个的节点。在org.apache.flink.optimizer.dag.optimizernode.addoutgoingconnection(optimizernode。java:347)在org.apache.flink.optimizer.dag.singleinputnode.setinput(singleinputnode。java:202

wfauudbj

wfauudbj1#

flink目前不支持输入数据集超过64个的联合运算符。
作为一种解决方法,您可以按层次结构合并多达64个数据集,并在层次结构的级别之间插入标识Map器。比如:

  1. DataSet level1a = data1.union(data2.union(data3...(data64))).map(new IDMapper());
  2. DataSet level1b = data65.union(data66...(data128))).map(new IDMapper());
  3. DataSet level2 = level1a.union(level1b)

相关问题