apachesparksql与sqoop基准测试,同时将数据从rdbms传输到hdfs

k75qkfdt  于 2021-05-29  发布在  Hadoop
关注(0)|答案(6)|浏览(441)

我正在研究一个用例,在这个用例中我必须将数据从rdbms传输到hdfs。我们使用sqoop对这个案例进行了基准测试,发现我们能够在6-7分钟内传输大约20gb的数据。
当我尝试使用sparksql时,性能非常低(1gb的记录需要4分钟才能从netezza传输到hdfs)。我正在尝试进行一些调整并提高其性能,但不太可能将其调整到sqoop的级别(1分钟内大约有3gb的数据)。
我同意spark主要是一个处理引擎这一事实,但我的主要问题是spark和sqoop都在内部使用jdbc驱动程序,所以为什么在性能上有这么大的差异(或者我可能遗漏了一些东西)。我在这里张贴我的代码。

object helloWorld {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Netezza_Connection").setMaster("local")
    val sc= new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("numPartitions","14").option("lowerBound","0").option("upperBound","13").option("partitionColumn", "id").option("fetchSize","100000").load().registerTempTable("POC")
    val df2 =sqlContext.sql("select * from POC")
    val partitioner= new org.apache.spark.HashPartitioner(14)
    val rdd=df2.rdd.map(x=>(String.valueOf(x.get(1)),x)).partitionBy(partitioner).values
    rdd.saveAsTextFile("hdfs://Hostname/test")
  }
}

我查了很多其他的帖子,但是没有得到一个关于sqoop内部工作和调优的明确答案,也没有得到sqoop和sparksql的基准测试。请帮助理解这个问题。

vi4fp9gy

vi4fp9gy1#

sqoop和sparksql都使用jdbc连接从rdbms引擎获取数据,但是sqoop在这里有一个优势,因为它专门用于在rdbms和hdfs之间迁移数据。
sqoop中可用的每个选项都经过了微调,以在执行数据摄取时获得最佳性能。
您可以从讨论控制Map器数量的选项-m开始。
这就是从rdbms并行获取数据所需要做的。我能用sparksql做吗?当然可以,但是开发人员需要处理sqoop自动处理的“多线程”。

w51jfk4q

w51jfk4q2#

你用错工具了。
sqoop将启动一系列进程(在datanodes上),每个进程都将连接到您的数据库(请参阅num mapper),并且每个进程都将提取数据集的一部分。我不认为你能用spark实现读并行。
使用sqoop获取数据集,然后使用spark进行处理。

wmtdaxz3

wmtdaxz33#

下面的解决方案帮助了我

var df=spark.read.format("jdbc").option("url","
"url").option("user","user").option("password","password").option("dbTable","dbTable").option("fetchSize","10000").load()
df.registerTempTable("tempTable")
var dfRepart=spark.sql("select * from tempTable distribute by primary_key") //this will repartition the data evenly

dfRepart.write.format("parquet").save("hdfs_location")
yfjy0ee7

yfjy0ee74#

我也遇到了同样的问题,因为您正在使用的代码不适用于分区。

sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("numPartitions","14").option("lowerBound","0").option("upperBound","13").option("partitionColumn", "id").option("fetchSize","100000").load().registerTempTable("POC")

您可以通过以下方式检查spark作业中创建的分区数

df.rdd.partitions.length

可以使用以下代码连接数据库:

sqlContext.read.jdbc(url=db_url,
    table=tableName,
    columnName="ID",
    lowerBound=1L,
    upperBound=100000L,
    numPartitions=numPartitions,
    connectionProperties=connectionProperties)

为了优化你的spark工作,以下是参数:1.#分区2--num executors 3。--executor核心4--执行器存储器5--驾驶员记忆6。获取大小
2、3、4和5选项取决于集群配置,您可以在spark ui上监视spark作业。

pgpifvop

pgpifvop5#

@阿弥陀佛虽然是一个答案,但我不同意。
一旦您在从jdbc读取数据时给出了对数据进行分区的 predicate ,spark将为每个分区运行单独的任务。在您的情况下,任务数应该是14(您可以使用spark ui确认这一点)。
我注意到您使用local作为master,这将只为执行者提供1个核心。因此不会有平行性。你的案子就是这样。
现在要获得与sqoop相同的吞吐量,您需要确保这些任务是并行运行的。理论上,这可以通过以下两种方法来实现:1。使用14个执行器,每个执行器有1个核心2个。使用1个14核的执行器(频谱的另一端)
通常,每个执行器有4-5个内核。所以我用15/5=3个执行器来测试性能(我添加了1到14个执行器来考虑在clustor模式下运行的驱动程序的一个内核)。使用:executor.cores,executor.instances in sparkconf.set来播放配置。
如果这不能显著提高性能,那么接下来的事情就是查看执行器内存。
最后,我将调整应用程序逻辑以查看maprdd大小、分区大小和无序排列大小。

owfi6suc

owfi6suc6#

你可以试试这个following:-
从netezza中读取数据,不需要任何分区,并且大小增加到一百万。

sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("fetchSize","1000000").load().registerTempTable("POC")

在将数据写入最终文件之前重新划分数据。

val df3 = df2.repartition(10) //to reduce the shuffle

orc格式比文本更优化。将最终输出写入parquet/orc。

df3.write.format("ORC").save("hdfs://Hostname/test")

相关问题