Getting a "NullPointerException" when creating a DataFrame

aij0ehis · posted 2021-07-14 in Spark

I am creating DataFrames from the rows of another DataFrame, named file_details here. That DataFrame holds rows in this format: ["srcpath", "table_name", "primary_key", "type_of_file", "separator"]. I wrote a function that takes the rows one by one and checks the type of file. If it is csv, I read the file with spark.read, using the src_path and separator from that row, to create a DataFrame, and then write that DataFrame to my MySQL server. The target table name and primary key also come from the same row, whose structure I described above.
Here is the code I wrote:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.Row

    object fixedlength {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("fixedlength")
          .master("local")
          .getOrCreate()
        val file_details = spark.read.option("header", "true").csv("////")
        // Row: ("srcpath","table_name","primary_key","type_of_file","separator")
        def fun(ro: Row): Unit = {
          val url = "jdbc:mysql://*****:3306/mydb1"
          val typ = ro.getString(3)
          if (typ == "csv") {
            val sep = ro.getString(4)
            val src_path = ro.getString(0)
            val tgt_table = ro.getString(1)
            val df1 = spark.read.option("header", "true").option("delimiter", sep).csv(src_path)
            df1.write.format("jdbc")
              .mode("overwrite")
              .option("url", url)
              .option("driver", "com.mysql.jdbc.Driver")
              .option("dbtable", tgt_table)
              .option("user", "username")
              .option("password", "password")
              .save()
          }
        }
        val ne = file_details.select("*").rdd.map(x => fun(x)).collect()
      }
    }

I am getting this error:

    ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 7)
    java.lang.NullPointerException
        at fixedlength$.fixedlength$$fun$1(fixedlength.scala:66)
        at fixedlength$$anonfun$10.apply(fixedlength.scala:106)
        at fixedlength$$anonfun$10.apply(fixedlength.scala:106)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    21/04/12 11:11:09 WARN TaskSetManager: Lost task 0.0 in stage 7.0 (TID 7, localhost, executor driver): java.lang.NullPointerException
        ... (same stack trace as above)
    21/04/12 11:11:09 ERROR TaskSetManager: Task 0 in stage 7.0 failed 1 times; aborting job
    21/04/12 11:11:09 INFO TaskSchedulerImpl: Removed TaskSet 7.0, whose tasks have all completed, from pool
    21/04/12 11:11:09 INFO TaskSchedulerImpl: Cancelling stage 7
    21/04/12 11:11:09 INFO TaskSchedulerImpl: Killing all running tasks in stage 7: Stage cancelled
    21/04/12 11:11:09 INFO DAGScheduler: ResultStage 7 (collect at fixedlength.scala:106) failed in 0.156 s due to Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 7, localhost, executor driver): java.lang.NullPointerException
        ... (same stack trace as above)
    Driver stacktrace:
    21/04/12 11:11:09 INFO DAGScheduler: Job 7 failed: collect at fixedlength.scala:106, took 0.164149 s
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 7, localhost, executor driver): java.lang.NullPointerException
        ... (same stack trace as above)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
        at fixedlength$.main(fixedlength.scala:106)
        at fixedlength.main(fixedlength.scala)
    Caused by: java.lang.NullPointerException
        at fixedlength$.fixedlength$$fun$1(fixedlength.scala:66)
        ... (same stack trace as above)
    21/04/12 11:11:09 INFO SparkContext: Invoking stop() from shutdown hook
    21/04/12 11:11:09 INFO SparkUI: Stopped Spark web UI at http://LTIN236281.cts.com:4040
    21/04/12 11:11:09 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    21/04/12 11:11:10 INFO MemoryStore: MemoryStore cleared
    21/04/12 11:11:10 INFO BlockManager: BlockManager stopped
    21/04/12 11:11:10 INFO BlockManagerMaster: BlockManagerMaster stopped
    21/04/12 11:11:10 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    21/04/12 11:11:10 INFO SparkContext: Successfully stopped SparkContext
    21/04/12 11:11:10 INFO ShutdownHookManager: Shutdown hook called
    21/04/12 11:11:10 INFO ShutdownHookManager: Deleting directory C:\Users\844605\AppData\Local\Temp\spark-cae157ad-b63e-43b4-831e-9cc61094b4db

I searched the internet for this "null pointer exception" error, and I found somewhere that it is thrown when worker nodes try to access the SparkContext (or SparkSession) object, which only exists on the driver.
Is there a different way to achieve this?
Could anyone please help me get rid of this error?
To summarize my use case: I have a DataFrame that contains, per file, the src_path, the type_of_file, the separator (if it is a csv file), and the target table in MySQL. I pass each of these rows to a function I created; based on those parameters it creates a DataFrame and pushes that DataFrame into the target table.
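
From what I read about the cause, a workaround that might work is to collect the small file_details control table to the driver first and loop over its rows there, so that spark.read and df1.write are only ever called on the driver. Below is a minimal, untested sketch of that idea (same column order as in my code above; the object name is just a placeholder):

    import org.apache.spark.sql.SparkSession

    // Sketch only: move the per-row work out of rdd.map and onto the driver.
    object fixedlengthDriverLoop {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("fixedlength")
          .master("local")
          .getOrCreate()

        val url = "jdbc:mysql://*****:3306/mydb1"
        val file_details = spark.read.option("header", "true").csv("////")

        // collect() pulls the (small) control table to the driver; the loop
        // below therefore runs on the driver, where the SparkSession is valid.
        file_details.collect().foreach { ro =>
          if (ro.getString(3) == "csv") {
            val src_path  = ro.getString(0) // column order as in the row layout above
            val tgt_table = ro.getString(1)
            val sep       = ro.getString(4)
            val df1 = spark.read.option("header", "true").option("delimiter", sep).csv(src_path)
            df1.write.format("jdbc")
              .mode("overwrite")
              .option("url", url)
              .option("driver", "com.mysql.jdbc.Driver")
              .option("dbtable", tgt_table)
              .option("user", "username")
              .option("password", "password")
              .save()
          }
        }
      }
    }

The trade-off would be that the files are loaded one after another on the driver instead of inside parallel tasks, which should be fine as long as file_details itself is small. Is this the right approach, or is there a better pattern?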

No answers yet!
