在spark中,数据库的sql引擎对pushdownquery进行处理,并根据处理结果构造Dataframe。因此,spark将查询该查询的结果。
val pushdownQuery = """(SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah )"""
val dbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
.option("numPartitions", 4)
.option("partitionColumn", "COUNTRY_CODE")
.load()
我可以从另一个参考资料中看出(https://dzone.com/articles/how-apache-spark-makes-your-slow-mysql-queries-10x)在spark-mysql中,pushdown查询的并行性是通过基于参数numpartitions和partitioncolumn触发多个查询来实现的。这与sqoop如何分布的方法非常相似。比如说上面给出的例子中的参数的numpartitions=4;partitioncolumn=country\u code,在我们的表中country\u code的值范围为(000999)。
生成4个查询;激发到db,并根据这些结果构造dataframe(本例中并行度为4)。
Q1 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE >= 000 AND COUNTRY_CODE <= 250
Q2 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 250 AND COUNTRY_CODE <= 500
Q3 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 500 AND COUNTRY_CODE <= 750
Q4 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 750 AND COUNTRY_CODE <= 999
我现在的问题是,如何在spark(版本2.1)+hbase(查询引擎-bigsql)中使用这种方法实现并行性?它现在没有给我平行性。连接spark hbase的驱动程序需要更新吗?或者spark需要这么做?或者什么样的改变能帮助它得到这些?一些方向对我有帮助。谢谢您!
1条答案
按热度按时间mklgxw1f1#
为了获得最佳性能,我建议使用--num executors 4和--executor cores 1开始spark作业,因为jdbc连接是单线程的,每个查询一个任务在一个核心上运行。通过更改此配置,当作业正在运行时,您可以观察并行运行的任务,这是每个执行器中正在使用的核心。
请改用以下函数:
参考https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.dataframereader@jdbc公司(url:string,table:string,c级olumnname:string,左owerbound:long,单位pperbound:long,努普artitions:int,连接Properties:java.util.properties):org.apache.spark.sql.dataframe