How to save a Paired RDD as a sequence file in PySpark?

wn9m85ua · posted 2023-03-28 in Spark

I looked at a similar question (this), but it didn't help in my case. I'm having trouble with the following code, and any suggestions are appreciated:

# in PySpark terminal
import numpy as np
a = np.arange(100)
rdd = spark.sparkContext.parallelize(a)
rdd.map(lambda x: (x, x**2)).saveAsSequenceFile("saved-rdd")

This results in the following error:

23/03/25 09:26:00 ERROR Executor: Exception in task 0.0 in stage 29.0 (TID 179)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
...
23/03/25 09:26:00 WARN TaskSetManager: Lost task 0.0 in stage 29.0 (TID 179) (975d59320ec4 executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
...
23/03/25 09:26:00 ERROR TaskSetManager: Task 0 in stage 29.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/spark/python/pyspark/rdd.py", line 2117, in saveAsSequenceFile
    pickledRDD._jrdd, True, path, compressionCodecClass
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/spark/python/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 29.0 failed 1 times, most recent failure: Lost task 0.0 in stage 29.0 (TID 179) (975d59320ec4 executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
...
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
...

I also tried the following:

rdd.map(lambda x: (x, x**2)).saveAsNewAPIHadoopFile(
    "/opt/spark-data/saved-rdd",
    "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
    "org.apache.hadoop.io.IntWritable",
    "org.apache.hadoop.io.IntWritable")

but it fails with a similar error:

23/03/25 09:28:15 ERROR Executor: Exception in task 0.0 in stage 30.0 (TID 180)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
...
23/03/25 09:28:15 WARN TaskSetManager: Lost task 0.0 in stage 30.0 (TID 180) (975d59320ec4 executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
...
23/03/25 09:28:15 ERROR TaskSetManager: Task 0 in stage 30.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/spark/python/pyspark/rdd.py", line 2003, in saveAsNewAPIHadoopFile
    jconf,
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/spark/python/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 1 times, most recent failure: Lost task 0.0 in stage 30.0 (TID 180) (975d59320ec4 executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
...
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
...

Also, as a less important side question: what is the difference between saveAsSequenceFile, saveAsNewAPIHadoopFile, and saveAsPickleFile?

du7egjpx (answer 1):

The problem was how I created my Python list. I used np.arange, which yields a collection of numpy.int64 elements. When I changed the list creation to the standard range, the elements are plain int, which avoids the problem. The explanation in code:

import numpy as np
a = range(100)
b = np.arange(100)
print(type(a[0]))
# <class 'int'>
print(type(b[0]))
# <class 'numpy.int64'>
rdd_a = spark.sparkContext.parallelize(a)
rdd_b = spark.sparkContext.parallelize(b)
rdd_a.map(lambda x: (x, x**2)).saveAsSequenceFile('saved-rdd_a')
# OK
rdd_b.map(lambda x: (x, x**2)).saveAsSequenceFile('saved-rdd_b')
# Same error
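
Since the root cause is the JVM-side unpickler (Pyrolite) failing on numpy.dtype, casting to the built-in int on the Python side should be enough even if the data starts life as a NumPy array. A minimal sketch, assuming an active spark session; the output path 'saved-rdd_b_fixed' is just illustrative:

import numpy as np

b = np.arange(100)
rdd_b = spark.sparkContext.parallelize(b)

# Cast numpy.int64 to plain int so the JVM side only ever sees Python ints.
rdd_b.map(lambda x: (int(x), int(x**2))).saveAsSequenceFile('saved-rdd_b_fixed')

# Read the pairs back to verify the round trip.
restored = spark.sparkContext.sequenceFile('saved-rdd_b_fixed')
print(restored.take(3))

As for the side question, roughly: saveAsSequenceFile is a convenience that writes a Hadoop SequenceFile and infers the Writable key/value types for you; saveAsNewAPIHadoopFile is the general form, where you name the OutputFormat and Writable classes yourself (a SequenceFile being just one choice); saveAsPickleFile stores Python-pickled objects, so the output is only readable back from PySpark (via sc.pickleFile), but since it skips the Writable conversion it should handle the numpy.int64 pairs above unchanged.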
