PySpark: How to sort a list of integers in Apache Spark?

lsmepo6l · published 2023-08-02 in Spark

I recently started using Apache Spark to sort large amounts of data.
In my initial tests I tried to sort a plain list of integers in parallel with PySpark, but the call to the **sortByKey()** method apparently failed with an error.

    >>> rdd = sc.parallelize([5,3,4,7,6,9])
    >>> rdd.sortByKey(True)
    2018-09-02 10:39:42 ERROR Executor:91 - Exception in task 2.0 in stage 1.0 (TID 6)
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "C:\spark-2.3.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 230, in main
      File "C:\spark-2.3.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 225, in process
      File "C:\spark-2.3.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "C:\spark-2.3.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 55, in wrapper
        return f(*args, **kwargs)
      File "C:\spark-2.3.1-bin-hadoop2.7\python\pyspark\rdd.py", line 690, in <lambda>
        samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect()
    TypeError: 'int' object has no attribute '__getitem__'
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    2018-09-02 10:39:42 WARN TaskSetManager:66 - Lost task 2.0 in stage 1.0 (TID 6, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      ... (same Python traceback and executor stack frames as above)
    2018-09-02 10:39:42 ERROR TaskSetManager:70 - Task 2 in stage 1.0 failed 1 times; aborting job
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "C:\spark-2.3.1-bin-hadoop2.7\python\pyspark\rdd.py", line 690, in sortByKey
        samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect()
      File "C:\spark-2.3.1-bin-hadoop2.7\python\pyspark\rdd.py", line 834, in collect
        sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
      File "C:\spark-2.3.1-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
      File "C:\spark-2.3.1-bin-hadoop2.7\python\pyspark\sql\utils.py", line 63, in deco
        return f(*a, **kw)
      File "C:\spark-2.3.1-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 1 times, most recent failure: Lost task 2.0 in stage 1.0 (TID 6, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      ... (same Python traceback and executor stack frames as above)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
        at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:162)
        at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      ... (same Python traceback and executor stack frames as above)
      ... 1 more
    >>> 2018-09-02 10:39:42 WARN TaskSetManager:66 - Lost task 0.0 in stage 1.0 (TID 4, localhost, executor driver): TaskKilled (Stage cancelled)
    2018-09-02 10:39:42 WARN TaskSetManager:66 - Lost task 3.0 in stage 1.0 (TID 7, localhost, executor driver): TaskKilled (Stage cancelled)
    2018-09-02 10:39:43 WARN TaskSetManager:66 - Lost task 1.0 in stage 1.0 (TID 5, localhost, executor driver): TaskKilled (Stage cancelled)

How can I sort a list of integers in parallel with Apache Spark?
I am using Windows 10, Java SE Development Kit 8, and Python 2.7.15.


r7knjye2 1#

The method you are looking for is sortBy, not sortByKey:

    rdd.sortBy(lambda x: x)
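For completeness: RDD.sortBy(keyfunc, ascending=True, numPartitions=None) keys every element with keyfunc, sorts by that key, and returns the elements themselves. On a plain Python list the same computation is just sorted(data, key=keyfunc), so here is a local, Spark-free sketch of what the Spark call would return (the `rdd.sortBy(...)` calls in the comments are what each line models):

```python
# Local model of rdd.sortBy(lambda x: x) on the question's data.
data = [5, 3, 4, 7, 6, 9]

# Like rdd.sortBy(lambda x: x).collect()
ascending = sorted(data, key=lambda x: x)

# Like rdd.sortBy(lambda x: x, ascending=False).collect()
descending = sorted(data, key=lambda x: x, reverse=True)

print(ascending)   # [3, 4, 5, 6, 7, 9]
print(descending)  # [9, 7, 6, 5, 4, 3]
```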



kq0g1dla 2#

You can't really use sortByKey here, because it is a method implemented only on pair RDDs. Alas, your RDD is not a pair RDD, i.e. not a distributed collection of key/value pairs.
You can use sortBy(function) instead, where you supply the sorting criterion yourself. In your case, try:

    rdd.sortBy(lambda x: x)
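If you do want sortByKey, you first have to turn each integer into a key/value pair, e.g. rdd.keyBy(lambda x: x).sortByKey(). The failing line in the traceback above (lambda kv: kv[0]) is sortByKey sampling the key of every record to build its range partitioner, which is exactly why a bare int blows up. A local, Spark-free sketch of the pair-RDD route (the Spark calls in the comments are what each line models):

```python
# Local model of rdd.keyBy(lambda x: x).sortByKey().values() on the question's data.
data = [5, 3, 4, 7, 6, 9]

pairs = [(x, x) for x in data]                # keyBy: element 5 becomes the pair (5, 5)
by_key = sorted(pairs, key=lambda kv: kv[0])  # sortByKey: sort the pairs by their key, kv[0]
values = [v for _, v in by_key]               # values: drop the keys again

print(values)  # [3, 4, 5, 6, 7, 9]
```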

