pyspark RDDs strip attributes from numpy subclasses

njthzxwz · posted 2022-11-01 in Spark
Follow (0) | Answers (1) | Views (181)

I have been wrestling with unexpected behavior when constructing a subclass of numpy ndarray inside a map call on a pyspark RDD. Specifically, the attribute I add to my ndarray subclass appears to be stripped from the resulting RDD.
The following snippet captures the essence of the problem.

import numpy as np

class MyArray(np.ndarray):
    def __new__(cls, shape, extra=None, *args):
        obj = super().__new__(cls, shape, *args)
        obj.extra = extra
        return obj

    def __array_finalize__(self, obj):
        # Called for views, slices, and templates; propagate `extra`.
        if obj is None:
            return
        self.extra = getattr(obj, "extra", None)

def shape_to_array(shape):
    rval = MyArray(shape, extra=shape)
    # np.product is deprecated; np.prod is the supported spelling.
    rval[:] = np.arange(np.prod(shape)).reshape(shape)
    return rval

If I call shape_to_array directly (not under pyspark), it behaves as expected:

x = shape_to_array((2,3,5))
print(x.extra)

Output:

(2, 3, 5)
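As an aside, it is the `__array_finalize__` hook (not `__new__`) that keeps `extra` alive through views and slices; this is the standard numpy subclassing recipe. A minimal check outside Spark, restating the question's class:

```python
import numpy as np

class MyArray(np.ndarray):
    def __new__(cls, shape, extra=None, *args):
        obj = super().__new__(cls, shape, *args)
        obj.extra = extra
        return obj

    def __array_finalize__(self, obj):
        # obj is the source array for views/slices; copy `extra` over.
        if obj is None:
            return
        self.extra = getattr(obj, "extra", None)

x = MyArray((2, 3), extra="tag")
view = x[0]  # slicing yields a new MyArray via __array_finalize__
print(type(view).__name__, view.extra)  # MyArray tag
```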

However, if I invoke shape_to_array by mapping it over an input RDD, things fall apart:

from pyspark.sql import SparkSession
sc = SparkSession.builder.appName("Steps").getOrCreate().sparkContext

rdd = sc.parallelize([(2,3,5),(2,4),(2,5)])
result = rdd.map(shape_to_array).cache()
print(result.map(lambda t: type(t)).collect())
print(result.map(lambda t: t.shape).collect())
print(result.map(lambda t: t.extra).collect())

Output:

[<class '__main__.MyArray'>, <class '__main__.MyArray'>, <class '__main__.MyArray'>]

[(2, 3, 5), (2, 4), (2, 5)]

22/10/15 15:48:02 ERROR Executor: Exception in task 7.0 in stage 2.0 (TID 23)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/3.3.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/usr/local/Cellar/apache-spark/3.3.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/Cellar/apache-spark/3.3.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/Cellar/apache-spark/3.3.0/libexec/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args,**kwargs)
  File "/var/folders/w7/42_p7mcd1y91_tjd0jzr8zbh0000gp/T/ipykernel_94831/2519313465.py", line 1, in <lambda>
AttributeError: 'MyArray' object has no attribute 'extra'

What happened to the extra attribute on the MyArray instances?
Many thanks for any and all suggestions.

**EDIT:** A bit of extra information. If I add logging inside the shape_to_array function just before the return, I can verify that the extra attribute is present on the MyArray object being returned. But when I access the MyArray elements of the RDD from the main driver, it is gone.

2admgd59

After sleeping on it, I remembered that I frequently hit problems with pyspark RDDs where the error message turns out to be about a return type that doesn't work with pickle.
I didn't get that error message this time, because numpy.ndarray is picklable. **But...** numpy.ndarray's __reduce__ and __setstate__ methods know nothing about the extra attribute added on the MyArray subclass, so it is dropped when Spark serializes the arrays between executors and the driver. That is where extra gets stripped.
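This diagnosis can be reproduced without Spark at all: a plain pickle round-trip of the question's MyArray (before any fix) already loses `extra`, because nothing in ndarray's default pickling carries the attribute, and `__array_finalize__` is called with `obj is None` during reconstruction:

```python
import pickle
import numpy as np

class MyArray(np.ndarray):
    def __new__(cls, shape, extra=None, *args):
        obj = super().__new__(cls, shape, *args)
        obj.extra = extra
        return obj

    def __array_finalize__(self, obj):
        if obj is None:
            return
        self.extra = getattr(obj, "extra", None)

x = MyArray((2, 2), extra="tag")
# This round-trip is essentially what Spark does when it ships
# RDD elements between the driver and the executors.
y = pickle.loads(pickle.dumps(x))
print(hasattr(y, "extra"))  # False -- accessing y.extra raises AttributeError
```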
Adding the following two methods to MyArray fixed everything.

def __reduce__(self):
    # ndarray.__reduce__ takes no arguments and returns
    # (reconstructor, args, state); append `extra` to the state tuple.
    reconstruct, args, state = super().__reduce__()
    return reconstruct, args, state + (self.extra,)

def __setstate__(self, state):
    self.extra = state[-1]
    super().__setstate__(state[:-1])
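A quick round-trip check (outside Spark) that the fix behaves as intended; this sketch just restates the question's class with the two methods merged in:

```python
import pickle
import numpy as np

class MyArray(np.ndarray):
    def __new__(cls, shape, extra=None, *args):
        obj = super().__new__(cls, shape, *args)
        obj.extra = extra
        return obj

    def __array_finalize__(self, obj):
        if obj is None:
            return
        self.extra = getattr(obj, "extra", None)

    def __reduce__(self):
        # Append `extra` to ndarray's pickle state so it survives.
        reconstruct, args, state = super().__reduce__()
        return reconstruct, args, state + (self.extra,)

    def __setstate__(self, state):
        self.extra = state[-1]
        super().__setstate__(state[:-1])

x = MyArray((2, 3), extra=(2, 3))
x[:] = np.arange(6).reshape(2, 3)
y = pickle.loads(pickle.dumps(x))
print(y.extra, np.array_equal(x, y))  # (2, 3) True
```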

Thanks to anyone who spent time thinking about my question.
