我正在运行一个spark作业,它的任务是扫描一个大文件并将其拆分为更小的文件。这个文件是json行格式的,我试图用一个特定的列(id)对它进行分区,并将每个分区作为一个单独的文件保存到s3中。文件大小约为12GB,但id的不同值约为500000个。查询大约需要15个小时。我能做些什么来提高性能?对于这样的任务,spark是一个糟糕的选择吗?请注意,我有权确保源代码为每个id的固定行数。
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from awsglue.utils import getResolvedOptions
from awsglue.transforms import *
from pyspark.sql.functions import udf, substring, instr, locate
from datetime import datetime, timedelta
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Get parameters that were passed to the job
args = getResolvedOptions(sys.argv, ['INPUT_FOLDER', 'OUTPUT_FOLDER', 'ID_TYPE', 'DATASET_DATE'])
id_type = args["ID_TYPE"]
output_folder = "{}/{}/{}".format(args["OUTPUT_FOLDER"], id_type, args["DATASET_DATE"])
input_folder = "{}/{}/{}".format(args["INPUT_FOLDER"], id_type, args["DATASET_DATE"])
INS_SCHEMA = StructType([
StructField("camera_capture_timestamp", StringType(), True),
StructField(id_type, StringType(), True),
StructField("image_uri", StringType(), True)
])
data = spark.read.format("json").load(input_folder, schema=INS_SCHEMA)
data = data.withColumn("fnsku_1", F.col("fnsku"))
data.coalesce(1).write.partitionBy(["fnsku_1"]).mode('append').json(output_folder)
我也尝试过重新划分而不是合并。
我在用aws胶水
2条答案
按热度按时间qhhrdooz1#
如果你不打算使用spark来做任何事情,而只是将文件分割成更小的版本,那么我会说spark是一个糟糕的选择。您最好在aws中按照下面的堆栈溢出文章中给出的方法来做这件事
假设您有一个可用的ec2示例,您可以运行如下操作:
如果您希望在spark中对数据进行进一步的处理,那么您需要将数据重新划分为128mb到1gb之间的块。使用默认(snappy)压缩,通常会得到原始文件大小的20%。所以,在您的例子中:在(12/5)~3和(12/5/8)~20个分区之间,所以:
对于spark来说,这实际上不是一个特别大的数据集,处理起来也不应该那么麻烦。
保存为Parquet给你一个很好的恢复点,重新读取数据将非常快。总文件大小约为2.5 gb。
aiazj4mn2#
请考虑以下选项之一。如果能帮上忙那就太棒了:)
首先,如果合并,如注解中所说的@lamanus,这意味着您将减少分区的数量,因此也将减少writer任务的数量,从而将所有数据洗牌为1个任务。这可能是改善的第一个因素。
为了克服这个问题,即每个分区写一个文件并保持并行化级别,您可以更改以下逻辑:
它是如何工作的?
首先,代码执行无序排列,将与特定键(与分区相同)相关的所有行收集到相同的分区。因此,它将一次对属于该键的所有行执行写操作。前段时间我写了一篇关于
partitionBy
方法。大致来说,它会在内部对给定分区上的记录进行排序,然后将它们逐个写入文件中。这样我们就得到了这样一个计划,其中只有1次洗牌,因此存在处理消耗操作:
输出
TestSoAnswer
执行两次看起来是这样的:您还可以使用此配置控制每个文件写入的记录数。
编辑:没有看到@mazaneicha的评论,但实际上,你可以试试
repartition("partitioning column")
! 这比分组表达式更清楚。最好的,
巴托斯。