I've been fighting an unexpected behavior while trying to construct a subclass of numpy's ndarray inside a map call on a pyspark RDD. Specifically, the attribute I add in my ndarray subclass appears to be stripped from the resulting RDD.
The following snippet contains the essence of the problem.
import numpy as np

class MyArray(np.ndarray):
    def __new__(cls, shape, extra=None, *args):
        obj = super().__new__(cls, shape, *args)
        obj.extra = extra
        return obj

    def __array_finalize__(self, obj):
        if obj is None:
            return
        self.extra = getattr(obj, "extra", None)

def shape_to_array(shape):
    rval = MyArray(shape, extra=shape)
    rval[:] = np.arange(np.product(shape)).reshape(shape)
    return rval
If I call shape_to_array directly (not under pyspark), it behaves as expected:
x = shape_to_array((2,3,5))
print(x.extra)
Output:
(2, 3, 5)
However, if I invoke shape_to_array by mapping it over an input RDD, things fall apart:
from pyspark.sql import SparkSession
sc = SparkSession.builder.appName("Steps").getOrCreate().sparkContext
rdd = sc.parallelize([(2,3,5),(2,4),(2,5)])
result = rdd.map(shape_to_array).cache()
print(result.map(lambda t:type(t)).collect())
print(result.map(lambda t:t.shape).collect())
print(result.map(lambda t:t.extra).collect())
Output:
[<class '__main__.MyArray'>, <class '__main__.MyArray'>, <class '__main__.MyArray'>]
[(2, 3, 5), (2, 4), (2, 5)]
22/10/15 15:48:02 ERROR Executor: Exception in task 7.0 in stage 2.0 (TID 23)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/Cellar/apache-spark/3.3.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
process()
File "/usr/local/Cellar/apache-spark/3.3.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
serializer.dump_stream(out_iter, outfile)
File "/usr/local/Cellar/apache-spark/3.3.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/local/Cellar/apache-spark/3.3.0/libexec/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
return f(*args,**kwargs)
File "/var/folders/w7/42_p7mcd1y91_tjd0jzr8zbh0000gp/T/ipykernel_94831/2519313465.py", line 1, in <lambda>
AttributeError: 'MyArray' object has no attribute 'extra'
What is happening to the extra attribute of the MyArray instances?
Many thanks for any and all suggestions.
**EDIT:** A bit of additional information. If I add logging inside the shape_to_array function just before the return, I can verify that the extra attribute is present on the MyArray objects being returned. But when I try to access those MyArray elements of the RDD from the main driver, the attribute is gone.
1 Answer
After sleeping on this, I remembered that I have often run into problems with Pyspark RDDs where the error message turned out to be about the return type not working with pickle.
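As a quick sanity check that serialization is the culprit, a plain pickle round trip (PySpark ships RDD elements with a pickle-based serializer) drops the attribute even with no Spark involved. This is just an illustrative check reusing the MyArray and shape_to_array definitions from the question:

import pickle

x = shape_to_array((2, 3, 5))
print(x.extra)                      # (2, 3, 5)
y = pickle.loads(pickle.dumps(x))
print(hasattr(y, "extra"))          # False: extra is lost in the round trip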
This time I wasn't getting that error message, because numpy.ndarray can be pickled. **But...** numpy.ndarray's __reduce__ and __setstate__ methods know nothing about the extra attribute added on the MyArray subclass, and that is where extra was being stripped. Adding the following two methods to MyArray solved everything.
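A sketch of the two overrides, following the usual __reduce__/__setstate__ recipe for carrying an extra attribute through pickling of an ndarray subclass (the exact code may differ in detail):

class MyArray(np.ndarray):
    # ... __new__ and __array_finalize__ as above ...

    def __reduce__(self):
        # Get ndarray's pickle tuple: (reconstruct callable, args, state)
        pickled_state = super().__reduce__()
        # Append our extra attribute to ndarray's state tuple
        new_state = pickled_state[2] + (self.extra,)
        return (pickled_state[0], pickled_state[1], new_state)

    def __setstate__(self, state):
        # Peel our extra attribute off the end, then let ndarray restore the rest
        self.extra = state[-1]
        super().__setstate__(state[:-1])

With these in place the pickled state carries extra along, and result.map(lambda t: t.extra).collect() on the driver returns the shapes as expected.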
Thanks to anyone who took the time to think about my question.