我正在尝试更新pyspark中的变量,并希望在另一个方法中使用相同的变量。我正在类中使用**@property**,当我在python中测试它时,它按预期工作,但当我尝试在pyspark中实现它时,它没有更新变量。请帮助我找出我做错了什么。
代码:
class Hrk(object):
def __init__(self, hrkval):
self.hrkval = hrkval
@property
def hrkval(self):
return self._hrkval
@hrkval.setter
def hrkval(self, value):
self._hrkval = value
@hrkval.deleter
def hrkval(self):
del self._hrkval
filenme = sc.wholeTextFiles("/user/root/CCDs")
hrk = Hrk("No Value")
def add_demo(filename):
pfname[]
plname[]
PDOB[]
gender[]
.......i have not mentioned my logic, i skipped that part......
hrk.hrkval = pfname[0]+"@#"+plname[0]+PDOB[0]+gender[0]
return (str(hrk.hrkval))
def add_med(filename):
return (str(hrk.hrkval))
filenme.map(getname).map(add_demo).saveAsTextFile("/user/cloudera/Demo/")
filenme.map(getname).map(add_med).saveAsTextFile("/user/cloudera/Med/")
在我的第一个方法调用(add_demo)中,我得到了正确的值,但是当我想在第二个方法中使用相同的变量时,我得到了No Value。我不知道为什么它没有更新变量。在Python中,类似的逻辑工作得很好。
1条答案
按热度按时间8yoxcaq71#
您正在尝试使用
map
API来改变全局变量的状态。这不是Spark的推荐模式。您尝试尽可能使用纯函数,并使用.reduce
或.reduceByKey
或.foldLeft
之类的操作。下面的简化示例不起作用的原因是,当调用.map
时,Spark为函数f1
创建闭包,为每个“分区”创建hrk
对象的副本,并将其应用于每个分区内的行。你可以在rdd编程指南的了解闭包一节中阅读更多关于闭包的内容,这里有一些重要的片段:
Spark的难点之一是在集群中执行代码时,要理解变量和方法的作用域和生命周期。RDD操作在作用域之外修改变量,这常常会引起混淆。在下面的例子中,我们将看到使用foreach()来增加计数器的代码,但类似的问题也可能发生在其他操作中。
一般来说,闭包--像循环或局部定义的方法这样的结构,不应该被用来改变一些全局状态。Spark不定义或保证从闭包外部引用的对象的改变行为。一些这样做的代码可能在局部模式下工作,但这只是偶然的,而且这样的代码在分布式模式下不会像预期的那样工作。如果需要某些全局聚合,请使用累加器。
https://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-