无法更新Pyspark中的变量

gdrx4gfi  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(155)

我正在尝试更新pyspark中的变量,并希望在另一个方法中使用相同的变量。我正在类中使用**@property**,当我在python中测试它时,它按预期工作,但当我尝试在pyspark中实现它时,它没有更新变量。请帮助我找出我做错了什么。

代码:

class Hrk(object):
    def __init__(self, hrkval):
        self.hrkval = hrkval

    @property
    def hrkval(self):
        return self._hrkval

    @hrkval.setter
    def hrkval(self, value):
        self._hrkval = value

    @hrkval.deleter
    def hrkval(self):
        del self._hrkval

filenme = sc.wholeTextFiles("/user/root/CCDs")

hrk = Hrk("No Value")

def add_demo(filename):
   pfname[]
   plname[]
   PDOB[]
   gender[]
   .......i have not mentioned my logic, i skipped that part......
   hrk.hrkval = pfname[0]+"@#"+plname[0]+PDOB[0]+gender[0]
   return (str(hrk.hrkval))

def add_med(filename):
   return (str(hrk.hrkval))

filenme.map(getname).map(add_demo).saveAsTextFile("/user/cloudera/Demo/")
filenme.map(getname).map(add_med).saveAsTextFile("/user/cloudera/Med/")

在我的第一个方法调用(add_demo)中,我得到了正确的值,但是当我想在第二个方法中使用相同的变量时,我得到了No Value。我不知道为什么它没有更新变量。在Python中,类似的逻辑工作得很好。

8yoxcaq7

8yoxcaq71#

您正在尝试使用map API来改变全局变量的状态。这不是Spark的推荐模式。您尝试尽可能使用纯函数,并使用.reduce.reduceByKey.foldLeft之类的操作。下面的简化示例不起作用的原因是,当调用.map时,Spark为函数f1创建闭包,为每个“分区”创建hrk对象的副本,并将其应用于每个分区内的行。

import pyspark
import pyspark.sql

number_cores = 2
memory_gb = 1
conf = (
    pyspark.SparkConf()
        .setMaster('local[{}]'.format(number_cores))
        .set('spark.driver.memory', '{}g'.format(memory_gb))
)

c = pyspark.SparkContext(conf=conf)
spark = pyspark.sql.SQLContext(sc)

class Hrk(object):
    def __init__(self, hrkval):
        self.hrkval = hrkval

    @property
    def hrkval(self):
        return self._hrkval

    @hrkval.setter
    def hrkval(self, value):
        self._hrkval = value

    @hrkval.deleter
    def hrkval(self):
        del self._hrkval

hrk = Hrk("No Value")
print(hrk.hrkval)

# No Value

def f1(x):
    hrk.hrkval = str(x)
    return "str:"+str(hrk.hrkval)

data = sc.parallelize([1,2,3])
data.map(f1).collect()

# ['str:1', 'str:2', 'str:3']

print(hrk.hrkval)

# No Value

你可以在rdd编程指南的了解闭包一节中阅读更多关于闭包的内容,这里有一些重要的片段:
Spark的难点之一是在集群中执行代码时,要理解变量和方法的作用域和生命周期。RDD操作在作用域之外修改变量,这常常会引起混淆。在下面的例子中,我们将看到使用foreach()来增加计数器的代码,但类似的问题也可能发生在其他操作中。
一般来说,闭包--像循环或局部定义的方法这样的结构,不应该被用来改变一些全局状态。Spark不定义或保证从闭包外部引用的对象的改变行为。一些这样做的代码可能在局部模式下工作,但这只是偶然的,而且这样的代码在分布式模式下不会像预期的那样工作。如果需要某些全局聚合,请使用累加器。
https://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-

相关问题