pyspark — Why does Spark try to load the CSV on the worker node in standalone mode?

vxf3dgd4 · asked 2023-10-15 · Spark

I am currently running Apache Spark in standalone mode with a single worker node configured. I have a simple code snippet, shown below, that I want to execute.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Note: despite the name, `sc` below is a SparkSession, not a SparkContext.
sc = SparkSession.builder.appName("YourAppName").getOrCreate()

csvFile = 'file:///home/bennison/Documents/yavar-learnings/spark/pyspark/departuredelays.csv'
file_path_on_master = csvFile
# spark.addFile(csvFile)

data_rdd = sc.read.format('csv').option(
    'inferSchema', 'true').option('header', 'true').load(csvFile)

data_rdd.show()
data_rdd.createOrReplaceTempView('us_delay_flights_tbl')

# find all flights whose distance is greater than 1,000 miles:
sc.sql('SELECT * FROM us_delay_flights_tbl WHERE distance > 1000 ORDER BY distance DESC').show()

# find all flights between San Francisco (SFO) and Chicago (ORD) with at least a two-hour delay
sc.sql('SELECT date, distance, origin, destination FROM us_delay_flights_tbl WHERE origin = "SFO" AND destination = "ORD" AND delay >= 120 ORDER BY delay DESC').show()

# label all US flights, regardless of origin and destination, with an indication of the delays they experienced: Very Long Delays (> 6 hours), Long Delays (2–6 hours), etc. The delay column is in minutes; we add these human-readable labels in a new column called delay_def
sc.sql(
    'SELECT date, distance, origin, destination, CASE WHEN delay = 0 THEN "No Delay" WHEN delay > 0 AND delay < 120 THEN "Tolerable Delays" WHEN delay >= 120 AND delay <= 360 THEN "Long Delays" WHEN delay > 360 THEN "Very Long Delays" ELSE "Early" END AS delay_def FROM us_delay_flights_tbl ORDER BY delay DESC').show()

(data_rdd.select("distance", "origin", "destination").where(
    'distance > 1000').orderBy(desc('distance'))).show()

(data_rdd.select('date', 'distance', 'origin', 'destination').filter((col('origin') == "SFO")
 & (col("destination") == "ORD") & (col("delay") >= 120)).orderBy(desc('delay')).show())

(data_rdd.select('date', 'distance', 'origin', 'destination',
                 when(col('delay') == 0, 'No Delay')
                 .when((col('delay') > 0) & (col('delay') < 120), 'Tolerable Delays')
                 .when((col('delay') >= 120) & (col('delay') <= 360), 'Long Delays')
                 .when(col('delay') > 360, 'Very Long Delays')
                 .otherwise('Early').alias('delay_def'))
 .orderBy(desc('delay')).show())

sc.stop()

To submit the job, I start the Spark master first and then the worker node. However, when I try to execute the code with spark-submit, I run into a problem: Spark appears to be trying to read the CSV file on the worker node, even though the file is located on the master node.
My understanding of Spark's behaviour was that the driver program would read the CSV from its own file path and then distribute the data across the cluster as needed. Given that understanding, I am confused about why I get a "does not exist" error.
The exact error is below.

Caused by: java.io.FileNotFoundException: 
File file:/home/bennison/Documents/yavar-learnings/spark/pyspark/departuredelays.csv does not exist

It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.

I have verified the file's location on the master node, and it does exist at the specified path.
Can someone explain why Spark tries to load the CSV file on the worker node rather than the master node, and how I can fix this?


mgdq6dx11#

For reference, see https://stackoverflow.com/a/24736077/8726538.
From the programming guide, under External Datasets: if you use a path on the local filesystem, the file must also be accessible at the same path on the worker nodes. Either copy the file to all workers, or use a network-mounted shared filesystem.
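In other words, with spark.read the actual file reads happen in executor tasks on the worker node, not in the driver, so a file:/// path has to resolve on every machine that runs a task. Below is a rough sketch of two common workarounds; the hdfs:/// path and the use of pandas on the driver are assumptions for illustration, not something from the original post.

# Option 1: put the file on storage that every node can reach (HDFS, S3, an NFS
# mount, ...) and point the reader at that location instead of file:///...
# The hdfs:/// path below is only a placeholder example.
# data_rdd = sc.read.option('header', 'true').option('inferSchema', 'true') \
#     .csv('hdfs:///data/departuredelays.csv')

# Option 2: for a file this small, read it on the driver only and let Spark
# distribute the rows itself, so the workers never touch the local path.
# This assumes pandas is installed on the driver machine.
import pandas as pd

local_df = pd.read_csv('/home/bennison/Documents/yavar-learnings/spark/pyspark/departuredelays.csv')
data_rdd = sc.createDataFrame(local_df)
data_rdd.createOrReplaceTempView('us_delay_flights_tbl')

Note that the commented-out spark.addFile(csvFile) line in the question generally will not help on its own: SparkContext.addFile ships a copy of the file to each executor's working directory, but spark.read still resolves the exact path string you pass it on each executor, so the original file:/// path keeps failing unless it also exists there.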
