我正在从事一个springjava项目,并使用datastax连接器集成apachespark和cassandra。
我有自动连线sparksession和下面的代码行似乎工作。
Map<String, String> configMap = new HashMap<>();
configMap.put("keyspace", "key1");
configMap.put("table", tableName.toLowerCase());
Dataset<Row> ds = sparkSession.sqlContext().read().format("org.apache.spark.sql.cassandra").options(configMap)
.load();
ds.show();
在上面的步骤中,我正在加载数据集,在下面的步骤中,我正在过滤datetime字段。
String s1 = "2020-06-23 18:51:41";
String s2 = "2020-06-23 18:52:21";
Timestamp from = Timestamp.valueOf(s1);
Timestamp to = Timestamp.valueOf(s2);
ds = ds.filter(df.col("datetime").between(from, to));
有没有可能在加载过程中应用这个过滤条件?如果可以,有人能告诉我怎么做吗?
提前谢谢。
2条答案
按热度按时间8zzbczxx1#
只有在进行筛选的列是第一个群集列时,才会有效地向下推送此筛选器。正如rayan所指出的,我们可以使用
explain
命令来检查是否发生了 predicate 下推-相应的 predicate 应该具有*
他们身边的人物,像这样:如果 predicate 不会被推送,我们将看到在spark级别上进行过滤时,scan之后的另一个步骤。
thigvfpy2#
您不必在这里显式执行任何操作,spark cassandra连接器具有 predicate 下推,因此您的过滤条件将在数据选择期间应用。
资料来源:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
连接器将自动将所有有效 predicate 下推到cassandra。数据源还将自动仅从cassandra中选择完成查询所需的列。这可以通过
explain
命令。