For loop over a list

zpqajqem  posted 2021-05-27  in Spark

I have two DataFrames, df1 and df2, which both have the same column names. I want to run a for loop over the distinct dates from df1 and apply the same date filter to df2. I created a list of the distinct dates and then tried to iterate over it, but what I have throws an error.
Here is what I have:

    val unique_weeks = df1.select(df1("date")).distinct
    for (week <- unique_weeks) {
      val df1_filtered = df1.filter($"date" === week)
      val df2_filtered = df2.filter($"date" === week)
      // will run a join here and more code
    }

I think the <- part may be what's wrong - but I'm not sure how else to filter the DataFrames.
The error is below:

    [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 3.0 failed 1 times, most recent failure: Lost task 35.0 in stage 3.0 (TID 399, localhost, executor driver): java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [1591772400000]
    [error] at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
    [error] at org.apache.spark.sql.functions$.lit(functions.scala:101)
    [error] at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:267)
    [error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:880)
    [error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:878)
    [error] at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    [error] at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    [error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
    [error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
    [error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    [error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    [error] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    [error] at org.apache.spark.scheduler.Task.run(Task.scala:99)
    [error] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    [error] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    [error] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    [error] at java.lang.Thread.run(Thread.java:748)
    [error]
    [error] Driver stacktrace:
    [error] org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 3.0 failed 1 times, most recent failure: Lost task 35.0 in stage 3.0 (TID 399, localhost, executor driver): java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [1591772400000]
    [error] at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
    [error] at org.apache.spark.sql.functions$.lit(functions.scala:101)
    [error] at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:267)
    [error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:880)
    [error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:878)
    [error] at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    [error] at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    [error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
    [error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
    [error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    [error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    [error] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    [error] at org.apache.spark.scheduler.Task.run(Task.scala:99)
    [error] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    [error] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    [error] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    [error] at java.lang.Thread.run(Thread.java:748)
    [error]
    [error] Driver stacktrace:
    [error] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    [error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    [error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    [error] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    [error] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    [error] at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    [error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    [error] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    [error] at scala.Option.foreach(Option.scala:257)
    [error] at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    [error] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    [error] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    [error] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    [error] at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    [error] at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    [error] at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
    [error] at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
    [error] at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
    [error] at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
    [error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
    [error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)
    [error] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    [error] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    [error] at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    [error] at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)
    [error] at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2286)
    [error] at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2286)
    [error] at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2286)
    [error] at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    [error] at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
    [error] at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2285)
    [error] at spark_pkg.SparkMain$.main(SparkMain.scala:878)
    [error] at spark_pkg.SparkMain.main(SparkMain.scala)
    [error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    [error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    [error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    [error] at java.lang.reflect.Method.invoke(Method.java:498)
    [error] Caused by: java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [1591772400000]
    [error] at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
    [error] at org.apache.spark.sql.functions$.lit(functions.scala:101)
    [error] at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:267)
    [error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:880)
    [error] at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:878)
    [error] at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    [error] at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    [error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
    [error] at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
    [error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    [error] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    [error] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    [error] at org.apache.spark.scheduler.Task.run(Task.scala:99)
    [error] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    [error] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    [error] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    [error] at java.lang.Thread.run(Thread.java:748)
    [error] stack trace is suppressed; run 'last Compile / bgRun' for the full output
    [error] Nonzero exit code: 1
    [error] (Compile / run) Nonzero exit code: 1
    [error] Total time: 137 s (02:17), completed Aug 20, 2020 1:16:02 PM

w51jfk4q1#

A DataFrame is not an iterator, so you can't run a for loop over it the way you have. You could run something like the snippet below - but I don't think it will accomplish what you want, given the rest of your code.

    unique_weeks.foreachPartition { weeks: Iterator[YourData] =>
      for (week <- weeks) {
      }
    }
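
For the original goal of filtering both df1 and df2 by each distinct date, the usual working pattern is to collect the distinct dates back to the driver as plain values and loop over those there. A minimal sketch, assuming the date column is a TimestampType (the [1591772400000] in the error looks like epoch milliseconds); if it is a DateType, use getDate(0) instead:

    import org.apache.spark.sql.functions.lit

    // collect() pulls the distinct dates to the driver as an Array of plain
    // Timestamp values rather than Rows -- comparing a column to a Row is what
    // triggered the "Unsupported literal type ... GenericRowWithSchema" error.
    val unique_weeks: Array[java.sql.Timestamp] =
      df1.select("date").distinct.collect().map(_.getTimestamp(0))

    for (week <- unique_weeks) {
      // week is now a plain Timestamp, so Spark can turn it into a literal
      val df1_filtered = df1.filter($"date" === lit(week))
      val df2_filtered = df2.filter($"date" === lit(week))
      // join and further processing here
    }

Collecting is acceptable here because the distinct dates form a small list; the filtering and joining still run on the cluster.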

Your question suggests that your mental model of what a DataFrame is and how Spark works isn't quite complete. Think of a DataFrame as a List[List[YourData]], except that each inner List[YourData] lives on a separate machine and doesn't necessarily know about or interact with any of the other Lists until you bring them back to the driver.
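
To make that concrete, here is a small illustrative sketch (not from the original answer) of the difference between executor-side and driver-side iteration over the unique_weeks DataFrame:

    // Runs on the executors: each row lives in whatever partition holds it,
    // so the loop body cannot use other driver-side DataFrames such as df2.
    unique_weeks.foreach { row => println(row) }

    // Runs on the driver: collect() gathers every partition into one local
    // Array, after which ordinary Scala loops over plain values work fine.
    val localRows: Array[org.apache.spark.sql.Row] = unique_weeks.collect()
    localRows.foreach(println)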
