嗨,我是spark和amazon电子病历集群的新手。
我试图编写一个可以在amazonemr集群上运行的演示spark应用程序。当代码在zeppelin笔记本上运行时,它会返回输出,我认为输出将保存为amazon emr集群上的单个文件,如下所示:
%pyspark
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
if type(df.c) == type(upper(df.c)) == type(df.c.isNull()):
df_new = df.withColumn('upper_c', upper(df.c))
df_new
+---+---+-------+----------+-------------------+-------+
| a| b| c| d| e|upper_c|
+---+---+-------+----------+-------------------+-------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+
spark应用程序:
from pyspark.sql import SparkSession
from pyspark.sql import Column
from pyspark.sql.functions import upper
from datetime import datetime, date
import argparse
def pre_processing(output_uri):
spark = SparkSession.builder.appName("process sample data").getOrCreate()
rdd = spark.sparkContext.parallelize([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
if type(df.c) == type(upper(df.c)) == type(df.c.isNull()):
df_new = df.withColumn('upper_c', upper(df.c))
df_new
df_new.write.option("header", "true").mode("overwrite").csv(output_uri)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--output_uri', help="The URI where output is saved")
args = parser.parse_args()
pre_processing(args.output_uri)
不过,当我在集群上运行spark应用程序时,它会将多个csv文件保存到s3 bucket中。我想知道为什么我的spark应用程序会将Dataframe保存到s3存储桶上的多个文件中。
spark应用程序的参数如下:
spark-submit --deploy-mode cluster s3://<BUCKET_NAME>/spark_application/emr_demo_app.py --output_uri s3://<BUCKET_NAME>/output
提前谢谢。
ps:有一次,我按照aws emr教程的下一页,和样本应用程序保存为一个单一的csv文件。
1条答案
按热度按时间8tntrjer1#
spark使用分区的概念,以便在工作者之间并行化任务。Dataframe也被分区,当调用save操作时,每个worker将保存Dataframe的一部分,从而创建多个文件。
为了创建单个文件,只需
repartition
或者coalesce
将Dataframe划分为一个分区:所有的数据都被发送到一个worker,然后worker将记录保存到一个文件中。如果数据集太大,就会出现瓶颈问题。这里有一个类似的答案:使用sparkcsv编写单个csv文件