对自实现对象/类的函数使用pyrdd.parallelize().map()

jmo0nnb3  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(260)

我有一些我想并行计算的对象,因此我想我可以求助于pyspark。
考虑这个例子,一个类的对象有一个数字 i ,可以和 square() :

class MyMathObject():
    def __init__(self, i):
        self.i = i
    def square(self):
        return self.i**2

print(MyMathObject(3).square()) # Test one instance with regular python - works

另外,我设置了pyspark(在jupyter笔记本中),现在我想在我的对象上并行计算0到4的平方:

import findspark
findspark.init()
from pyspark import SparkContext

sc = SparkContext("local[2]")

rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
rdd.map(lambda obj: obj.square()).collect() # This fails

这不起作用-它会导致一个非常长的,对我来说几乎没有帮助的错误消息。我觉得唯一有意思的一句话是:
attributeerror:无法在<module'pyspark.daemon'from'/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>
所以,这似乎与属性 square() 被称为,左右。我把完整的信息抄在最后。
Pypark本身似乎起作用;例如,在一个普通的python列表上执行下面的命令将返回预期的平方数。

rdd = sc.parallelize([i for i in range(5)])
rdd.map(lambda i: i**2).collect()

因此,我创建或操作对象的方式似乎有缺陷,但我无法追踪错误。
完整的错误消息:
py4jjavaerror traceback(最近一次调用last)in 1 rdd=sc.parallelize([mymathobject(i)for i in range(5)])--->2 rdd.map(lambda obj:obj.square()).collect()
/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/pyspark/rdd.py in collect(self)887”““888,使用sccallsitesync(self.context)作为css:-->889 sock\u info=self.ctx.\u jvm.pythonrdd.collectandserve(self.\u jrdd.rdd())890返回列表(\u load \u from \u socket(sock\u info,self.\u jrdd\u反序列化程序))891
/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java\u gateway.py in call(self,*args)1302 1303 answer=self.gateway\u client.send\u command(command)->1304 return\u value=get\u return\u value(1305 answer,self.gateway\u client,self.target\u id,self.name)1306
/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get\u return\u value(answer,gateway\u client,target\u id,name)324 value=output\u converter[type](answer[2:],gateway\u client)325 if answer[1]==reference\u type:-->326 raise py4jjavaerror(327“调用{0}{1}{2}时出错。\n”。328格式(目标id,“.”,名称),值)
py4jjavaerror:调用z:org.apache.spark.api.pythonrdd.collectandserve时出错:org.apache.spark.sparkexception:由于阶段失败而中止作业:阶段1.0中的任务0失败了1次,最近的失败:阶段1.0中的任务0.0丢失(tid 2192.168.2.108,executor driver):org.apache.spark.api.python异常:traceback(最近一次调用):file“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py”,第605行,在main process()file“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py”,第597行,进程内serializer.dump_stream(out_iter,outfile)file“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第271行,dump_stream vs=list(itertools.islice(iterator,batch))file“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第147行,在load\u stream yield self.\u read\u with\u length(stream)file“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第172行,在load read\u with\u length return self.load(obj)file“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第458行,在load return pickle,encoding=encoding)attributeerror:在<module'pyspark.daemon'from'/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>
位于org.apache.spark.api.python.basepythonrunner$readeriterator.handlepythonexception(pythonrunner)。scala:503)在org.apache.spark.api.pythonrunner$$anon$3.read(pythonrunner。scala:638)在org.apache.spark.api.pythonrunner$$anon$3.read(pythonrunner。scala:621)在org.apache.spark.api.python.basepythonrunner$readeriterator.hasnext(pythonrunner。scala:456)在org.apache.spark.interruptibleiterator.hasnext(interruptibleiterator。scala:37)在scala.collection.iterator.foreach(iterator。scala:941)在scala.collection.iterator.foreach$(iterator。scala:941)在org.apache.spark.interruptibleiterator.foreach(interruptibleiterator。scala:28)在scala.collection.generic.growtable.$plus$eq(可增长)。scala:62)在scala.collection.generic.growtable.$plus$plus$eq$(可增长。scala:53)在scala.collection.mutable.arraybuffer.$plus$plus$eq(arraybuffer。scala:105)在scala.collection.mutable.arraybuffer.$plus$plus$eq(arraybuffer。scala:49)在scala.collection.traversableonce.to(traversableonce。scala:315)在scala.collection.traversableonce.to$(traversableonce。scala:313)在org.apache.spark.interruptibleiterator.to(interruptibleiterator。scala:28)在scala.collection.traversableonce.tobuffer(traversableonce。scala:307)在scala.collection.traversableonce.tobuffer$(traversableonce。scala:307)在org.apache.spark.interruptibleiterator.tobuffer(interruptibleiterator。scala:28)在scala.collection.traversableonce.toarray(traversableonce。scala:294)在scala.collection.traversableonce.toarray$(traversableonce。scala:288)在org.apache.spark.interruptibleiterator.toarray(interruptibleiterator.org.apache.spark.interruptibleiterator.toarray)。scala:28)在org.apache.spark.rdd.rdd.$anonfun$collect$2(rdd。scala:1004)在org.apache.spark.sparkcontext.$anonfun$runjob$5(sparkcontext。scala:2154)在org.apache.spark.scheduler.resulttask.runtask(resulttask。scala:90)在org.apache.spark.scheduler.task.run(task。scala:127)在org.apache.spark.executor.executor$taskrunner.$anonfun$run$3(executor。scala:462)在org.apache.spark.util.utils$.trywithsafefinally(utils。scala:1377)在org.apache.spark.executor.executor$taskrunner.run(executor。scala:465) 位于java.base/java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1128)在java.base/java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:628)在java.base/java.lang.thread.run(thread。java:834)
驱动程序stacktrace:org.apache.spark.scheduler.dagscheduler.failjobandindependentstages(dagscheduler)。scala:2059)在org.apache.spark.scheduler.dagscheduler.$anonfun$abortstage$2(dagscheduler。scala:2008)在org.apache.spark.scheduler.dagscheduler.$anonfun$abortstage$2$adapted(dagscheduler。scala:2007)在scala.collection.mutable.resizablearray.foreach(resizablearray。scala:62)位于scala.collection.mutable.resizablearray.foreach$(resizablearray。scala:55)在scala.collection.mutable.arraybuffer.foreach(arraybuffer。scala:49)在org.apache.spark.scheduler.dagscheduler.abortstage(dagscheduler。scala:2007)在org.apache.spark.scheduler.dagscheduler.$anonfun$handletasksetfailed$1(dagscheduler)。scala:973)位于org.apache.spark.scheduler.dagscheduler.$anonfun$handletasksetfailed$1$adapted(dagscheduler)。scala:973)在scala.option.foreach(option。scala:407)在org.apache.spark.scheduler.dagscheduler.handletasksetfailed(dagscheduler。scala:973)在org.apache.spark.scheduler.dagschedulereventprocessloop.doonreceive(dagscheduler。scala:2239)位于org.apache.spark.scheduler.dagschedulereventprocessloop.onreceive(dagscheduler。scala:2188)位于org.apache.spark.scheduler.dagschedulereventprocessloop.onreceive(dagscheduler。scala:2177)在org.apache.spark.util.eventloop$$anon$1.run(eventloop。scala:49) 在org.apache.spark.scheduler.dagscheduler.runjob(dagscheduler。scala:775)在org.apache.spark.sparkcontext.runjob(sparkcontext。scala:2114)在org.apache.spark.sparkcontext.runjob(sparkcontext。scala:2135)在org.apache.spark.sparkcontext.runjob(sparkcontext。scala:2154)在org.apache.spark.sparkcontext.runjob(sparkcontext。scala:2179)在org.apache.spark.rdd.rdd.$anonfun$collect$1(rdd。scala:1004)在org.apache.spark.rdd.rddoperationscope$.withscope(rddoperationscope。scala:151)在org.apache.spark.rdd.rddoperationscope$.withscope(rddoperationscope。scala:112)在org.apache.spark.rdd.rdd.withscope(rdd。scala:388)在org.apache.spark.rdd.rdd.collect(rdd。scala:1003)在org.apache.spark.api.pythonrdd$.collectandserve(pythonrdd。scala:168)org.apache.spark.api.pythonrdd.collectandserve(pythonrdd.scala)java.base/jdk.internal.reflect.nativemethodaccessorimpl.invoke0(本机方法)java.base/jdk.internal.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl)。java:62)在java.base/jdk.internal.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl。java:43)在java.base/java.lang.reflect.method.invoke(method。java:566)在py4j.reflection.methodinvoker.invoke(methodinvoker。java:244)在py4j.reflection.reflectionengine.invoke(reflectionengine。java:357)在py4j.gateway.invoke(gateway。java:282)在py4j.commands.abstractcommand.invokemethod(abstractcommand。java:132)在py4j.commands.callcommand.execute(callcommand。java:79)在py4j.gatewayconnection.run(网关连接。java:238)在java.base/java.lang.thread.run(thread。java:834)引起原因:org.apache.spark.api.python.python异常:traceback(最近一次调用last):file“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py”,第605行,in main process()文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py”,第597行,in process serializer.dump\u stream(out\iter,outfile)文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第271行,dump\u stream vs=list(itertools.islice(iterator,batch))文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第147行,在load\u stream yield self.\u read\u with\u length(stream)file“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第172行,在load read\u with\u length return self.load(obj)file“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第458行,在load return pickle,encoding=encoding)attributeerror:在<module'pyspark.daemon'from'/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>
在org.apache.spa

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题