How to optimize Hadoop MapReduce compression of Spark output on Google Dataproc?

mkh04yzy · posted 2021-05-31 in Hadoop

The goal: millions of rows in Cassandra need to be extracted and compressed into a single file as quickly and efficiently as possible (once per day).
The current setup runs a Spark job on a Google Dataproc cluster that extracts the data directly into a Google Cloud Storage bucket. I have tried two approaches:
Use FileUtil.copyMerge() to combine the roughly 9,000 Spark partition files into one uncompressed file, then submit a Hadoop MapReduce job to compress that single file (see the sketch after these two items).
Keep the roughly 9,000 Spark partition files as the raw output and submit a Hadoop MapReduce job that merges and compresses them into a single file.
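
For reference, the question does not show how copyMerge() is invoked. A minimal sketch of what approach 1 might look like on Hadoop 2.x (where FileUtil.copyMerge() still exists; it was removed in Hadoop 3.x) is shown below; the source and destination paths are placeholders, not the actual bucket layout.

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

  // Sketch of approach 1: merge the ~9000 Spark part files into one uncompressed file.
  // Paths are placeholders.
  val conf = new Configuration()
  val srcDir  = new Path("gs://bucket/spark-output/")     // placeholder: Spark partition files
  val dstFile = new Path("gs://bucket/merged/output.csv") // placeholder: single merged file
  val srcFs = srcDir.getFileSystem(conf)                  // resolves the GCS connector FileSystem
  val dstFs = dstFile.getFileSystem(conf)

  // copyMerge concatenates every file under srcDir into dstFile from a single process,
  // which is consistent with the single-node bottleneck described later (~45 minutes).
  FileUtil.copyMerge(srcFs, srcDir, dstFs, dstFile, /* deleteSource = */ false, conf, null)

Because copyMerge() streams all input files through one writer, it cannot use the rest of the cluster no matter how many workers are available.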
Some job details: roughly 800 million rows; about 9,000 Spark partition files output by the Spark job; the Spark job takes about one hour on a Dataproc cluster with 1 master and 4 workers (4 vCPUs, 15 GB each); default Dataproc Hadoop block size, which I believe is 128 MB.
Some Spark configuration details (a sketch of how these might be applied follows the list):

  spark.task.maxFailures=10
  spark.executor.cores=4
  spark.cassandra.input.consistency.level=LOCAL_ONE
  spark.cassandra.input.reads_per_sec=100
  spark.cassandra.input.fetch.size_in_rows=1000
  spark.cassandra.input.split.size_in_mb=64
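
It is not clear from the question whether the RDD or DataFrame API is used (the answer below asks the same thing). Purely as an illustration, here is a minimal sketch of how the settings above could be applied with the DataFrame reader of the spark-cassandra-connector; the connection host, keyspace, table, and output path are made-up placeholders.

  import org.apache.spark.sql.SparkSession

  // Hypothetical SparkSession carrying the configuration listed above.
  val spark = SparkSession.builder()
    .appName("cassandra-to-gcs-export")
    .config("spark.task.maxFailures", "10")
    .config("spark.executor.cores", "4")
    .config("spark.cassandra.connection.host", "cassandra-host")   // placeholder
    .config("spark.cassandra.input.consistency.level", "LOCAL_ONE")
    .config("spark.cassandra.input.reads_per_sec", "100")
    .config("spark.cassandra.input.fetch.size_in_rows", "1000")
    .config("spark.cassandra.input.split.size_in_mb", "64")
    .getOrCreate()

  // Read the table through the connector's DataFrame source.
  val df = spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table")) // placeholders
    .load()

  // Current behaviour: ~9000 partition files written as CSV to the bucket.
  df.write.csv("gs://bucket/spark-output/") // placeholder path

The same properties could equally be passed on the spark-submit command line with --conf; the sketch only shows where they take effect.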

The Hadoop job:

  hadoop jar file://usr/lib/hadoop-mapreduce/hadoop-streaming-2.8.4.jar \
    -Dmapred.reduce.tasks=1 \
    -Dmapred.output.compress=true \
    -Dmapred.compress.map.output=true \
    -Dstream.map.output.field.separator=, \
    -Dmapred.textoutputformat.separator=, \
    -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
    -input gs://bucket/with/either/single/uncompressed/csv/or/many/spark/partition/file/csvs \
    -output gs://output/bucket \
    -mapper /bin/cat \
    -reducer /bin/cat \
    -inputformat org.apache.hadoop.mapred.TextInputFormat \
    -outputformat org.apache.hadoop.mapred.TextOutputFormat

The Spark job took about 1 hour to extract the Cassandra data into the GCS bucket. Using FileUtil.copyMerge() added about 45 minutes; it ran on the Dataproc cluster but under-utilized the resources, since it appears to use only one node. The Hadoop job that compressed the single file took another 50 minutes. This is not an optimal approach, because the cluster has to stay up that much longer even though it is not using all of its resources.
The job's INFO output:

  INFO mapreduce.Job: Counters: 55
    File System Counters
      FILE: Number of bytes read=5072098452
      FILE: Number of bytes written=7896333915
      FILE: Number of read operations=0
      FILE: Number of large read operations=0
      FILE: Number of write operations=0
      GS: Number of bytes read=47132294405
      GS: Number of bytes written=2641672054
      GS: Number of read operations=0
      GS: Number of large read operations=0
      GS: Number of write operations=0
      HDFS: Number of bytes read=57024
      HDFS: Number of bytes written=0
      HDFS: Number of read operations=352
      HDFS: Number of large read operations=0
      HDFS: Number of write operations=0
    Job Counters
      Killed map tasks=1
      Launched map tasks=353
      Launched reduce tasks=1
      Rack-local map tasks=353
      Total time spent by all maps in occupied slots (ms)=18495825
      Total time spent by all reduces in occupied slots (ms)=7412208
      Total time spent by all map tasks (ms)=6165275
      Total time spent by all reduce tasks (ms)=2470736
      Total vcore-milliseconds taken by all map tasks=6165275
      Total vcore-milliseconds taken by all reduce tasks=2470736
      Total megabyte-milliseconds taken by all map tasks=18939724800
      Total megabyte-milliseconds taken by all reduce tasks=7590100992
    Map-Reduce Framework
      Map input records=775533855
      Map output records=775533855
      Map output bytes=47130856709
      Map output materialized bytes=2765069653
      Input split bytes=57024
      Combine input records=0
      Combine output records=0
      Reduce input groups=2539721
      Reduce shuffle bytes=2765069653
      Reduce input records=775533855
      Reduce output records=775533855
      Spilled Records=2204752220
      Shuffled Maps =352
      Failed Shuffles=0
      Merged Map outputs=352
      GC time elapsed (ms)=87201
      CPU time spent (ms)=7599340
      Physical memory (bytes) snapshot=204676702208
      Virtual memory (bytes) snapshot=1552881852416
      Total committed heap usage (bytes)=193017675776
    Shuffle Errors
      BAD_ID=0
      CONNECTION=0
      IO_ERROR=0
      WRONG_LENGTH=0
      WRONG_MAP=0
      WRONG_REDUCE=0
    File Input Format Counters
      Bytes Read=47132294405
    File Output Format Counters
      Bytes Written=2641672054

I expected this approach to perform as well as or better than the other one, but it performed much worse. The Spark job was unchanged. Skipping FileUtil.copyMerge() and going straight to the Hadoop MapReduce job... after an hour and a half, the map portion of the job was only at about 50%. The job was cancelled at that point, since it clearly was not going to work.
I have full control over the Spark and Hadoop jobs. I know we could create a larger cluster, but I would rather do that only after making sure the jobs themselves are optimized. Any help is appreciated. Thanks.


oknwwptz #1

Could you provide more details about your Spark job? Which Spark API are you using, RDD or DataFrame? Why not perform the merge phase entirely in Spark (with repartition().write()) and avoid chaining the Spark and MR jobs?
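
The answer does not include code. Purely as an illustration of the suggestion, a minimal DataFrame-based sketch is shown below, reusing the hypothetical df from the earlier sketch; the gzip codec and output path are assumptions.

  // Merge and compress inside Spark, so no separate copyMerge()/MapReduce step is needed.
  df.repartition(1)                          // collapse ~9000 partitions into one
    .write
    .option("compression", "gzip")           // gzip-compress the single output part file
    .csv("gs://output/bucket/daily-export")  // placeholder GCS path

Writing through a single partition still serializes the final write on one task, but it removes the extra copyMerge() pass and the follow-on MapReduce job.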
