HDFS Spark:在群集模式下复制到本地

ifsvaxew  于 2022-12-09  发布在  HDFS
关注(0)|答案(1)|浏览(213)

我有一个PySpark脚本,其中数据被处理,然后转换为CSV文件。由于最终结果应该是一个CSV文件,可通过WinSCP访问,我做了一些额外的处理,将工作节点上的CSV文件放在一起,并将其从HDFS传输到FTP服务器(我认为这是所谓的边缘节点)。

from py4j.java_gateway import java_import
import os
        
YYMM = date[2:7].replace('-','')
        
# First, clean out both HDFS and local folder so CSVs do not stack up (data history is stored in DB anyway if update option is enabled)
os.system('hdfs dfs -rm -f -r /hdfs/path/new/*')
os.system('rm -f /ftp/path/new/*')
        
#timestamp = str(datetime.now()).replace(' ','_').replace(':','-')[0:19]   
df.coalesce(1).write.csv('/hdfs/path/new/dataset_temp_' + date, header = "true", sep = "|")
        
# By default, output CSV has weird name ("part-0000-..."). To give proper name and delete automatically created upper folder, do some more processing
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
        
sc = spark.sparkContext
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
file = fs.globStatus(sc._jvm.Path('/hdfs/path/new/dataset_temp_' + date + '/part*'))[0].getPath().getName()
fs.rename(sc._jvm.Path('/hdfs/path/new/dataset_temp_' + date + "/" + file), sc._jvm.Path('/hdfs/path/new/dataset_' + YYMM + '.csv'))
fs.delete(sc._jvm.Path('/hdfs/path/new/dataset_temp_' + date), True) 
        
# Shift CSV file out of HDFS into "regular" SFTP server environment
os.system('hdfs dfs -copyToLocal hdfs://<server>/hdfs/path/new/dataset_' + YYMM + '.csv' + ' /ftp/path/new')

在客户端模式下一切正常。但是当我切换到集群模式时,它给出了一个错误消息,指出CopyToLocal-命令中的最终/ftp/path/new没有找到,我想是因为它在工作节点上查找,而不是在边缘节点上查找。有什么方法可以克服这个问题吗?作为一种替代方法,我想从Spark会话之外的批处理脚本中执行最终的CopyToLocal命令。但我更希望能在一个剧本里完成。

bweufnob

bweufnob1#

您可以直接将输出写入ftp位置,而不是在您的spark脚本中运行OS命令。您需要提供指向ftp位置的路径,并将savemode设置为覆盖。然后,您可以在spark脚本完成后运行代码来重命名数据。

YYMM = date[2:7].replace('-','')

df.coalesce(1).write
  .mode("overwrite")
  .csv('/ftp/path/new/{0}'.format(date), header = "true", sep = "|")

执行上述代码后,在单独的步骤中运行下面的命令。

os.system("mv /ftp/path/new/{0}/*.csv /ftp/path/new/{0}/dataset_' + YYMM + '.csv".format(date))

我假设ftp位置可以被worker节点访问,因为您可以在客户端模式下运行copyToLOcal命令。

相关问题