Spark unable to pickle method_descriptor

yqyhoc1h · posted 2021-06-09 · in HBase

I'm getting a strange error message:

    15/01/26 13:05:12 INFO spark.SparkContext: Created broadcast 0 from wholeTextFiles at NativeMethodAccessorImpl.java:-2
    Traceback (most recent call last):
      File "/home/user/inverted-index.py", line 78, in <module>
        print sc.wholeTextFiles(data_dir).flatMap(update).top(10)#groupByKey().map(store)
      File "/home/user/spark2/python/pyspark/rdd.py", line 1045, in top
        return self.mapPartitions(topIterator).reduce(merge)
      File "/home/user/spark2/python/pyspark/rdd.py", line 715, in reduce
        vals = self.mapPartitions(func).collect()
      File "/home/user/spark2/python/pyspark/rdd.py", line 676, in collect
        bytesInJava = self._jrdd.collect().iterator()
      File "/home/user/spark2/python/pyspark/rdd.py", line 2107, in _jrdd
        pickled_command = ser.dumps(command)
      File "/home/user/spark2/python/pyspark/serializers.py", line 402, in dumps
        return cloudpickle.dumps(obj, 2)
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 816, in dumps
        cp.dump(obj)
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 133, in dump
        return pickle.Pickler.dump(self, obj)
      File "/usr/lib/python2.7/pickle.py", line 224, in dump
        self.save(obj)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
        self.save_function_tuple(obj, [themodule])
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
        save((code, closure, base_globals))
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 600, in save_list
        self._batch_appends(iter(obj))
      File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
        save(x)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
        self.save_function_tuple(obj, [themodule])
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
        save((code, closure, base_globals))
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 600, in save_list
        self._batch_appends(iter(obj))
      File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
        save(x)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
        self.save_function_tuple(obj, [themodule])
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
        save((code, closure, base_globals))
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 600, in save_list
        self._batch_appends(iter(obj))
      File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
        save(tmp[0])
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 249, in save_function
        self.save_function_tuple(obj, modList)
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 309, in save_function_tuple
        save(f_globals)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
        pickle.Pickler.save_dict(self, obj)
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
        save(state)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
        pickle.Pickler.save_dict(self, obj)
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
        save(state)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
        pickle.Pickler.save_dict(self, obj)
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
        save(state)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
        pickle.Pickler.save_dict(self, obj)
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 547, in save_inst
        self.save_inst_logic(obj)
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 537, in save_inst_logic
        save(stuff)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
        pickle.Pickler.save_dict(self, obj)
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 547, in save_inst
        self.save_inst_logic(obj)
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 537, in save_inst_logic
        save(stuff)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
        pickle.Pickler.save_dict(self, obj)
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 616, in save_reduce
        save(cls)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 467, in save_global
        d),obj=obj)
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 631, in save_reduce
        save(args)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
        pickle.Pickler.save_dict(self, obj)
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 616, in save_reduce
        save(cls)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/user/spark2/python/pyspark/cloudpickle.py", line 442, in save_global
        raise pickle.PicklingError("Can't pickle builtin %s" % obj)
    pickle.PicklingError: Can't pickle builtin <type 'method_descriptor'>

My update func returns tuples of the form (key, (value1, value2)), all of which are strings, and looks like this:

    def update(doc):
        doc_id = doc[0][path_len:-ext_len]  # actual file name
        content = doc[1].lower()
        new_fi = regex.split(content)
        old_fi = fi_table.row(doc_id)
        fi_table.put(doc_id, {'cf:col': ",".join(new_fi)})
        if not old_fi:
            return [(term, ('add', doc_id)) for term in new_fi]
        else:
            new_fi = set(new_fi)
            old_fi = set(old_fi['cf:col'].split(','))
            return [(term, ('add', doc_id)) for term in new_fi - old_fi] + \
                   [(term, ('del', doc_id)) for term in old_fi - new_fi]

Edit: The problem lies with the two HBase calls, row and put. When I comment out both, the code works (treating old_fi as an empty dict), but if either one runs, it raises the error above. I'm using HappyBase to work with HBase from Python. Can anyone tell me what's going wrong?


wnrlj8wa1#

If it really is a MethodDescriptorType that is failing to pickle, you can register how to pickle a MethodDescriptorType, like this:

    def _getattr(objclass, name, repr_str):
        # hack to grab the reference directly
        try:
            attr = repr_str.split("'")[3]
            return eval(attr + '.__dict__["' + name + '"]')
        except Exception:
            attr = getattr(objclass, name)
            if name == '__dict__':
                attr = attr[name]
            return attr

    def save_wrapper_descriptor(pickler, obj):
        pickler.save_reduce(_getattr, (obj.__objclass__, obj.__name__,
                                       obj.__repr__()), obj=obj)
        return

    # register the following "type" with:
    # Pickler.dispatch[MethodDescriptorType] = save_wrapper_descriptor
    MethodDescriptorType = type(type.__dict__['mro'])

Then, if you register the above with the pickling dispatch table that Spark uses (as shown above, or with copy_reg), it may get past the pickling error.
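
For illustration, here is a minimal sketch of the copy_reg route (Python 2; the module is named copyreg on Python 3). The helper name _reduce_method_descriptor is introduced here for illustration only:

    import copy_reg  # 'copyreg' on Python 3

    # Reduce a method descriptor to a call to _getattr (defined above),
    # so the stock pickle machinery knows how to serialize it.
    def _reduce_method_descriptor(obj):
        return _getattr, (obj.__objclass__, obj.__name__, repr(obj))

    copy_reg.pickle(MethodDescriptorType, _reduce_method_descriptor)

Whether this registration is actually consulted depends on which pickler Spark's cloudpickle uses for the failing type, so treat it as a starting point rather than a guaranteed fix.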


ozxc1zmp2#

Spark tries to serialize the connection object so it can be used in the executors, and this is bound to fail, because a deserialized db connection object cannot grant read/write permission in another scope (or even another machine). The problem can be reproduced by trying to broadcast the connection object. So in this case, the problem was serializing an I/O object.
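For instance, a minimal reproduction sketch (not from the original post; 'hbase-host' is a placeholder and sc is assumed to be a live SparkContext):

    import happybase

    conn = happybase.Connection('hbase-host')  # holds a live Thrift socket
    sc.broadcast(conn)  # fails: broadcast pickles its value eagerly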
I partially solved the problem by connecting to the database inside the map function. But because that opens one connection per RDD element, far too many, I had to switch to partition-level processing, which reduced the number of db connections from 20k to 8-64 (based on the number of partitions); see the sketch below. The Spark developers should consider providing an initialization function/script for the executors to avoid this kind of dead-end problem.
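
A minimal sketch of that partition-level pattern, assuming happybase, placeholder host and table names, and update() refactored to take the table as an explicit parameter instead of a global:

    import happybase

    def update_partition(docs):
        # One HBase connection per partition instead of one per element.
        conn = happybase.Connection('hbase-host')  # placeholder host
        fi_table = conn.table('fi')                # placeholder table name
        try:
            for doc in docs:
                for pair in update(doc, fi_table):
                    yield pair
        finally:
            conn.close()

    print sc.wholeTextFiles(data_dir).mapPartitions(update_partition).top(10)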
Suppose every node ran such an init function: each node would then hold its own database connection (a connection pool, or separate ZooKeeper nodes). Since the init function and the map function would share the same scope, the problem would disappear, and the resulting code would be faster than my workaround. At the end of execution, Spark would free/unload those variables and the program would finish.
