PySpark: failing with OutOfMemory on window functions

Asked by azpvetkf on 2021-05-27 in Spark

I have a solution that works on a small dataset but fails with out-of-memory errors on a very large one (>100 million records). I am new to Spark, so I am not sure how to debug this or where to look. I am aggregating packet counts per port over a 30-minute rolling window:

    from functools import partial, reduce

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # TCP ports
    ports = ['22', '25', '53', '80', '88', '123', '514', '443', '8080', '8443']

    def add_port_column(r_df, port, window):
        '''
        Input:
            r_df: dataframe
            port: port
            window: pyspark window to be used
        Output: pyspark dataframe
        '''
        return r_df.withColumn('pkts_src_port_{}_30m'.format(port),
                               F.when(F.col('source_port') == port,
                                      F.sum('source_packets').over(window)).otherwise(0))\
                   .withColumn('pkts_dst_port_{}_30m'.format(port),
                               F.when(F.col('destination_port') == port,
                                      F.sum('destination_packets').over(window)).otherwise(0))

    window_time = 30 * 60  # 30-minute window, in seconds (timestamp is epoch seconds)

    w = (Window()
         .partitionBy("source_ip")
         .orderBy(F.col("timestamp"))
         .rangeBetween(-window_time, 0))

    # apply add_port_column once per port, threading the dataframe through reduce
    flows_filtered_v3_df = reduce(partial(add_port_column, window=w),
                                  ports,
                                  flows_filtered_v3_df)
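For context, here is the same window-plus-reduce pattern on a tiny toy DataFrame, reusing the `add_port_column` function defined above. This is only a minimal sketch: the rows, the local SparkSession and the two-port list are made up for illustration, and `timestamp` is assumed to be epoch seconds so that `rangeBetween` can operate on it.

    from functools import partial, reduce

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.master("local[2]").appName("port-window-demo").getOrCreate()

    # hypothetical flow records: (source_ip, timestamp, src port, dst port, src pkts, dst pkts)
    demo_df = spark.createDataFrame(
        [
            ("10.0.0.1", 100,  "443", "22", 10, 5),
            ("10.0.0.1", 900,  "443", "22", 20, 7),
            ("10.0.0.1", 2500, "80",  "22", 30, 9),  # outside the 30-minute range of the first two rows
        ],
        ["source_ip", "timestamp", "source_port", "destination_port",
         "source_packets", "destination_packets"],
    )

    window_time = 30 * 60  # 30 minutes in seconds
    w = (Window()
         .partitionBy("source_ip")
         .orderBy(F.col("timestamp"))
         .rangeBetween(-window_time, 0))

    # reuse add_port_column from above, here for just two ports
    demo_df = reduce(partial(add_port_column, window=w), ["443", "80"], demo_df)
    demo_df.show(truncate=False)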

I am not sure whether this is the exact error, but reading through the logs there are a lot of OOM messages:

    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
    [17:39:40] [INFO] [dku.utils] - at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.io.ReadAheadInputStream$1.run(ReadAheadInputStream.java:168)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    [17:39:40] [INFO] [dku.utils] - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    [17:39:40] [INFO] [dku.utils] - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    [17:39:40] [INFO] [dku.utils] - at java.lang.Thread.run(Thread.java:748)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    [17:39:40] [INFO] [dku.utils] - 20/09/04 17:39:38 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    [17:39:40] [INFO] [dku.utils] - 20/09/04 17:39:38 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[read-ahead,5,main]
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
    [17:39:40] [INFO] [dku.utils] - java.lang.OutOfMemoryError: Java heap space
    [17:39:40] [INFO] [dku.utils] - at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:422)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
    [17:39:40] [INFO] [dku.utils] - at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:557)
    [17:39:40] [INFO] [dku.utils] - at com.dataiku.dip.spark.ParquetWriter$.saveParquetDataset(ParquetWriter.scala:63)
    [17:39:40] [INFO] [dku.utils] - at com.dataiku.dip.spark.StdDataikuSparkContext.saveHDFSableWithFastPathIfPossible(StdDataikuSparkContext.scala:984)
    [17:39:40] [INFO] [dku.utils] - at com.dataiku.dip.spark.StdDataikuSparkContext.internalSave(StdDataikuSparkContext.scala:897)
    [17:39:40] [INFO] [dku.utils] - at com.dataiku.dss.spark.DataikuSparkContext$class.save(DataikuSparkContext.scala:83)
    [17:39:40] [INFO] [dku.utils] - at org.apache.spark.io.ReadAheadInputStream$1.run(ReadAheadInputStream.java:168)
    [17:39:40] [INFO] [dku.utils] - at com.dataiku.dip.spark.StdDataikuSparkContext.save(StdDataikuSparkContext.scala:60)
    [17:39:40] [INFO] [dku.utils] - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    [17:39:40] [INFO] [dku.utils] - at com.dataiku.dip.spark.StdDataikuSparkContext.savePyDataFrame(StdDataikuSparkContext.scala:673)
    [17:39:40] [INFO] [dku.utils] - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    [17:39:40] [INFO] [dku.utils] - at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    [17:39:40] [INFO] [dku.utils] - at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    [17:39:40] [INFO] [dku.utils] - at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    [17:39:40] [INFO] [dku.utils] - at java.lang.reflect.Method.invoke(Method.java:498)
    [17:39:40] [INFO] [dku.utils] - at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    [17:39:40] [INFO] [dku.utils] - at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    [17:39:40] [INFO] [dku.utils] - at py4j.Gateway.invoke(Gateway.java:282)
    [17:39:40] [INFO] [dku.utils] - at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    [17:39:40] [INFO] [dku.utils] - at py4j.commands.CallCommand.execute(CallCommand.java:79)
    [17:39:40] [INFO] [dku.utils] - at py4j.GatewayConnection.run(GatewayConnection.java:238)
    [17:39:40] [INFO] [dku.utils] - at java.lang.Thread.run(Thread.java:748)
    [17:39:40] [INFO] [dku.utils] - at java.lang.Thread.run(Thread.java:748)

Spark submit:

    org.apache.spark.deploy.SparkSubmit --conf spark.executor.memory=8g --conf spark.local.dir=/pkgs/cdh/tmp/spark --conf spark.yarn.security.tokens.hive.enabled=false --conf spark.yarn.security.credentials.hadoopfs.enabled=false --conf spark.security.credentials.hive.enabled=false --conf spark.app.name=DSS (Py): compute_flows_window_pyspark_2020-04-14 --conf spark.io.compression.codec=snappy --conf spark.sql.shuffle.partitions=80 --conf spark.shuffle.spill.compress=false --conf spark.shuffle.compress=false --conf spark.dku.limitedLogs={"filePartitioner.noMatch":100,"s3.ignoredPath":100,"s3.ignoredFile":100} --conf spark.security.credentials.hadoopfs.enabled=false --conf spark.jars.repositories=https://nexus.bisinfo.org:8443/repository/maven-central --conf spark.yarn.executor.memoryOverhead=600
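The relevant memory settings in that submit are spark.executor.memory=8g and spark.yarn.executor.memoryOverhead=600. As a point of reference, a minimal sketch of how these knobs can be set when you build the session yourself (the values below are illustrative only, not a recommendation; in a managed environment such as Dataiku DSS they are normally set in the job's Spark configuration rather than in code, and must be in place before the application starts):

    from pyspark.sql import SparkSession

    # Illustrative sizes only: the right values depend on the cluster and the data volume.
    spark = (SparkSession.builder
             .appName("flows-window-aggregation")
             .config("spark.executor.memory", "16g")                # executor JVM heap
             .config("spark.yarn.executor.memoryOverhead", "2048")  # off-heap overhead on YARN, in MiB
             .config("spark.sql.shuffle.partitions", "400")         # more, smaller shuffle partitions
             .getOrCreate())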

Edit: after increasing memory, it now complains with:

    [09:20:10] [INFO] [dku.utils] - at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    [09:20:10] [INFO] [dku.utils] - at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    [09:20:10] [INFO] [dku.utils] - at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    [09:20:10] [INFO] [dku.utils] - at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    [09:20:10] [INFO] [dku.utils] - at org.apache.spark.scheduler.Task.run(Task.scala:121)
    [09:20:10] [INFO] [dku.utils] - at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
    [09:20:10] [INFO] [dku.utils] - at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
    [09:20:10] [INFO] [dku.utils] - at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
    [09:20:10] [INFO] [dku.utils] - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    [09:20:10] [INFO] [dku.utils] - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    [09:20:10] [INFO] [dku.utils] - at java.lang.Thread.run(Thread.java:748)
    [09:20:10] [INFO] [dku.utils] - Caused by: java.io.IOException: PARSING_ERROR(2)
    [09:20:10] [INFO] [dku.utils] - at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
    [09:20:10] [INFO] [dku.utils] - at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
    [09:20:10] [INFO] [dku.utils] - at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:634)
    [09:20:10] [INFO] [dku.utils] - at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435)
    [09:20:10] [INFO] [dku.utils] - at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
    [09:20:10] [INFO] [dku.utils] - at org.apache.spark.io.ReadAheadInputStream$1.run(ReadAheadInputStream.java:168)
    [09:20:10] [INFO] [dku.utils] - ... 3 more
