在spark中使用pushdown查询,如何在spark hbase(bigsql作为sql引擎)中获得并行性?

dtcbnfnu  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(703)

在spark中,数据库的sql引擎对pushdownquery进行处理,并根据处理结果构造Dataframe。因此,spark将查询该查询的结果。

val pushdownQuery = """(SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah )"""

val dbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
.option("numPartitions", 4)
.option("partitionColumn", "COUNTRY_CODE")
.load()

我可以从另一个参考资料中看出(https://dzone.com/articles/how-apache-spark-makes-your-slow-mysql-queries-10x)在spark-mysql中,pushdown查询的并行性是通过基于参数numpartitions和partitioncolumn触发多个查询来实现的。这与sqoop如何分布的方法非常相似。比如说上面给出的例子中的参数的numpartitions=4;partitioncolumn=country\u code,在我们的表中country\u code的值范围为(000999)。
生成4个查询;激发到db,并根据这些结果构造dataframe(本例中并行度为4)。

Q1 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE >= 000 AND COUNTRY_CODE <= 250
Q2 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 250 AND COUNTRY_CODE  <= 500
Q3 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 500 AND COUNTRY_CODE  <= 750
Q4 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 750 AND COUNTRY_CODE  <= 999

我现在的问题是,如何在spark(版本2.1)+hbase(查询引擎-bigsql)中使用这种方法实现并行性?它现在没有给我平行性。连接spark hbase的驱动程序需要更新吗?或者spark需要这么做?或者什么样的改变能帮助它得到这些?一些方向对我有帮助。谢谢您!

mklgxw1f

mklgxw1f1#

为了获得最佳性能,我建议使用--num executors 4和--executor cores 1开始spark作业,因为jdbc连接是单线程的,每个查询一个任务在一个核心上运行。通过更改此配置,当作业正在运行时,您可以观察并行运行的任务,这是每个执行器中正在使用的核心。
请改用以下函数:

val connectionProperties: Properties = new Properties
connectionProperties.put("user", "xxxx")
connectionProperties.put("password", "xxxx")
connectionProperties.put("fetchsize", "10000") //fetches 10000 records at once per task
connectionProperties.put("driver", "com.mysql.jdbc.Driver")
connectionProperties

val pushdownQuery = """(SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah ) tbl_alias"""

val dbDataFrame = spark.read.jdbc(url, pushdownQuery, "COUNTRY_CODE", 0L, 4L, 4, connectionProperties)

参考https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.dataframereader@jdbc公司(url:string,table:string,c级olumnname:string,左owerbound:long,单位pperbound:long,努普artitions:int,连接Properties:java.util.properties):org.apache.spark.sql.dataframe

相关问题