我有一个PySpark脚本,其中数据被处理,然后转换为CSV文件。由于最终结果应该是一个CSV文件,可通过WinSCP访问,我做了一些额外的处理,将工作节点上的CSV文件放在一起,并将其从HDFS传输到FTP服务器(我认为这是所谓的边缘节点)。
from py4j.java_gateway import java_import
import os
YYMM = date[2:7].replace('-','')
# First, clean out both HDFS and local folder so CSVs do not stack up (data history is stored in DB anyway if update option is enabled)
os.system('hdfs dfs -rm -f -r /hdfs/path/new/*')
os.system('rm -f /ftp/path/new/*')
#timestamp = str(datetime.now()).replace(' ','_').replace(':','-')[0:19]
df.coalesce(1).write.csv('/hdfs/path/new/dataset_temp_' + date, header = "true", sep = "|")
# By default, output CSV has weird name ("part-0000-..."). To give proper name and delete automatically created upper folder, do some more processing
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
sc = spark.sparkContext
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
file = fs.globStatus(sc._jvm.Path('/hdfs/path/new/dataset_temp_' + date + '/part*'))[0].getPath().getName()
fs.rename(sc._jvm.Path('/hdfs/path/new/dataset_temp_' + date + "/" + file), sc._jvm.Path('/hdfs/path/new/dataset_' + YYMM + '.csv'))
fs.delete(sc._jvm.Path('/hdfs/path/new/dataset_temp_' + date), True)
# Shift CSV file out of HDFS into "regular" SFTP server environment
os.system('hdfs dfs -copyToLocal hdfs://<server>/hdfs/path/new/dataset_' + YYMM + '.csv' + ' /ftp/path/new')
在客户端模式下一切正常。但是当我切换到集群模式时,它给出了一个错误消息,指出CopyToLocal
-命令中的最终/ftp/path/new没有找到,我想是因为它在工作节点上查找,而不是在边缘节点上查找。有什么方法可以克服这个问题吗?作为一种替代方法,我想从Spark会话之外的批处理脚本中执行最终的CopyToLocal命令。但我更希望能在一个剧本里完成。
1条答案
按热度按时间bweufnob1#
您可以直接将输出写入ftp位置,而不是在您的spark脚本中运行OS命令。您需要提供指向ftp位置的路径,并将savemode设置为覆盖。然后,您可以在spark脚本完成后运行代码来重命名数据。
执行上述代码后,在单独的步骤中运行下面的命令。
我假设ftp位置可以被worker节点访问,因为您可以在客户端模式下运行copyToLOcal命令。