访问flatmaptopair中的hashmap

zfycwa2u  于 2021-05-31  发布在  Hadoop
关注(0)|答案(2)|浏览(343)

编辑:已使用解决 RDD.collectAsMap() 我正在尝试复制第28-30页的问题解决方案http://on-demand.gputechconf.com/gtc/2016/presentation/s6424-michela-taufer-apache-spark.pdf
我有一个hashmap,我在map函数之外示例化它。hashmap包含以下数据:

{1:2, 2:3, 3:2, 4:2, 5:3}

以前定义的rdd previousrdd具有以下类型:

JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>>

有以下数据:

1: [(1,2), (1,5)]
2: [(2,1), (2,3), (2,5)]
3: [(3,2), (3,4)]
4: [(4,3), (4,5)]
5: [(5,1), (5,2), (5,4)]

我尝试使用flatmaptopair创建一个新rdd:

JavaPairRDD<Integer, Integer> newRDD = previousRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>>, Integer, Integer>() {
    @Override
    public Iterator<Tuple2<Integer, Integer>> call(Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>> integerIterableTuple2) throws Exception {
        Integer count;
        ArrayList<Tuple2<Integer, Integer>> list = new ArrayList<>();
        count = hashMap.get(integerIterableTuple2._1);
        for (Tuple2<Integer, Integer> t : integerIterableTuple2._2) {
            Integer tcount = hashMap.get(t._2);
            if (count < tcount || (count.equals(tcount) && integerIterableTuple2._1 < t._2)) {
                list.add(t);
            }
        }
        return list.iterator();
    }
});

但在这方面 hashMap.get(t._2) 在for循环中,大部分时间都是空的。我已经检查了hashmap中是否有正确的值。
有没有办法在spark函数中正确获取hashmap的值?

fjaof16o

fjaof16o1#

应该有用。spark应该捕获您的变量,序列化它,并随每个任务发送给每个worker。你可以试试广播这张Map

sc.broadcast(hashMap)

使用结果而不是 hashMap . 它在内存方面也更有效(每个执行器共享存储)。

juzqafwq

juzqafwq2#

我对类变量也有类似的问题。您可以尝试将变量设为局部变量,或多声明一个,如下所示:

Map localMap = hashMap;
 JavaPairRDD<Integer, Integer> newRDD = previousRDD.flatMapToPair(
   ...
      Integer tcount = localMap.get(t._2);
   ...
 );

我认为这是由于Spark序列化机制。你可以在这里了解更多。

相关问题