I'm trying to use Spark to read data from S3 and Snowflake at the same time, join the two datasets, and write the result back to Snowflake.
While testing, I found that every approach produces the same result but with very different performance (the 2nd-try times are taken from the logs).
v1: read from S3 and Snowflake separately, "join", save to Snowflake
1st try: It took 2 hours, 46 minutes.
2nd try: It took 1 hour, 12 minutes.
v2: write data B (read from Snowflake) to S3, read it back, "join", save to Snowflake
1st try: It took 5 minutes.
2nd try: It took 5 minutes.
v3: read from S3 and Snowflake separately, write the result to S3 and read it back, save to Snowflake
1st try: It took 1 hour, 11 minutes.
2nd try: It took 50 minutes.
Question: why does each test perform so differently? I expected v1 to be the fastest, given the Snowflake Spark connector support.
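By "Snowflake Spark connector support" I mean the connector's query pushdown. A minimal sketch of how it is toggled per session (it is on by default in recent connector versions), using the connector's SnowflakeConnectorUtils helper and the spark session from the code below:

    import net.snowflake.spark.snowflake.SnowflakeConnectorUtils

    // Pushdown is on by default in recent spark-snowflake releases; these
    // helpers toggle it explicitly for the current SparkSession.
    SnowflakeConnectorUtils.enablePushdownSession(spark)
    // SnowflakeConnectorUtils.disablePushdownSession(spark)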
Below are the details and timings for each test, plus the common configuration.
Data A - stored in S3: 20.2 GB
Data B - stored in Snowflake (internal table): 7 KB (195 rows)
Spark configuration: r5.4xlarge (4), on-demand
Snowflake warehouse: 3X-Large
AWS EMR version: emr-5.31.0
Spark version: 2.4.6
Snowflake connector: spark-snowflake_2.11
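For completeness, the connector dependencies roughly correspond to these Maven coordinates (the versions below are an assumption for illustration, not necessarily what the cluster resolves):

    // build.sbt (illustrative versions only)
    libraryDependencies ++= Seq(
      "net.snowflake" % "spark-snowflake_2.11" % "2.8.1-spark_2.4",
      "net.snowflake" % "snowflake-jdbc"       % "3.12.8"
    )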
Tests (on AWS EMR)
v1: read from S3 and Snowflake separately, "join", save to Snowflake
Result: an OOM occurred after 2 hours (exit status 137).
After changing the Spark config per the AWS guide (.set("spark.sql.shuffle.partitions", "500")) and adding resDF.repartition(500).cache(): it took 2 hours, 46 minutes.
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

object snowflake_v1 {
  val spark_config = new SparkConf()
    .set("fs.s3.maxConnections", "5000")
    .set("spark.sql.broadcastTimeout", "1200")
    .set("spark.sql.shuffle.partitions", "500")
    .set("spark.network.timeout", "10000000")
    .set("spark.executor.heartbeatInterval", "1000000")

  val spark = SparkSession
    .builder()
    .master("yarn")
    .config(spark_config)
    .appName("snowflake")
    .getOrCreate()

  def main(args: Array[String]): Unit = {
    // Data A: ~20.2 GB of parquet on S3
    // parquet at snowflake_v1.scala:27
    val Adf =
      spark.read
        .parquet(
          "s3://"
        )
        .cache()

    // Snowflake connection options (credentials redacted)
    val sfOptions = Map(
      "sfURL" -> "XXX",
      "sfUser" -> "XXX",
      "sfPassword" -> "XXX",
      "sfDatabase" -> "XXX",
      "sfSchema" -> "XXX",
      "sfWarehouse" -> "XXX"
    )

    // Data B: 7 KB (195 rows), read through the Snowflake connector
    val Bdf: DataFrame = spark.read
      .format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("dbtable", "t_B")
      .load()

    val resDF =
      Adf.join(Bdf, Seq("cnty"), "leftouter").cache()

    val newDF = resDF.repartition(500).cache() // if not, OOM occurs

    newDF.write
      .format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("dbtable", "t_result_spark_v1")
      .option("parallelism", "8")
      .mode(SaveMode.Overwrite)
      .save()
  }
}
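A diagnostic I did not include in the timings above, but which might show why v1 behaves so differently from v2: print the broadcast threshold and the physical plan, and check whether Bdf (7 KB) ends up in a broadcast hash join or forces a shuffle of the 20 GB side:

    // Diagnostic only: check the broadcast threshold and the chosen join strategy.
    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold")) // 10 MB by default
    resDF.explain(true) // look for BroadcastHashJoin vs SortMergeJoin around Bdf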
v2: write data B (read from Snowflake) to S3, read it back, "join", save to Snowflake
Result: it took 5 minutes.
object snowflake_v2 {
  val spark_config = "same as v1"
  val spark = "same as v1"

  def main(args: Array[String]): Unit = {
    // parquet at snowflake_v2.scala:25
    val Adf = "same as v1"
    var sfOptions = "same as v1"
    val Bdf: DataFrame = "same as v1"

    // Round-trip B through S3: write it out as parquet ...
    // parquet at snowflake_v2.scala:53
    Bdf.write
      .mode(SaveMode.Overwrite)
      .parquet("s3://..b")

    // ... and read it back before joining
    // parquet at snowflake_v2.scala:56
    val Bdf2 =
      spark.read.parquet("s3://..b")

    val resDF =
      Adf.join(Bdf2, Seq("cnty"), "leftouter").cache()

    resDF.write
      .format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("dbtable", "t_result_spark_v2")
      .option("parallelism", "8")
      .mode(SaveMode.Overwrite)
      .save()
  }
}
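A variant I have not measured (so this is only a sketch, not one of the tests above): keep reading B through the connector as in v1, but hint the broadcast explicitly, which may avoid both the shuffle and the extra S3 round trip. resDFHint is a name introduced here for illustration:

    import org.apache.spark.sql.functions.broadcast

    // Hypothetical alternative to the S3 round trip: explicitly broadcast the
    // 195-row table B so the 20 GB side does not need to be shuffled for the join.
    val resDFHint =
      Adf.join(broadcast(Bdf), Seq("cnty"), "leftouter")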
v3: read from S3 and Snowflake separately, write the result to S3 and read it back, save to Snowflake
Result: after 30 minutes, duplicated rows appeared in both S3 and Snowflake.
If I print the count before writing the result to S3, the job takes 1 hour, 11 minutes to actually run (and there are no duplicates).
object snowflake_v3 {
  val spark_config = "same as v1"
  val spark = "same as v1"

  def main(args: Array[String]): Unit = {
    // parquet at snowflake_v3.scala:25
    val Adf = "same as v1"
    var sfOptions = "same as v1"
    val Bdf: DataFrame = "same as v1"

    val resDF =
      Adf.join(Bdf, Seq("cnty"), "leftouter")

    println("resDF count")
    // count at snowflake_v3.scala:54
    println(resDF.count) // if not, duplicated rows occur

    // Round-trip the join result through S3 ...
    // parquet at snowflake_v3.scala:58
    resDF.write
      .mode(SaveMode.Overwrite)
      .parquet("s3://../temp_result")

    // ... and read it back before saving to Snowflake
    // parquet at snowflake_v3.scala:65
    val resDF2 =
      spark.read.parquet("s3://../temp_result")

    resDF2.write
      .format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("dbtable", "t_result_spark_v3")
      .option("parallelism", "8")
      .mode(SaveMode.Overwrite)
      .save()
  }
}
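Also untested, but related to the count() workaround in v3: persisting the join result before the two actions (the count and the S3 write) lets both reuse the same materialized data instead of recomputing the join. Only cache()/persist() from the standard Spark API is assumed here; resDFCached is a name made up for the sketch:

    import org.apache.spark.storage.StorageLevel

    // Hypothetical tweak to v3: persist the join result so the count and the
    // parquet write do not each recompute the join.
    val resDFCached = resDF.persist(StorageLevel.MEMORY_AND_DISK)
    println(resDFCached.count())
    resDFCached.write
      .mode(SaveMode.Overwrite)
      .parquet("s3://../temp_result")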