CassandraSpark连接器

6ss1mwsb  于 2022-11-05  发布在  Cassandra
关注(0)|答案(1)|浏览(108)

我的cassandra CF有日期和id作为分区键。而查询我只知道日期,所以我循环的id的范围。
我的问题围绕着连接器如何执行下面的代码。
SparkDriver代码如下所示-

SparkConf conf = new SparkConf().setAppName("DemoApp")
.conf.setMaster("local[*]")
.set("spark.cassandra.connection.host", "10.*.*.*")
.set("spark.cassandra.connection.port", "*");

JavaSparkContext sc = new JavaSparkContext(conf);
SparkContextJavaFunctions javaFunctions = CassandraJavaUtil.javaFunctions(sc);

String date = "23012017";

for(String id : idlist) {

JavaRDD<CassandraRow> cassandraRowsRDD = 

javaFunctions.cassandraTable("datakeyspace", "sample2")
            .where("date = ?",date)
            .where("id = ? ", id)
            .select("data");

 cassandraRowsRDDList.add(cassandraRowsRDD);
}

List<CassandraRow> collectAllRows = new ArrayList<CassandraRow>();
        for(JavaRDD<CassandraRow> rdd : cassandraRowsRDDList){
            //do transformations

            collectAllRows.addAll(rdd.collect());
    }

1)首先,我想问一下,如果我遍历idlist,假设idlist有1000个元素,而且可能会不断增加,这是否有效?每个选择查询如何在集群中分布?特别是如何维护Cassandra DB连接?
2)在我的驱动程序中,在循环之后,我把所有的行放在List中,然后对每一行进行转换,并过滤掉重复的行。这也是由spark在集群上分发的吗?还是在驱动程序端发生的?
请帮忙!

5hcedyr0

5hcedyr01#

spark cassandra connector提供了一种更好的方法来实现这一点。您可以创建一个(date,id)的rdd,然后在date和id列上调用joinWithCassandraTable函数。Connector很聪明地做到了这一点,所有数据都将只由worker获取,而且也没有shuffle,即每个worker将只获取它所拥有的日期和id的数据。

相关问题