apachespark+cassandra+java+spark会话根据给定的from和to值之间的datetime过滤记录

dsekswqp  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(308)

我正在从事一个springjava项目,并使用datastax连接器集成apachespark和cassandra。
我有自动连线sparksession和下面的代码行似乎工作。

Map<String, String> configMap = new HashMap<>();
configMap.put("keyspace", "key1");
configMap.put("table", tableName.toLowerCase());

Dataset<Row> ds = sparkSession.sqlContext().read().format("org.apache.spark.sql.cassandra").options(configMap)
        .load();
ds.show();

在上面的步骤中,我正在加载数据集,在下面的步骤中,我正在过滤datetime字段。

String s1 = "2020-06-23 18:51:41";
String s2 = "2020-06-23 18:52:21";

Timestamp from = Timestamp.valueOf(s1);
Timestamp to = Timestamp.valueOf(s2);
ds = ds.filter(df.col("datetime").between(from, to));

有没有可能在加载过程中应用这个过滤条件?如果可以,有人能告诉我怎么做吗?
提前谢谢。

8zzbczxx

8zzbczxx1#

只有在进行筛选的列是第一个群集列时,才会有效地向下推送此筛选器。正如rayan所指出的,我们可以使用 explain 命令来检查是否发生了 predicate 下推-相应的 predicate 应该具有 * 他们身边的人物,像这样:

val dcf3 = dc.filter("event_time >= cast('2019-03-10T14:41:34.373+0000' as timestamp) 
   AND event_time <= cast('2019-03-10T19:01:56.316+0000' as timestamp)")

// dcf3.explain
// == Physical Plan ==
// *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [uuid#21,event_time#22,id#23L,value#24] 
// PushedFilters: [ *GreaterThanOrEqual(event_time,2019-03-10 14:41:34.373), *LessThanOrE..., 
// ReadSchema: struct<uuid:string,event_time:timestamp,id:bigint,value...

如果 predicate 不会被推送,我们将看到在spark级别上进行过滤时,scan之后的另一个步骤。

thigvfpy

thigvfpy2#

您不必在这里显式执行任何操作,spark cassandra连接器具有 predicate 下推,因此您的过滤条件将在数据选择期间应用。
资料来源:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
连接器将自动将所有有效 predicate 下推到cassandra。数据源还将自动仅从cassandra中选择完成查询所需的列。这可以通过 explain 命令。

相关问题