如何使用spark dataframereader api为表指定筛选条件?

esbemjvw  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(326)

我在databricks文档上读到关于spark的信息https://docs.databricks.com/data/tables.html#partition-修剪-1
上面写着
扫描表时,spark向下推涉及partitionby键的筛选器 predicate 。在这种情况下,spark避免读取不满足这些 predicate 的数据。例如,假设有一个表被 <date> . 查询,例如 SELECT max(id) FROM <example-data> WHERE date = '2010-10-10' 只读取包含日期值与查询中指定的元组匹配的元组的数据文件。
在读取表时,如何在dataframereader api中指定这样的过滤条件?

g0czyy6m

g0czyy6m1#

由于spark在您使用dataframe reader读取数据时是延迟计算的,因此它只是作为一个stage添加到底层dag中。
现在,当您对数据运行sql查询时,它也被添加为dag中的另一个阶段。
当您在Dataframe上应用任何操作时,将评估dag,并通过catalyst optimized对所有阶段进行优化,最终生成最经济高效的物理计划。
在dag求值时, predicate 条件被下推,只有所需的数据被读入内存。

e0bqpujr

e0bqpujr2#

dataframereader是专门使用sparksession.read创建的(可用)。这意味着它是在执行以下代码时创建的(csv文件加载示例)

val df = spark.read.csv("path1,path2,path3")

spark提供了一个可插入的数据提供程序框架(数据源api)来推出您自己的数据源。基本上,它提供了可以实现的接口,用于读取/写入自定义数据源。这就是通常实现分区修剪和 predicate 过滤器下推的地方。
databrickspark支持许多内置的数据源(以及 predicate 下推和分区修剪功能)https://docs.databricks.com/data/data-sources/index.html.
因此,如果需要从jdbc表加载数据并指定过滤条件,请参见下面的示例

// Note: The parentheses are required.
val pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
val df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

请参阅这里的更多细节https://docs.databricks.com/data/data-sources/sql-databases.html

相关问题