Unable to save a DataFrame to Parquet with PySpark

sauutmhj asked on 2021-05-27 in Spark

I have a DataFrame produced by a PySpark SQL query. When I try to save it with the following code:

    mydata.write.mode('overwrite').parquet("file:///mypath/master")

I get the error attached at the end. The DataFrame contains struct types; its schema (from mydata.printSchema()) is:

    |-- result: struct (nullable = true)
    |    |-- mycol1: array (nullable = true)
    |    |    |-- element: string (containsNull = true)
    |    |-- mycol2: array (nullable = true)
    |    |    |-- element: array (containsNull = true)
    |    |    |    |-- element: string (containsNull = true)

I am not sure whether this is related to the data types, the join, or something else. Is there any way to resolve it? Thank you!
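
For reference, here is a minimal sketch that rebuilds a DataFrame with the same nested schema from hypothetical in-memory data and issues the same write call (the sample rows and session setup are illustrative only, not my actual query):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical rows shaped like the schema above:
    # result: struct<mycol1: array<string>, mycol2: array<array<string>>>
    mydata = spark.createDataFrame(
        [((["a", "b"], [["x"], ["y", "z"]]),)],
        "result struct<mycol1: array<string>, mycol2: array<array<string>>>",
    )
    mydata.printSchema()
    mydata.write.mode('overwrite').parquet("file:///mypath/master")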

    Py4JJavaError: An error occurred while calling o658.parquet.
    : org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:677)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:286)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:567)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)
    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 52.0 failed 1 times, most recent failure: Lost task 0.0 in stage 52.0 (TID 1514, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:109)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
    at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127)
    at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
    Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
    at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
    at java.net.PlainSocketImpl.accept(Unknown Source)
    at java.net.ServerSocket.implAccept(Unknown Source)
    at java.net.ServerSocket.accept(Unknown Source)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
    ... 26 more
    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
    ... 33 more
    Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:109)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
    at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127)
    at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more
    Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
    at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
    at java.net.PlainSocketImpl.accept(Unknown Source)
    at java.net.ServerSocket.implAccept(Unknown Source)
    at java.net.ServerSocket.accept(Unknown Source)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
    ... 26 more
inb24sb2 1#

Hope this helps:

    # write_schema and savepath stand for your desired schema and output path
    df.write.option("schema", write_schema).parquet(savepath)
0x6upsns 2#

I have figured out the cause of this problem: it stems from the JVM memory settings. Following the steps in this post helped me configure the JVM settings, and everything works now:
64-bit Java will not allocate more than 2GB of heap memory
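
For reference, a minimal sketch of what such a memory configuration can look like in PySpark (the 4g values are illustrative assumptions, not figures from the post above):

    from pyspark.sql import SparkSession

    # Illustrative heap sizes; adjust to your machine. spark.driver.memory only
    # takes effect if it is set before the driver JVM starts, i.e. before the
    # first SparkSession/SparkContext is created in this process.
    spark = (
        SparkSession.builder
        .config("spark.driver.memory", "4g")
        .config("spark.executor.memory", "4g")
        .getOrCreate()
    )

If the JVM is already running (for example in a notebook launched through pyspark), the same settings can instead be passed at launch time, e.g. via the --driver-memory option of pyspark or spark-submit.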
