The goal: millions of rows in Cassandra need to be extracted and compressed into a single file, as quickly and efficiently as possible, once per day.
The current setup runs a Spark job on a Google Dataproc cluster that extracts the data directly into a Google Cloud Storage bucket. I have tried two approaches:
1. Using FileUtil.copyMerge() to merge the roughly 9,000 Spark partition files into a single uncompressed file (see the sketch after this list), then submitting a Hadoop MapReduce job to compress that single file.
2. Leaving the roughly 9,000 Spark partition files as the raw output and submitting a Hadoop MapReduce job to merge and compress them into a single file.
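For reference, the merge step in approach 1 looks roughly like the sketch below (Scala; the bucket paths are placeholders, not the actual paths used). FileUtil.copyMerge() is only available in Hadoop 2.x and streams the part files one after another through a single process, which is why it keeps only one node busy.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val srcDir  = new Path("gs://bucket/spark/output/")        // ~9,000 part files (placeholder path)
val dstFile = new Path("gs://bucket/merged/extract.csv")   // single uncompressed file (placeholder path)
val srcFs = srcDir.getFileSystem(conf)
val dstFs = dstFile.getFileSystem(conf)

// Sequentially concatenates every part file under srcDir into dstFile
FileUtil.copyMerge(srcFs, srcDir, dstFs, dstFile, false, conf, null)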
Some job details: roughly 800 million rows; roughly 9,000 Spark partition files output by the Spark job; the Spark job takes about an hour to run on a 1-master, 4-worker (4 vCPUs, 15 GB each) Dataproc cluster; default Dataproc Hadoop block size, which I believe is 128 MB.
Some Spark configuration details (applied roughly as sketched after this list):
spark.task.maxFailures=10
spark.executor.cores=4
spark.cassandra.input.consistency.level=LOCAL_ONE
spark.cassandra.input.reads_per_sec=100
spark.cassandra.input.fetch.size_in_rows=1000
spark.cassandra.input.split.size_in_mb=64
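For context, these settings would be wired up along the lines of the sketch below (Scala with the spark-cassandra-connector DataFrame source; the keyspace and table names are placeholders and this is not necessarily the asker's actual code).

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Apply the settings listed above before the context is created
val conf = new SparkConf()
  .set("spark.task.maxFailures", "10")
  .set("spark.executor.cores", "4")
  .set("spark.cassandra.input.consistency.level", "LOCAL_ONE")
  .set("spark.cassandra.input.reads_per_sec", "100")
  .set("spark.cassandra.input.fetch.size_in_rows", "1000")
  .set("spark.cassandra.input.split.size_in_mb", "64")

val spark = SparkSession.builder().config(conf).getOrCreate()

// Read the Cassandra table through the connector (keyspace/table are placeholders)
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table"))
  .load()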
The Hadoop job:
hadoop jar file:///usr/lib/hadoop-mapreduce/hadoop-streaming-2.8.4.jar \
-Dmapred.reduce.tasks=1 \
-Dmapred.output.compress=true \
-Dmapred.compress.map.output=true \
-Dstream.map.output.field.separator=, \
-Dmapred.textoutputformat.separator=, \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-input gs://bucket/with/either/single/uncompressed/csv/or/many/spark/partition/file/csvs \
-output gs://output/bucket \
-mapper /bin/cat \
-reducer /bin/cat \
-inputformat org.apache.hadoop.mapred.TextInputFormat \
-outputformat org.apache.hadoop.mapred.TextOutputFormat
The Spark job took about an hour to extract the Cassandra data into the GCS bucket. Using FileUtil.copyMerge() added roughly 45 minutes; it ran on the Dataproc cluster but underutilized it, since it appears to use only one node. The Hadoop job to compress that single file took another 50 minutes. This is not an optimal approach, because the cluster has to stay up that much longer even though it is not using all of its resources.
The counter output from that job:
INFO mapreduce.Job: Counters: 55
File System Counters
FILE: Number of bytes read=5072098452
FILE: Number of bytes written=7896333915
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
GS: Number of bytes read=47132294405
GS: Number of bytes written=2641672054
GS: Number of read operations=0
GS: Number of large read operations=0
GS: Number of write operations=0
HDFS: Number of bytes read=57024
HDFS: Number of bytes written=0
HDFS: Number of read operations=352
HDFS: Number of large read operations=0
HDFS: Number of write operations=0
Job Counters
Killed map tasks=1
Launched map tasks=353
Launched reduce tasks=1
Rack-local map tasks=353
Total time spent by all maps in occupied slots (ms)=18495825
Total time spent by all reduces in occupied slots (ms)=7412208
Total time spent by all map tasks (ms)=6165275
Total time spent by all reduce tasks (ms)=2470736
Total vcore-milliseconds taken by all map tasks=6165275
Total vcore-milliseconds taken by all reduce tasks=2470736
Total megabyte-milliseconds taken by all map tasks=18939724800
Total megabyte-milliseconds taken by all reduce tasks=7590100992
Map-Reduce Framework
Map input records=775533855
Map output records=775533855
Map output bytes=47130856709
Map output materialized bytes=2765069653
Input split bytes=57024
Combine input records=0
Combine output records=0
Reduce input groups=2539721
Reduce shuffle bytes=2765069653
Reduce input records=775533855
Reduce output records=775533855
Spilled Records=2204752220
Shuffled Maps =352
Failed Shuffles=0
Merged Map outputs=352
GC time elapsed (ms)=87201
CPU time spent (ms)=7599340
Physical memory (bytes) snapshot=204676702208
Virtual memory (bytes) snapshot=1552881852416
Total committed heap usage (bytes)=193017675776
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=47132294405
File Output Format Counters
Bytes Written=2641672054
I expected the second approach to perform as well as or better than the first, but it performed much worse. The Spark job was unchanged; I skipped FileUtil.copyMerge() and went straight to the Hadoop MapReduce job... after an hour and a half, the map portion of the job was only at about 50%. The job was cancelled at that point, because it clearly wasn't going to work.
I have full control over both the Spark and the Hadoop jobs. I know we could create a larger cluster, but I'd rather do that only after making sure the jobs themselves are optimized. Any help is appreciated. Thanks.
1 Answer
Could you provide more details about your Spark job? Which Spark API are you using, RDD or DataFrame? Why not perform the merge phase entirely in Spark (with repartition().write()) and avoid chaining Spark and MR jobs?
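To illustrate the suggestion, a minimal sketch of doing the merge and compression entirely in Spark might look like this (Scala DataFrame API with the spark-cassandra-connector; keyspace, table, and output path are placeholders). Note that coalesce(1) funnels the final write through a single task, so it trades parallelism in the last stage for getting one output file without a separate MR job.

// `spark` is an existing SparkSession configured with the Cassandra connector
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table"))
  .load()

df.coalesce(1)                        // one output file; the final write runs on a single task
  .write
  .option("compression", "gzip")      // gzip-compress the CSV output
  .csv("gs://output/bucket/daily-extract")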