hadoop - How to read a large dataset from one S3 location and repartition it into another S3 location using Spark, s3DistCp, and AWS EMR

jvlzgdj9 posted on 2021-05-27 in Spark

I am trying to move data from one S3 location to another: at rest the source is partitioned by a date string, while the destination should be partitioned as year=yyyy/month=mm/day=dd/.
While I am able to read the entire source in Spark and write it out in the target partition layout to a temporary HDFS location, s3DistCp fails to copy it from HDFS to S3 with an OutOfMemory error.
I am trying to write nearly 2 million small files (20 KB each).
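For reference, a minimal PySpark sketch of the read-and-repartition step described above might look like the following; the paths, the source column name date_string, and its date format are assumptions for illustration, not the actual job.

# Hypothetical sketch of the Spark step described above: read the source,
# derive year/month/day from an assumed `date_string` column, and stage the
# result in HDFS partitioned as year=YYYY/month=MM/day=DD.
# All paths and column names are placeholders.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("repartition_by_day").getOrCreate()

df = spark.read.parquet("s3://source-bucket/source-prefix/")   # placeholder source

df_out = (
    df.withColumn("dt", F.to_date("date_string", "yyyy-MM-dd"))  # assumed format
      .withColumn("year", F.date_format("dt", "yyyy"))
      .withColumn("month", F.date_format("dt", "MM"))
      .withColumn("day", F.date_format("dt", "dd"))
      .drop("dt")
)

(
    df_out.repartition("year", "month", "day")   # fewer, larger files per partition
          .write
          .partitionBy("year", "month", "day")
          .mode("overwrite")
          .parquet("hdfs:///tmp/my_table_repartitioned/")  # placeholder staging path
)

Repartitioning on the partition columns before the write also collapses each day into a handful of files, which may ease both the HDFS stage and the later copy of roughly 2 million 20 KB objects.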
My s3DistCp run uses the following arguments:
sudo -H -u hadoop nice -10 bash -c "if hdfs dfs -test -d hdfs:///<source_path>; then /usr/lib/hadoop/bin/hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar -libjars /usr/share/aws/emr/s3-dist-cp/lib/ -Dmapreduce.job.reduces=30 -Dmapreduce.child.java.opts=Xmx2048m --src hdfs:///<source_path> --dest s3a://<destination_path> --s3ServerSideEncryption;fi"
It fails with:

[2020-08-06 14:23:36,038] {bash_operator.py:126} INFO - # java.lang.OutOfMemoryError: Java heap space
[2020-08-06 14:23:36,038] {bash_operator.py:126} INFO - # -XX:OnOutOfMemoryError="kill -9 %p"

The EMR cluster I am running this on is:
"master_instance_type": "r5d.8xlarge",
"core_instance_type": "r5.2xlarge",
"core_instance_count": "8",
"task_instance_types": [ "r5.2xlarge","m5.4xlarge"],
"task_instance_count": "1000"

Any suggestions on which s3DistCp configurations I could increase so that it can complete this copy without running out of memory?
wswtfjt7 1#

I ran it iteratively; with the AWS stack described above it was able to process roughly 300k files per iteration without hitting OOM.
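A minimal sketch of what running it iteratively could look like, assuming the staged HDFS output is copied to S3 one day partition at a time so that no single s3-dist-cp invocation has to track all the files; the paths and the day_dirs list are hypothetical, and it assumes the s3-dist-cp command is on the PATH of the EMR master node (otherwise the hadoop jar ... s3-dist-cp.jar form from the question works the same way).

# Hypothetical sketch: invoke s3-dist-cp once per staged day partition instead
# of once for the whole table, so no single run has to handle all ~2M files.
# Paths are placeholders; `day_dirs` would be built from the real HDFS layout.
import subprocess

day_dirs = [
    "hdfs:///tmp/my_table_repartitioned/year=2020/month=03/day=15",
    # ... one entry per staged day partition ...
]

for src in day_dirs:
    dest = src.replace("hdfs:///tmp/my_table_repartitioned",
                       "s3a://destination-bucket/my_table")
    subprocess.run(
        ["s3-dist-cp", "--src", src, "--dest", dest, "--s3ServerSideEncryption"],
        check=True,  # stop if a copy fails so it can be retried
    )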

baubqpgj 2#

This is a classic case for Spark's multithreaded scheduling: set spark.scheduler.mode=FAIR and assign pools. What you need to do is:
Create your list of partitions beforehand.
Use this list as an iterable.
For each item in this list, fire one Spark job in a different pool.
No separate s3DistCp is needed.
An example follows.
Before running spark-submit =>


# Create a list of all *possible* partitions like this
# Example S3 prefixes:
#   s3://my_bucket/my_table/year=2019/month=02/day=20
#   ...
#   s3://my_bucket/my_table/year=2020/month=03/day=15
#   ...
#   s3://my_bucket/my_table/year=2020/month=09/day=01

# We set `TARGET_PREFIX` as:
TARGET_PREFIX="s3://my_bucket/my_table"

# And create a list (down to the day=nn part) by looping twice.
# Increase the loop depth if partitioning goes down to the hour.
aws s3 ls "${TARGET_PREFIX}/" | grep PRE | awk '{print $2}' | while read year_part; do
    full_year_part="${TARGET_PREFIX}/${year_part}";
    aws s3 ls ${full_year_part} | grep PRE | awk '{print $2}' | while read month_part; do
        full_month_part=${full_year_part}${month_part};
        aws s3 ls ${full_month_part} | grep PRE | awk -v pref=$full_month_part '{print pref$2}';
    done;
done

Once that is done, run this script and save the result in a file, e.g. bash build_year_month_day.sh > s3_<my_table_day_partition>_file.dat. Now we can run Spark in multiple threads.
The Spark code needs two things (apart from scheduler.mode=FAIR): 1. creating an iterator from the file created above (s3_<my_table_day_partition>_file.dat), and 2. sc.setLocalProperty. Let's see how it is done.
a. We read this file in the Spark application (Python):

year_month_date_index_file = "s3_<my_table_day_partition>_file.dat"
with open(year_month_date_index_file, 'r') as f:
    content = f.read()
content_iter = [(idx, c) for idx, c in enumerate(content.split("\n")) if c]

b. Fire 100 threads, processing 100 days at a time:


# Number of THREADS can be increased or decreased
import threading
from itertools import islice

strt = 0
stp = 99
while strt < len(content_iter):

    threads_lst = []
    path_slices = islice(content_iter, strt, stp)
    for s3path in path_slices:
        print("PROCESSING FOR PATH {}".format(s3path))
        pool_index = int(s3path[0])  # Spark needs a POOL ID
        my_addr = s3path[1]
        # CALLING `process_in_pool` in each thread
        agg_by_day_thread = threading.Thread(target=process_in_pool, args=(pool_index, <additional_args>))  # pool_index is a mandatory argument
        agg_by_day_thread.start()  # Start of thread
        threads_lst.append(agg_by_day_thread)

    for process in threads_lst:
        process.join()  # Wait for all threads to finish

    strt = stp
    stp += 100

Two things to note: path_slices = islice(content_iter, strt, stp) => returns a slice of size (stp - strt); pool_index = int(s3path[0]) => the index from content_iter, which we will use to assign a pool id.
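To make those two points concrete, here is a tiny illustration (with made-up paths) of what content_iter holds and what each tuple unpacks to:

# Illustration only: the shape of content_iter and of the values pulled out
# of each tuple inside the loop above. Paths are made up.
content_iter = [
    (0, "s3://my_bucket/my_table/year=2019/month=02/day=20/"),
    (1, "s3://my_bucket/my_table/year=2019/month=02/day=21/"),
    (2, "s3://my_bucket/my_table/year=2019/month=02/day=22/"),
]

s3path = content_iter[1]
pool_index = int(s3path[0])   # -> 1, used to pick the scheduler pool
my_addr = s3path[1]           # -> the day prefix this thread will process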
Now for the core of the code:

def process_in_pool(pool_id, <other_arguments>):
    sc.setLocalProperty("spark.scheduler.pool", "pool_id_{}".format(str(int(pool_id) % 100)))

As you can see, we want to limit the threads to 100 pools, so we set spark.scheduler.pool from pool_index % 100. Write your actual transformations/actions inside this process_in_pool() function.
Once done, finish the function by releasing that pool, as:

...
sc.setLocalProperty("spark.scheduler.pool", None)
return
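Putting the pieces together, here is a hedged sketch of what a complete process_in_pool() might contain, assuming the per-day work is simply reading one source day prefix and rewriting it to the destination; the read/write calls, paths, and argument names are placeholders for your real transformation, and spark/sc are assumed to be the SparkSession and SparkContext already in scope, as in the snippets above.

# Hypothetical end-to-end version of process_in_pool(): pin this thread to a
# scheduler pool, process one day's data, then release the pool.
# The read/write details and argument names are placeholders.
def process_in_pool(pool_id, src_day_prefix, dest_day_prefix):
    sc.setLocalProperty("spark.scheduler.pool",
                        "pool_id_{}".format(str(int(pool_id) % 100)))

    # --- actual transformations/actions for this one day go here ---
    df = spark.read.parquet(src_day_prefix)      # e.g. .../year=2020/month=03/day=15/
    (
        df.coalesce(1)                           # one larger file per day; tune as needed
          .write
          .mode("overwrite")
          .parquet(dest_day_prefix)
    )

    # Release the pool so a later batch can reuse this slot
    sc.setLocalProperty("spark.scheduler.pool", None)
    return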

Finally, you run it as:

spark-submit \
-- Other options \
--conf spark.scheduler.mode=FAIR \
--other options \
my_spark_app.py

If tuned with the right executors/cores/memory, you will see a huge performance gain.
The same can be done in Scala with concurrent.futures, but that is for another day.
