FileNotFoundException when submitting a Spark job to a remote server

Asked by nukf8bse on 2021-05-27 · Spark · 1 answer · 717 views

I have created an environment with 3 Docker containers: one for Airflow, built from the puckel/docker-airflow image with Spark and Hadoop installed on top, and two more that act as the Spark master and worker (created from the gettyimages/spark Docker image). All three containers are attached to the same bridge network, so they can all reach each other.
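For reference, the topology described above corresponds roughly to the sketch below; the container names, the network name, and the master/worker start commands are my assumptions, not something taken from the question:

    # Sketch only: one user-defined bridge network with Airflow, a Spark master
    # and a Spark worker on it. Adjust names, tags and commands to your setup.
    docker network create spark-net

    docker run -d --name airflow --network spark-net puckel/docker-airflow webserver

    docker run -d --name spark-master --network spark-net gettyimages/spark \
      bin/spark-class org.apache.spark.deploy.master.Master -h spark-master

    docker run -d --name spark-worker --network spark-net gettyimages/spark \
      bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077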
What I want to do next is submit a Spark job from the Airflow container to the Spark cluster (master).
As a first example, I am using the wordcount example script. I created a sample.txt file in the Airflow container at /usr/local/airflow/sample.txt. I have bashed into the Airflow container and am running wordcount.py against the Spark master (whose IP I found by inspecting the bridge network) with the command below:

    spark-submit --master spark://ipaddress:7077 --files /usr/local/airflow/sample.txt /opt/spark-2.4.1/examples/src/main/python/wordcount.py sample.txt

After submitting the script, the logs show that a connection to the master is established (from the Airflow container) and that the file passed with --files is copied for the master and the workers, but then it simply fails with:

    java.io.FileNotFoundException: File file:/usr/local/airflow/sample.txt does not exist

From my understanding (which may be wrong), when a file is shipped with --files you can access it directly by its file name (sample.txt in my case). So what I am trying to figure out is: if the job has been submitted and the file has been copied over, why is it still looking for file:/usr/local/airflow/sample.txt? How do I make it point to the correct path?
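My understanding of --files is also consistent with what the log shows: the driver copies the file into a per-application scratch directory (the "Copying /usr/local/airflow/sample.txt to /tmp/spark-.../userFiles-.../sample.txt" line below), and the job is then supposed to look the copy up by name, typically through pyspark.SparkFiles, rather than through the original path. A minimal sketch of that pattern (my own code under those assumptions, not the stock examples/wordcount.py, which reads its argument through spark.read.text):

    # Sketch only: resolve a file shipped with --files via SparkFiles instead of
    # hard-coding the path it had on the submitting machine.
    from pyspark import SparkFiles
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()
    sc = spark.sparkContext

    # Returns the per-application copy Spark made for --files
    # (the /tmp/spark-.../userFiles-.../sample.txt path seen in the log below).
    local_path = SparkFiles.get("sample.txt")

    # Read the copy on the driver and parallelize the lines, so the executors on
    # the worker container never need /usr/local/airflow/sample.txt at all.
    with open(local_path) as f:
        lines = [line.rstrip("\n") for line in f]

    counts = (sc.parallelize(lines)
                .flatMap(lambda line: line.split(" "))
                .map(lambda word: (word, 1))
                .reduceByKey(lambda a, b: a + b)
                .collect())

    for word, count in counts:
        print(word, count)

    spark.stop()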
I apologize, as this question has been asked several times before, but I have read all the related questions on Stack Overflow and I still cannot resolve this. I would really appreciate your help.
Thank you.
The full log is below:

    user@machine:/usr/local/airflow# spark-submit --master spark://172.22.0.2:7077 --files sample.txt /opt/spark-2.4.1/examples/src/main/python/wordcount.py ./sample.txt
    20/07/25 03:23:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    20/07/25 03:23:35 INFO SparkContext: Running Spark version 2.4.1
    20/07/25 03:23:35 INFO SparkContext: Submitted application: PythonWordCount
    20/07/25 03:23:35 INFO SecurityManager: Changing view acls to: root
    20/07/25 03:23:35 INFO SecurityManager: Changing modify acls to: root
    20/07/25 03:23:35 INFO SecurityManager: Changing view acls groups to:
    20/07/25 03:23:35 INFO SecurityManager: Changing modify acls groups to:
    20/07/25 03:23:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
    20/07/25 03:23:35 INFO Utils: Successfully started service 'sparkDriver' on port 33457.
    20/07/25 03:23:35 INFO SparkEnv: Registering MapOutputTracker
    20/07/25 03:23:36 INFO SparkEnv: Registering BlockManagerMaster
    20/07/25 03:23:36 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
    20/07/25 03:23:36 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
    20/07/25 03:23:36 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-dd1957de-6907-484d-a3d8-2b3b88e0c7ca
    20/07/25 03:23:36 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
    20/07/25 03:23:36 INFO SparkEnv: Registering OutputCommitCoordinator
    20/07/25 03:23:36 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    20/07/25 03:23:36 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://0508a77fcaad:4040
    20/07/25 03:23:37 INFO SparkContext: Added file file:///usr/local/airflow/sample.txt at spark://0508a77fcaad:33457/files/sample.txt with timestamp 1595647417081
    20/07/25 03:23:37 INFO Utils: Copying /usr/local/airflow/sample.txt to /tmp/spark-f9dfe6ee-22d7-4747-beab-9450fc1afce0/userFiles-74f8cfe4-8a19-4d2e-8fa1-1f0bd1f0ef12/sample.txt
    20/07/25 03:23:37 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://172.22.0.2:7077...
    20/07/25 03:23:37 INFO TransportClientFactory: Successfully created connection to /172.22.0.2:7077 after 32 ms (0 ms spent in bootstraps)
    20/07/25 03:23:38 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20200725032338-0003
    20/07/25 03:23:38 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45057.
    20/07/25 03:23:38 INFO NettyBlockTransferService: Server created on 0508a77fcaad:45057
    20/07/25 03:23:38 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    20/07/25 03:23:38 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20200725032338-0003/0 on worker-20200725025003-172.22.0.4-8881 (172.22.0.4:8881) with 2 core(s)
    20/07/25 03:23:38 INFO StandaloneSchedulerBackend: Granted executor ID app-20200725032338-0003/0 on hostPort 172.22.0.4:8881 with 2 core(s), 1024.0 MB RAM
    20/07/25 03:23:38 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 0508a77fcaad, 45057, None)
    20/07/25 03:23:38 INFO BlockManagerMasterEndpoint: Registering block manager 0508a77fcaad:45057 with 366.3 MB RAM, BlockManagerId(driver, 0508a77fcaad, 45057, None)
    20/07/25 03:23:38 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 0508a77fcaad, 45057, None)
    20/07/25 03:23:38 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 0508a77fcaad, 45057, None)
    20/07/25 03:23:38 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20200725032338-0003/0 is now RUNNING
    20/07/25 03:23:38 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
    20/07/25 03:23:38 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/usr/local/airflow/spark-warehouse').
    20/07/25 03:23:38 INFO SharedState: Warehouse path is 'file:/usr/local/airflow/spark-warehouse'.
    20/07/25 03:23:40 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
    20/07/25 03:23:47 INFO FileSourceStrategy: Pruning directories with:
    20/07/25 03:23:47 INFO FileSourceStrategy: Post-Scan Filters:
    20/07/25 03:23:47 INFO FileSourceStrategy: Output Data Schema: struct<value: string>
    20/07/25 03:23:47 INFO FileSourceScanExec: Pushed Filters:
    20/07/25 03:23:51 INFO CodeGenerator: Code generated in 2187.926234 ms
    20/07/25 03:23:53 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 220.9 KB, free 366.1 MB)
    20/07/25 03:23:55 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.8 KB, free 366.1 MB)
    20/07/25 03:23:55 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 0508a77fcaad:45057 (size: 20.8 KB, free: 366.3 MB)
    20/07/25 03:23:55 INFO SparkContext: Created broadcast 0 from javaToPython at NativeMethodAccessorImpl.java:0
    20/07/25 03:23:55 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
    20/07/25 03:23:57 INFO SparkContext: Starting job: collect at /opt/spark-2.4.1/examples/src/main/python/wordcount.py:40
    20/07/25 03:23:58 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.22.0.4:59324) with ID 0
    20/07/25 03:23:58 INFO DAGScheduler: Registering RDD 5 (reduceByKey at /opt/spark-2.4.1/examples/src/main/python/wordcount.py:39)
    20/07/25 03:23:58 INFO DAGScheduler: Got job 0 (collect at /opt/spark-2.4.1/examples/src/main/python/wordcount.py:40) with 1 output partitions
    20/07/25 03:23:58 INFO DAGScheduler: Final stage: ResultStage 1 (collect at /opt/spark-2.4.1/examples/src/main/python/wordcount.py:40)
    20/07/25 03:23:58 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
    20/07/25 03:23:58 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
    20/07/25 03:23:58 INFO DAGScheduler: Submitting ShuffleMapStage 0 (PairwiseRDD[5] at reduceByKey at /opt/spark-2.4.1/examples/src/main/python/wordcount.py:39), which has no missing parents
    20/07/25 03:23:58 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 15.2 KB, free 366.0 MB)
    20/07/25 03:23:58 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 9.1 KB, free 366.0 MB)
    20/07/25 03:23:58 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 0508a77fcaad:45057 (size: 9.1 KB, free: 366.3 MB)
    20/07/25 03:23:58 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1161
    20/07/25 03:23:58 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (PairwiseRDD[5] at reduceByKey at /opt/spark-2.4.1/examples/src/main/python/wordcount.py:39) (first 15 tasks are for partitions Vector(0))
    20/07/25 03:23:58 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
    20/07/25 03:23:58 INFO BlockManagerMasterEndpoint: Registering block manager 172.22.0.4:45435 with 366.3 MB RAM, BlockManagerId(0, 172.22.0.4, 45435, None)
    20/07/25 03:23:58 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 172.22.0.4, executor 0, partition 0, PROCESS_LOCAL, 8307 bytes)
    20/07/25 03:24:03 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.22.0.4:45435 (size: 9.1 KB, free: 366.3 MB)
    20/07/25 03:24:09 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.22.0.4:45435 (size: 20.8 KB, free: 366.3 MB)
    20/07/25 03:24:11 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 172.22.0.4, executor 0): java.io.FileNotFoundException: File file:/usr/local/airflow/sample.txt does not exist
    It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
    20/07/25 03:24:11 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, 172.22.0.4, executor 0, partition 0, PROCESS_LOCAL, 8307 bytes)
    20/07/25 03:24:11 INFO TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on 172.22.0.4, executor 0: java.io.FileNotFoundException (File file:/usr/local/airflow/sample.txt does not exist
    It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.) [duplicate 1]
    20/07/25 03:24:11 INFO TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, 172.22.0.4, executor 0, partition 0, PROCESS_LOCAL, 8307 bytes)
    20/07/25 03:24:12 INFO TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on 172.22.0.4, executor 0: java.io.FileNotFoundException (File file:/usr/local/airflow/sample.txt does not exist
    It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.) [duplicate 2]
    20/07/25 03:24:12 INFO TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, 172.22.0.4, executor 0, partition 0, PROCESS_LOCAL, 8307 bytes)
    20/07/25 03:24:12 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on 172.22.0.4, executor 0: java.io.FileNotFoundException (File file:/usr/local/airflow/sample.txt does not exist
    It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.) [duplicate 3]
    20/07/25 03:24:12 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
    20/07/25 03:24:12 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
    20/07/25 03:24:12 INFO TaskSchedulerImpl: Cancelling stage 0
    20/07/25 03:24:12 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage cancelled
    20/07/25 03:24:12 INFO DAGScheduler: ShuffleMapStage 0 (reduceByKey at /opt/spark-2.4.1/examples/src/main/python/wordcount.py:39) failed in 13.690 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.22.0.4, executor 0): java.io.FileNotFoundException: File file:/usr/local/airflow/sample.txt does not exist
    It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
    Driver stacktrace:
    20/07/25 03:24:12 INFO DAGScheduler: Job 0 failed: collect at /opt/spark-2.4.1/examples/src/main/python/wordcount.py:40, took 14.579961 s
    Traceback (most recent call last):
    File "/opt/spark-2.4.1/examples/src/main/python/wordcount.py", line 40, in <module>
    output = counts.collect()
    File "/opt/spark-2.4.1/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
    File "/opt/spark-2.4.1/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    File "/opt/spark-2.4.1/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    File "/opt/spark-2.4.1/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.22.0.4, executor 0): java.io.FileNotFoundException: File file:/usr/local/airflow/sample.txt does not exist
    It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.FileNotFoundException: File file:/usr/local/airflow/sample.txt does not exist
    It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
    20/07/25 03:24:13 INFO SparkContext: Invoking stop() from shutdown hook
    20/07/25 03:24:13 INFO SparkUI: Stopped Spark web UI at http://0508a77fcaad:4040
    20/07/25 03:24:13 INFO StandaloneSchedulerBackend: Shutting down all executors
    20/07/25 03:24:13 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
    20/07/25 03:24:16 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    20/07/25 03:24:16 INFO MemoryStore: MemoryStore cleared
    20/07/25 03:24:16 INFO BlockManager: BlockManager stopped
    20/07/25 03:24:16 INFO BlockManagerMaster: BlockManagerMaster stopped
    20/07/25 03:24:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    20/07/25 03:24:16 INFO SparkContext: Successfully stopped SparkContext
    20/07/25 03:24:16 INFO ShutdownHookManager: Shutdown hook called
    20/07/25 03:24:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-2dfb2222-d56c-4ee1-ab62-86e71e5e751b
    20/07/25 03:24:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-f9dfe6ee-22d7-4747-beab-9450fc1afce0
    20/07/25 03:24:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-f9dfe6ee-22d7-4747-beab-9450fc1afce0/pyspark-2ee74d07-6606-4edc-8420-fe46212c50e5

ujv3wf0j (answer 1)

Change your spark-submit command as shown below and submit your Spark job again.

    # Add --deploy-mode cluster if you want to pass just the file name to wordcount.py.
    spark-submit \
      --master spark://ipaddress:7077 \
      --deploy-mode cluster \
      --files /usr/local/airflow/sample.txt \
      /opt/spark-2.4.1/examples/src/main/python/wordcount.py sample.txt

Alternatively, skip --files and pass the full path to the file:

    spark-submit \
      --master spark://ipaddress:7077 \
      /opt/spark-2.4.1/examples/src/main/python/wordcount.py /usr/local/airflow/sample.txt
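Note that in the second form the path is read directly, so /usr/local/airflow/sample.txt has to exist both where the driver runs and on the worker container where the executors read it. A quick sanity check from the worker (the container name here is only a placeholder):

    # "spark-worker" is a placeholder; use your actual worker container name.
    docker exec -it spark-worker ls -l /usr/local/airflow/sample.txt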
