我需要运行下面这样一个任务。不知怎么的,我遗漏了一点。我知道,我不能像这样使用javasparkcontext并传递javafunctions,因为有序列化问题。
我需要运行多个cassandra查询,大小为cartesian.size()。有什么建议吗?
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<DateTime> dateTimeJavaRDD = jsc.parallelize(dateTimes); //List<DateTime>
JavaRDD<Integer> virtualPartitionJavaRDD = jsc.parallelize(virtualPartitions); //List<Integer>
JavaPairRDD<DateTime, Integer> cartesian = dateTimeJavaRDD.cartesian(virtualPartitionJavaRDD);
long c = cartesian.map(new Function<Tuple2<DateTime, Integer>, Long>() {
@Override
public Long call(Tuple2<DateTime, Integer> tuple2) throws Exception {
return javaFunctions(jsc).cassandraTable("keyspace", "table").where("p1 = ? and p2 = ?", tuple2._1(), tuple2._2()).count();
}
}).reduce((a,b) -> a + b);
System.out.println("TOTAL ROW COUNT IS: " + c);
1条答案
按热度按时间xnifntxz1#
正确的解决方案应该是在数据和Casasndra表之间执行连接。joinWithCassandraTable函数可以完成你所需要的工作, -你只需生成
Tuple2
的RDD,其中包含p1
和p2
的值,然后调用joinWithCassandra表,如下所示(未测试,取自我的示例here):