我按照这里的说明连接我的spark程序从cassandra读取数据。以下是我如何配置spark:
val configBuilder = SparkSession.builder
.config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
.config("spark.cassandra.connection.host", cassandraUrl)
.config("spark.cassandra.connection.port", 9042)
.config("spark.sql.catalog.myCatalogName", "com.datastax.spark.connector.datasource.CassandraCatalog")
根据文档,完成后,我应该可以这样查询cassandra: spark.sql("select * from myCatalogName.myKeyspace.myTable where myPartitionKey = something")
但是,当我这样做时,会收到以下错误消息:
mismatched input '.' expecting <EOF>(line 1, pos 43)
== SQL ==
select * from myCatalog.myKeyspace.myTable where myPartitionKey = something
----------------------------------^^^
当我尝试以下格式时,我成功地从cassandra检索条目:
val frame = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "myKeyspace", "table" -> "myTable"))
.load()
.filter(col("timestamp") > startDate && col("timestamp") < endDate)
但是,此查询需要执行完整的表扫描。这个表包含几百万个条目,我更愿意利用 predicate 下推功能,它似乎只能通过sqlapi使用。
我使用的是spark-core 2.11:2.4.3、spark-cassandra-connector 2.11:2.5.0和cassandra 3.11.6
谢谢!
1条答案
按热度按时间lbsnaicq1#
catalogs api仅在尚未发布的scc版本3.0中可用。它将与spark 3.0版本一起发布,因此在SCC2.5.0中不可用。因此,对于2.5.0,您需要显式注册表,使用
create or replace temporary view...
,如文件所述:关于下推(它们对所有dataframeapi、sql、scala、python等的工作方式都是相同的)-当您的
timestamp
是第一个聚类列。即使在这种情况下,典型的问题是startDate
以及endDate
作为字符串,而不是时间戳。您可以通过执行frame.explain
,并检查 predicate 是否被下推-它应该*
predicate 名称附近的标记。例如,
第一个
filter
表达式将向下推 predicate ,而(not_filtered
)需要完全扫描。