在flink中,如何多次遍历iterable对象

krcsximq  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(502)

Flink使用 Iterable 在某个操作员身上。我有一个问题:为什么用iterable代替list?是不是因为迭代对象可能不会立即放入内存?如果是的话,
当我需要把iterable中的所有元素都放到内存中时,我应该怎么做?
当我需要遍历两次时应该怎么做?
下面是详细的代码。在 apply 函数,我首先需要遍历 iterable 建立r树索引。然后我需要遍历的是再次查询。

  1. public class AllIndexRangeJoinProcess extends RichWindowFunction<Tuple2<Boolean, Point>, List<Point>, Long, TimeWindow> {
  2. private static final int RTREE_NODE_CAPACITY = 50;
  3. private double distance;
  4. private transient SpatialTreeIndex<Point> treeIndex;
  5. public AllIndexRangeJoinProcess(double distance) {
  6. this.distance = distance;
  7. }
  8. @Override
  9. public void open(Configuration parameters) throws Exception {
  10. treeIndex = new RTreeIndex<>(RTREE_NODE_CAPACITY);
  11. }
  12. @Override
  13. public void apply(Long aLong,
  14. TimeWindow timeWindow,
  15. Iterable<Tuple2<Boolean, Point>> iterable,
  16. Collector<List<Point>> collector) throws Exception {
  17. // build index
  18. List<Point> points = new ArrayList<>();
  19. for (Tuple2<Boolean, Point> t : iterable) {
  20. points.add(t.f1);
  21. treeIndex.insert(t.f1);
  22. }
  23. // query
  24. for (Point p : points) {
  25. List<Point> bufferedPoints = new ArrayList<>();
  26. bufferedPoints.add(p);
  27. bufferedPoints.addAll(treeIndex.query(p.getBufferedEnvelope(distance)));
  28. collector.collect(bufferedPoints);
  29. }
  30. }
  31. }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题