我想在我的集群上使用apachespark,它由5个糟糕的系统组成。首先,我已经在我的节点上实现了cassandra3.11.3,所有节点都正常。
在那之后,我用javaapi在我的节点中插入了100k条记录,而不使用spark,一切正常。
现在我要执行一个简单的查询,如下所示:
select * from myKeySpace.myTbl where field1='someValue';
由于我的节点在硬件上很弱,所以我只想从mytbl中获取一点记录,如下所示:
select * from myKeySpace.myTbl where field1='someValue' limit 20;
我已经测试了这个(a),但是它非常慢(我不知道原因):
Dataset<Row> df1 = sparkSession.sql("select * from myKeySpace.myTbl where field1='someValue' limit 20");
还有(b)我认为spark获取所有数据,然后使用极限函数,这不是我的目标:
Dataset<Row> df1 = sparkSession.sql("select * from myKeySpace.myTbl where field1='someValue'").limit(20);
我想我也可以用Spark芯。我还知道一个叫做 perPartitionLimit
在cassandra 3.6及更高版本(d)中实现。
如您所知,由于我的节点很弱,我不想从cassandra表中获取所有记录,然后使用limit函数之类的函数。我只想从表中获取少量记录,这样我的节点就可以处理这些记录。
那么最好的解决方案是什么呢?
更新:
我已经做了@aksw在评论中给出的建议:
SparkConf conf = new SparkConf()
.setAppName("SparkTest")
.set("spark.cassandra.connection.host","192.168.107.100");
long limit=20;
JavaSparkContext jsc = new JavaSparkContext(conf);
CassandraJavaRDD<CassandraRow> rdd1 = javaFunctions(jsc)
.cassandraTable("myKeySpace", "myTbl")
.select("id").perPartitionLimit(limit);
System.out.println("Count: " + rdd1.count()); //output is "Count: 100000" which is wrong!
jsc.stop();
但是 perPartitionLimit(limit)
那个 limit=20
不工作,所有记录都会被提取!
暂无答案!
目前还没有任何答案,快来回答吧!