我正在尝试将s3中的数据移动到另一个位置,在该位置上,数据在静止时按日期字符串(源)进行分区,在静止时按year=yyyy/month=mm/day=dd进行分区/
虽然我能够在spark中读取整个源位置数据,并以tmp hdfs中的目标格式对其进行分区,但s3distcp无法将其从hdfs复制到s3。它失败,并出现outonmemory错误。
我正在尝试写近200万个小文件(每个20kb)
我的s3distcp正在运行以下参数 sudo -H -u hadoop nice -10 bash -c "if hdfs dfs -test -d hdfs:///<source_path>; then /usr/lib/hadoop/bin/hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar -libjars /usr/share/aws/emr/s3-dist-cp/lib/ -Dmapreduce.job.reduces=30 -Dmapreduce.child.java.opts=Xmx2048m --src hdfs:///<source_path> --dest s3a://<destination_path> --s3ServerSideEncryption;fi"
它失败了
[2020-08-06 14:23:36,038] {bash_operator.py:126} INFO - # java.lang.OutOfMemoryError: Java heap space
[2020-08-06 14:23:36,038] {bash_operator.py:126} INFO - # -XX:OnOutOfMemoryError="kill -9 %p"```
The emr cluster I am running this is
"master_instance_type": "r5d.8xlarge",
"core_instance_type": "r5.2xlarge",
"core_instance_count": "8",
"task_instance_types": [ "r5.2xlarge","m5.4xlarge"],
"task_instance_count": "1000"
Any suggestions what I could increase configurations on s3Distcp for it to be able to copy this without running out of memory?
2条答案
按热度按时间wswtfjt71#
我以迭代的方式运行它,对于所说的aws堆栈,它能够在每次迭代中处理大约300k个文件,而不需要oom
baubqpgj2#
这是一个
classic
可以使用多线程的情况scheduling
设置Spark的能力spark.scheduler.mode=FAIR
分配pools
你需要做的是创建您的
list
分区预处理将此列表用作iterable
对于此列表中的每个迭代器,在不同的池中触发一个spark作业
无需使用不同的3DistCP
示例如下:
执行spark submit之前=>
完成后,我们运行此脚本并将结果保存在如下文件中:
bash build_year_month_day.sh > s3_<my_table_day_partition>_file.dat
现在我们可以在多线程中运行spark了spark代码需要两件事(除了
scheduler.mode=FAIR
1. creating an iterator from the file created above # s3_<my_table_day_partition>_file.dat2. sc.setLocalProperty
看看是怎么做到的。答。我们在spark应用程序python中读取了这个文件
b、 用100天的时间来激发100个线程:
有两件事需要注意
path_slices = islice(content_iter, strt, stp)
=>返回大小的片(strt - stp)
pool_index = int(s3path[0])
=>的索引content_iter
,我们将使用它来分配池id。现在是代码的核心
如您所见,我们希望将线程限制为100个池,因此
spark.scheduler.pool
作为pool_idex
%100在这个`process\in\u pool()函数中编写实际的转换/操作完成后,通过释放该池作为
你终于跑了
如果使用正确的executor/core/memory进行调优,您将看到巨大的性能提升。
同样的方法也可以在
scala
与concurrent.futures
但那是另一天。