我是spark的新手,我知道spark通常会序列化函数并将其发送给所有执行者,并处理hdfs中可用的数据块。但如果我有以下代码,
Random random = new Random(); //statement A
int randomValue = random.nextInt(); //statement B
JavaPairRDD<String, Integer> pairRDD = mapRandom.mapToPair(s -> {
return new Tuple2<>(s, randomValue);
});
声明到底在哪里 A
, B
跑?当然,它不会在每个执行器中执行它,因为每个执行器都运行自己的jvm,我发现每个值都Map到完全相同的jvm randomValue
.
是指这些陈述吗 A
, B
运行驱动程序并复制到所有执行者?
2条答案
按热度按时间oalqel3c1#
a和b在驱动程序上运行并被发送给执行者。
如果要在executors中运行它,应该执行以下操作:
ohtdti5x2#
rdd是分布式的(因此得名),但它是在驱动程序代码中定义的,因此任何不与rdd内容交互的都是在驱动程序上执行的代码(根据经验),而任何与rdd内容相关的代码都将在执行程序上运行。
正如你所注意到的,司机计算你的
randomValue
一次并将其发送给所有遗嘱执行人mapToPair
lambda参数关闭该计算值。移动
random.nextInt()
lambda内部的调用将触发对分布式集合中的每个元素执行的调用。此外,还有random
值本身将被序列化并通过有线传输。在lambda中移动随机创建本身会使它变小(没有外部状态可捕获),但会创建一个新的
Random
分布式集合中每个元素的示例,这显然是次优的。要使每个执行器有一个随机的示例值,可以使其成为一个静态成员(或一个懒惰的单例),每个jvm/executor初始化一次。要使每个分区具有不同的随机值,您可能应该使用
forEachPartition
或者类似的,生成一个新的值nextInt()
并将该值用于该分区中的所有元素。看看更高级别的dataframe/sqlapi,因为您可能会发现实现所需的功能要容易得多,甚至不必担心代码在哪里执行。