我尝试使用spark而不是sqoop将oracle表迁移到hdfs。输出表大小约为122(gb,snappy,parquet),记录数约为1140000000条。
在同一集群(5个节点)上,使用50mapper的sqoop迁移(parquet/snappy)需要一个小时,但是使用以下配置的spark需要2.5小时。
spark.sparkContext.hadoopConfiguration.set("parquet.block.size", (1024 * 1024 * 256).toString)
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.task.cpus", "5")
spark.conf.set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
val dbtable = "(SELECT * FROM pd_prd) tmp"
val df = spark.read
.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("dbtable", dbtable)
.option("user", user)
.option("password", password)
.option("numPartitions", 50)
.option("partitionColumn", "prd_no")
.option("lowerBound", 0)
.option("upperBound", 2796542250L)
.option("fetchSize", 400000)
.load()
df.write.mode("overwrite").saveAsTable("tmp.test")
有50个执行器,每个执行器6个内核,驱动内存4g,执行内存16g。
这是一个Map唯一的工作,那么我认为应该没有激烈的性能差异之间的两个。但是你能猜到为什么Spark在这里这么慢吗?
我可以考虑其他优化选项来最小化spark执行时间吗?
在这里,spark的输出与sqoop的输出不同。如我所见,sqoop在内部通过计算total和total/mapper数来处理这种偏差,使其均匀分布。你认为我怎样才能在spark中实现类似的功能(另外,spark只是将partitioncolumn存储为2796542250/50:=55000000,并抛出类似于“select*from pd\u prd where prd\u no<55000000”的语句,这会带来偏移,因为旧产品(较低的prd\u no)已经从表中删除了。)
暂无答案!
目前还没有任何答案,快来回答吧!