Reading from Snowflake and S3 at the same time with Spark

njthzxwz · published 2021-05-18 · in Spark

I am trying to use Spark to read data from S3 and Snowflake at the same time and, after processing (a join operation), write the result into Snowflake.
While testing I found that each approach produces the same result but with different performance (the second try of each is the logged run).
v1: read from S3 and from Snowflake separately, perform the join, and save to Snowflake

1st try : It took 2 hours, 46 minutes.
2nd try : It took 1 hour, 12 minutes.

v2: write data B (read from Snowflake) out to S3, read it back, and after the join save to Snowflake

1st try : It took 5 minutes.
2nd try : It took 5 minutes.

v3: read from S3 and from Snowflake separately, write the join result to S3 and read it back, then save to Snowflake

1st try : It took 1 hour, 11 minutes.
2nd try : It took 50 minutes.

Question: why is the performance of each test so different? I expected v1 to be the fastest, given the Snowflake Spark connector support.
Below are the test details and timing information for each version. Common configuration:
Data A, stored in S3: 20.2 GB
Data B, stored in Snowflake (internal table): 7 KB (195 rows)
Spark cluster: r5.4xlarge (4), on-demand
Snowflake warehouse: 3X-Large
AWS EMR version: emr-5.31.0
Spark version: 2.4.6
Snowflake connector: spark-snowflake_2.11 (a dependency sketch follows this list)
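
For context, a build definition matching this stack might look roughly like the sketch below. The connector and JDBC driver versions shown are my assumptions, not taken from the post; on EMR the same artifacts can also be supplied through spark-submit --packages.

// build.sbt sketch (versions are assumptions; align with EMR 5.31.0 / Spark 2.4.6 / Scala 2.11)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // Spark SQL is provided by the EMR cluster at runtime
  "org.apache.spark" %% "spark-sql" % "2.4.6" % "provided",
  // Snowflake Spark connector built for Scala 2.11 / Spark 2.4 (version is an assumption)
  "net.snowflake" %% "spark-snowflake" % "2.8.1-spark_2.4",
  // Snowflake JDBC driver (version is an assumption)
  "net.snowflake" % "snowflake-jdbc" % "3.12.17"
)
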
Tests (run on AWS EMR)
v1: read from S3 and from Snowflake separately, perform the join, and save to Snowflake
Result: an OOM occurred after 2 hours (exit status 137).
After changing the Spark config per the AWS guide, .set("spark.sql.shuffle.partitions", "500"), and adding resDF.repartition(500).cache(), it took 2 hours, 46 minutes.

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

object snowflake_v1 {
  val spark_config = new SparkConf()
    .set("fs.s3.maxConnections", "5000")
    .set("spark.sql.broadcastTimeout", "1200")
    .set("spark.sql.shuffle.partitions", "500")
    .set("spark.network.timeout", "10000000")
    .set("spark.executor.heartbeatInterval", "1000000")

  val spark = SparkSession
    .builder()
    .master("yarn")
    .config(spark_config)
    .appName("snowflake")
    .getOrCreate()

  def main(args: Array[String]): Unit = {

    //parquet at snowflake_v1.scala:27
    val Adf =
      spark.read
        .parquet(
          "s3://"
        )
        .cache()

    var sfOptions = Map.apply(
      "sfURL" -> "XXX",
      "sfUser" -> "XXX",
      "sfPassword" -> "XXX",
      "sfDatabase" -> "XXX",
      "sfSchema" -> "XXX",
      "sfWarehouse" -> "XXX"
    )

    val Bdf: DataFrame = spark.sqlContext.read
      .format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("dbtable", "t_B")
      .load()

    val resDF =
      Adf.join(Bdf, Seq("cnty"), "leftouter").cache()

    val newDF = resDF.repartition(500).cache() // without this repartition, an OOM occurs

    newDF.write
      .format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("dbtable", "t_result_spark_v1")
      .option("parallelism", "8")
      .mode(SaveMode.Overwrite)
      .save()
  }
}
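
A side note on v1, offered as an assumption rather than something verified here: data B is only 195 rows (about 7 KB), so whether Spark broadcasts Bdf or shuffles the 20.2 GB Adf makes a large difference to the join. An explicit broadcast hint makes that choice deterministic; a minimal sketch:

import org.apache.spark.sql.functions.broadcast

// Hint Spark to broadcast the tiny Snowflake-side table so the 20.2 GB
// S3 dataset does not need to be shuffled for the left outer join.
val resDF = Adf.join(broadcast(Bdf), Seq("cnty"), "leftouter")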


v2: write data B (read from Snowflake) out to S3, read it back, and after the join save to Snowflake
Result: it took 5 minutes.

object snowflake_v2 {
  val spark_config = "same as v1"

  val spark = "same as v1"

  def main(args: Array[String]): Unit = {

    // parquet at snowflake_v2.scala:25
    val Adf = "same as v1"

    var sfOptions = "same as v1"

    val Bdf: DataFrame = "same as v1"

    //parquet at snowflake_v2.scala:53
    Bdf.write
      .mode(SaveMode.Overwrite)
      .parquet("s3://..b")

    //parquet at snowflake_v2.scala:56
    val Bdf2 =
      spark.read.parquet("s3://..b")

    val resDF =
      Adf.join(Bdf2, Seq("cnty"), "leftouter").cache()

    resDF.write
      .format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("dbtable", "t_result_spark_v2")
      .option("parallelism", "8")
      .mode(SaveMode.Overwrite)
      .save()
  }
}


v3: read from S3 and from Snowflake separately, write the join result to S3 and read it back, then save to Snowflake
Result: after 30 minutes, duplicated rows appeared in both S3 and Snowflake.
If I print the count before writing the result to S3, the job runs correctly and takes 1 hour, 11 minutes (no duplicates; see also the sketch after the v3 code).

object snowflake_v3 {
  val spark_config = "same as v1"

  val spark = "same as v1"

  def main(args: Array[String]): Unit = {

    //parquet at snowflake_v3.scala:25
    val Adf = "same as v1"

    var sfOptions = "same as v1"

    val Bdf: DataFrame = "same as v1"

    val resDF =
      Adf.join(Bdf, Seq("cnty"), "leftouter")

    println("resDF count")
    //count at snowflake_v3.scala:54
    println(resDF.count) // without this count, duplicated rows occur

    //parquet at snowflake_v3.scala:58
    resDF.write
      .mode(SaveMode.Overwrite)
      .parquet("s3://../temp_result")

    //parquet at snowflake_v3.scala:65
    val resDF2 =
      spark.read.parquet("s3://../temp_result")

    resDF2.write
      .format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("dbtable", "t_result_spark_v3")
      .option("parallelism", "8")
      .mode(SaveMode.Overwrite)
      .save()
  }
}
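
Regarding the duplicated rows in v3: the count() call forces the join to be evaluated before anything is written. Another way to pin the result, sketched below under the assumption that the duplicates come from the join being re-executed non-deterministically, is to checkpoint the DataFrame; the checkpoint directory shown is a hypothetical placeholder.

// Sketch only: checkpoint() materializes resDF to stable storage and cuts
// its lineage, so the S3 write and the Snowflake write read the same data.
spark.sparkContext.setCheckpointDir("s3://.../spark-checkpoints") // hypothetical path
val resDFStable = resDF.checkpoint()

resDFStable.write
  .mode(SaveMode.Overwrite)
  .parquet("s3://../temp_result")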
