执行pyspark.sql.dataframe.take需要一个多小时(4)

j2datikz  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(369)

我在3台虚拟机上运行spark 1.6(即1台主控机;2x从机)所有4核和16gb ram。
我可以看到在spark master webui上注册的工人。
我想从我的vertica数据库中检索数据来处理它。由于我无法运行复杂的查询,我尝试了虚拟查询来理解。我们认为这是一项容易的任务。
我的代码是:

df = sqlContext.read.format('jdbc').options(url='xxxx', dbtable='xxx', user='xxxx', password='xxxx').load()
four = df.take(4)

输出为(注:i替换为 @IPSLAVE 从属vm(ip:端口):

16/03/08 13:50:41 INFO SparkContext: Starting job: take at <stdin>:1
16/03/08 13:50:41 INFO DAGScheduler: Got job 0 (take at <stdin>:1) with 1 output partitions
16/03/08 13:50:41 INFO DAGScheduler: Final stage: ResultStage 0 (take at <stdin>:1)
16/03/08 13:50:41 INFO DAGScheduler: Parents of final stage: List()
16/03/08 13:50:41 INFO DAGScheduler: Missing parents: List()
16/03/08 13:50:41 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1), which has no missing parents
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.4 KB, free 5.4 KB)
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.9 KB)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 13:50:41 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/03/08 13:50:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1)
16/03/08 13:50:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/03/08 13:50:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, @IPSLAVE, partition 0,PROCESS_LOCAL, 1922 bytes)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 15:02:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4299240 ms on @IPSLAVE (1/1)
16/03/08 15:02:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/03/08 15:02:20 INFO DAGScheduler: ResultStage 0 (take at <stdin>:1) finished in 4299.248 s
16/03/08 15:02:20 INFO DAGScheduler: Job 0 finished: take at <stdin>:1, took 4299.460581 s

正如你所看到的,这需要相当长的时间。我的表实际上相当大(存储了大约2.2亿行,每行11个字段),但是这样的查询将使用“普通”sql(例如pyodbc)立即执行。
我想我误会了/误用了spark,你有什么想法或建议让它更好地工作吗?

wqnecbli

wqnecbli1#

虽然spark支持jdbc上的有限 predicate 下推,但所有其他操作(如limit、group和aggregation)都是在内部执行的。不幸的是这意味着 take(4) 将首先获取数据,然后应用 limit . 换言之,您的数据库将执行(假设没有任何投影和过滤器)相当于:

SELECT * FROM table

剩下的将由spark处理。涉及到一些优化(特别是spark以迭代方式计算分区,以获得所请求的记录数) LIMIT )但与数据库端优化相比,这个过程仍然相当低效。
如果你想推 limit 对于数据库,必须静态地使用子查询作为 dbtable 参数:

(sqlContext.read.format('jdbc')
    .options(url='xxxx', dbtable='(SELECT * FROM xxx LIMIT 4) tmp', ....))
sqlContext.read.format("jdbc").options(Map(
  "url"     -> "xxxx",
  "dbtable" -> "(SELECT * FROM xxx LIMIT 4) tmp",
))

请注意,子查询中的别名是必需的。
注:
一旦数据源api v2准备就绪,将来可能会改进此行为:
Spark-15689
spip:数据源api v2

相关问题