原始java代码在spark中究竟在哪里执行?

mrphzbgm  于 2021-07-09  发布在  Spark
关注(0)|答案(2)|浏览(295)

我是spark的新手,我知道spark通常会序列化函数并将其发送给所有执行者,并处理hdfs中可用的数据块。但如果我有以下代码,

Random random = new Random(); //statement A
int randomValue = random.nextInt(); //statement B
JavaPairRDD<String, Integer> pairRDD = mapRandom.mapToPair(s -> {
    return new Tuple2<>(s, randomValue);
});

声明到底在哪里 A , B 跑?当然,它不会在每个执行器中执行它,因为每个执行器都运行自己的jvm,我发现每个值都Map到完全相同的jvm randomValue .
是指这些陈述吗 A , B 运行驱动程序并复制到所有执行者?

oalqel3c

oalqel3c1#

a和b在驱动程序上运行并被发送给执行者。
如果要在executors中运行它,应该执行以下操作:

JavaPairRDD<String, Integer> pairRDD = mapRandom.mapToPair(s -> {
    Random random = new Random(); //statement A
    int randomValue = random.nextInt(); //statement B
    return new Tuple2<>(s, randomValue);
});
ohtdti5x

ohtdti5x2#

rdd是分布式的(因此得名),但它是在驱动程序代码中定义的,因此任何不与rdd内容交互的都是在驱动程序上执行的代码(根据经验),而任何与rdd内容相关的代码都将在执行程序上运行。
正如你所注意到的,司机计算你的 randomValue 一次并将其发送给所有遗嘱执行人 mapToPair lambda参数关闭该计算值。
移动 random.nextInt() lambda内部的调用将触发对分布式集合中的每个元素执行的调用。此外,还有 random 值本身将被序列化并通过有线传输。
在lambda中移动随机创建本身会使它变小(没有外部状态可捕获),但会创建一个新的 Random 分布式集合中每个元素的示例,这显然是次优的。
要使每个执行器有一个随机的示例值,可以使其成为一个静态成员(或一个懒惰的单例),每个jvm/executor初始化一次。要使每个分区具有不同的随机值,您可能应该使用 forEachPartition 或者类似的,生成一个新的值 nextInt() 并将该值用于该分区中的所有元素。
看看更高级别的dataframe/sqlapi,因为您可能会发现实现所需的功能要容易得多,甚至不必担心代码在哪里执行。

相关问题